Over the past few years, Apache Kafka has emerged as the leading standard for data streaming. Fast forward to the present day: Kafka has achieved ubiquity, adopted by at least 80% of the Fortune 100. This widespread adoption is attributed to Kafka’s architecture, which goes beyond basic messaging. The versatility of Kafka’s architecture makes it extremely well-suited for streaming data at a massive “Internet” level, providing the fault tolerance and data consistency critical to supporting mission-critical applications.
Flink is a unified high-throughput batch and stream processing engine, known for its ability to handle continuous data streams on a large scale. It integrates seamlessly with Kafka and offers robust support for exactly-once semantics, ensuring that every event is processed exactly once, even in the midst of system failures. Flink appears as a natural choice as a stream processor for Kafka. Although Apache Flink enjoys significant success and popularity as a real-time data processing tool, accessing sufficient resources and current examples to learn Flink can be challenging.
In this article, I’ll walk you through the step-by-step process of integrating Kafka 2.13-3.7.0 with Flink 1.18.1 to consume topic data and process it within Flink on a single-node cluster. Ubuntu-22.04 LTS was used as the OS in the cluster.
Assumptions
- The system has a minimum of 8 GB RAM and 250 GB SSD along with Ubuntu-22.04.2 amd64 as the operating system.
- OpenJDK 11 is installed with
JAVA_HOME
environment variable configuration. - Python 3 or Python 2 along with Perl 5 is available on the system.
- A single node Apache Kafka-3.7.0 cluster is up and running with Apache Zookeeper -3.5.6. (Read how to set up a Kafka cluster here.).
Install and run Flink 1.18.1
- You can download the Flink-1.18.1 binary distribution here.
- Extract the archive flink-1.18.1-bin-scala_2.12.tgz on the terminal using
$ tar -xzf flink-1.18.1-bin-scala_2.12.tgz
. After successful extraction, the directory big-1.18.1 will be created. Make sure it’s insidebin/
,conf/
andexamples/
directories are available. - Go to
bin
directory through the terminal and execute$ ./bin/start-cluster.sh
to run a single-node Flink cluster. - Moreover, we can use Flink’s web interface to monitor the cluster status and running jobs by accessing the browser on port 8081.
- A Flink cluster can be stopped by executing
$ ./bin/stop-cluster.sh
.
List of dependent JARs
The following .jars should be included in the classpath/build file:
I created a basic Java program using Eclipse IDE 23-12 to continuously consume messages within Flink from a Kafka topic. Dummy string messages are posted to the topic using Kafka’s built-in kafka-console-publisher
the script. Upon arrival in the Flink mechanism, no data transformation occurs for each message. Instead, an additional string is simply appended to each message and printed for verification, ensuring that messages are continuously transmitted to Flink.
package com.dataview.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dataview.flink.util.IKafkaConstants;
public class readFromKafkaTopic
public static void main(String[] args) throws Exception
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
.setTopics(IKafkaConstants.FIRST_TOPIC_NAME)
.setGroupId(IKafkaConstants.GROUP_ID_CONFIG)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> messageStream = see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
messageStream.rebalance().map(new MapFunction<String, String>()
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception
return "Kafka and Flink says: " + value;
).print();
see.execute();
The entire execution was recorded on the screen. If you are interested, you can look below:<
I hope you enjoyed reading this. Stay tuned for another upcoming article where I’ll explain how to transfer messages/data from Flink to a Kafka theme.