Streaming real-time data from Kafka 3.7.0 to Flink 1.18.1 for processing

Over the past few years, Apache Kafka has emerged as the leading standard for data streaming. Fast forward to the present day: Kafka has achieved ubiquity, adopted by at least 80% of the Fortune 100. This widespread adoption is attributed to Kafka’s architecture, which goes beyond basic messaging. The versatility of Kafka’s architecture makes it extremely well-suited for streaming data at a massive “Internet” level, providing the fault tolerance and data consistency critical to supporting mission-critical applications.

Flink is a unified high-throughput batch and stream processing engine, known for its ability to handle continuous data streams on a large scale. It integrates seamlessly with Kafka and offers robust support for exactly-once semantics, ensuring that every event is processed exactly once, even in the midst of system failures. Flink appears as a natural choice as a stream processor for Kafka. Although Apache Flink enjoys significant success and popularity as a real-time data processing tool, accessing sufficient resources and current examples to learn Flink can be challenging.

In this article, I’ll walk you through the step-by-step process of integrating Kafka 2.13-3.7.0 with Flink 1.18.1 to consume topic data and process it within Flink on a single-node cluster. Ubuntu-22.04 LTS was used as the OS in the cluster.

Assumptions

  • The system has a minimum of 8 GB RAM and 250 GB SSD along with Ubuntu-22.04.2 amd64 as the operating system.
  • OpenJDK 11 is installed with JAVA_HOME environment variable configuration.
  • Python 3 or Python 2 along with Perl 5 is available on the system.
  • A single node Apache Kafka-3.7.0 cluster is up and running with Apache Zookeeper -3.5.6. (Read how to set up a Kafka cluster here.).

Install and run Flink 1.18.1

  • You can download the Flink-1.18.1 binary distribution here.
  • Extract the archive flink-1.18.1-bin-scala_2.12.tgz on the terminal using $ tar -xzf flink-1.18.1-bin-scala_2.12.tgz. After successful extraction, the directory big-1.18.1 will be created. Make sure it’s inside bin/, conf/and examples/ directories are available.
  • Go to bin directory through the terminal and execute $ ./bin/start-cluster.sh to run a single-node Flink cluster.Run $ ./bin/start-cluster.sh to start a single-node Flink cluster
  • Moreover, we can use Flink’s web interface to monitor the cluster status and running jobs by accessing the browser on port 8081.

Flink web interface: monitor cluster status and running jobs

  • A Flink cluster can be stopped by executing $ ./bin/stop-cluster.sh.

List of dependent JARs

The following .jars should be included in the classpath/build file:

.jars to be included in the classpath/build file

I created a basic Java program using Eclipse IDE 23-12 to continuously consume messages within Flink from a Kafka topic. Dummy string messages are posted to the topic using Kafka’s built-in kafka-console-publisher the script. Upon arrival in the Flink mechanism, no data transformation occurs for each message. Instead, an additional string is simply appended to each message and printed for verification, ensuring that messages are continuously transmitted to Flink.

package com.dataview.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dataview.flink.util.IKafkaConstants;


public class readFromKafkaTopic 
	public static void main(String[] args) throws Exception 
		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
		KafkaSource<String> source = KafkaSource.<String>builder()
			    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
			    .setTopics(IKafkaConstants.FIRST_TOPIC_NAME)
			    .setGroupId(IKafkaConstants.GROUP_ID_CONFIG)
			    .setStartingOffsets(OffsetsInitializer.earliest())
			    .setValueOnlyDeserializer(new SimpleStringSchema())
			    .build();
		DataStream<String> messageStream = see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
		messageStream.rebalance().map(new MapFunction<String, String>() 
			private static final long serialVersionUID = -6867736771747690202L;

			@Override
			public String map(String value) throws Exception 
				return "Kafka and Flink says: " + value;
			
		).print();

		see.execute();
	


The entire execution was recorded on the screen. If you are interested, you can look below:<

I hope you enjoyed reading this. Stay tuned for another upcoming article where I’ll explain how to transfer messages/data from Flink to a Kafka theme.

Source link

Leave a Reply

Your email address will not be published. Required fields are marked *