Create a Kafka Pipeline using Java Application | Apache Kafka

Introduction

This article shows how to program an Apache Kafka producer and consumer in Java. As we’ll see, Java lets us reproduce what the CLI does, and more.

Prerequisites

Creating Kafka project

Note: This article assumes that your editor is “IntelliJ IDEA”.

  1. Open IntelliJ IDEA.
  2. Create a new “Maven” project with your preferred project name.
  3. Open the “pom.xml” file in your project, which lists all the dependencies your code uses.
  4. Add the Kafka dependencies inside the <dependencies> </dependencies> tag. We need two: the Kafka client dependency and a logging dependency.
    a. Go to https://mvnrepository.com/ and search for “apache kafka” to find the “kafka-clients” dependency. Open it, select the latest version, and copy the Maven dependency snippet into the dependencies tag.
    b. On the same website, search for “slf4j simple”, repeat the previous step, and add the copied dependency to the dependencies tag as well.

The dependencies tag should then look like this:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

5. In the IntelliJ toolbar, click View -> Tool Windows -> Maven, then click the refresh button to load the dependencies.

6. Once the dependencies have loaded successfully, we can start coding!

Creating Java producer

  1. Create your producer class with a main method.
  2. These are all the imports we will need:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

3. In the main method, create a Properties object to hold the producer configuration:

Properties properties = new Properties();

You can find all the producer properties in the Kafka documentation.

4. Configure the required properties as shown:

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  • bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Here it points to the Kafka broker on port “9092”. This is the broker's default port, not ZooKeeper's, which defaults to 2181.
  • key.serializer: Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
  • value.serializer: Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
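
To make the role of the serializers more concrete, the following minimal sketch shows what a string serializer conceptually does: it turns a String into bytes before the record is sent, and a deserializer reverses that on the consumer side. It uses only the Java standard library and assumes UTF-8 encoding, which is the default for Kafka's StringSerializer. The class and method names are hypothetical and not part of the Kafka API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: illustrates the String <-> bytes
// conversion that a string serializer/deserializer pair performs.
final class StringCodecSketch {

    // Roughly what a string serializer does: String -> UTF-8 bytes (null stays null).
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what a string deserializer does: UTF-8 bytes -> String (null stays null).
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Hello World!");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

Keys and values are serialized independently, which is why the producer needs both a key.serializer and a value.serializer even when, as in this article, the records carry no key.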

5. Create the producer with the properties we initialized:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

6. Create the record to send, passing the name of the topic we have already created:

ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "Hello World!");

Note: Here we send a “Hello World!” string to our topic named “first_topic”.

7. Now, send the record using the producer:

producer.send(record);

send() is asynchronous, so to wait until all data has actually been produced we call the “flush” method:

producer.flush();

Finally, close the producer:

producer.close();
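
The producer steps above can be combined into one small program. The sketch below is one possible arrangement, not the only one. It assumes the kafka-clients dependency added earlier, a broker listening on 127.0.0.1:9092, and an existing topic named first_topic. The class name ProducerSketch and the helper sendOne are hypothetical. It passes a callback to send() so that a failed delivery is reported, and it uses try-with-resources so the producer is closed even if an exception is thrown.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

// Hypothetical class name; a sketch of the producer steps in one place.
class ProducerSketch {

    // Sends one value to the topic and blocks until the producer has flushed it.
    static void sendOne(Producer<String, String> producer, String topic, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        // The callback runs once the send has either succeeded or failed.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                System.out.println("Sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });
        producer.flush();
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Assumption: a local broker on the default port
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources calls producer.close() automatically
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            sendOne(producer, "first_topic", "Hello World!");
        }
    }
}
```

Because sendOne accepts the Producer interface rather than KafkaProducer, it can also be exercised without a running broker, for example with the MockProducer class that ships with kafka-clients.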

Creating Java consumer

  1. Create your consumer class with a main method.
  2. These are all the imports we will need:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

3. In the main method, create a Properties object to hold the consumer configuration:

Properties properties = new Properties();

You can find all the consumer properties in the Kafka documentation.

4. Configure the required properties as shown:

properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  • bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Here it points to the Kafka broker on port “9092”. This is the broker's default port, not ZooKeeper's, which defaults to 2181.
  • key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.
  • value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.
  • group.id: A unique string that identifies the consumer group this consumer belongs to.
  • auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
    • earliest: automatically reset the offset to the earliest offset
    • latest: automatically reset the offset to the latest offset
    • none: throw exception to the consumer if no previous offset is found for the consumer’s group
    • anything else: throw exception to the consumer.
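
As a rough illustration of the auto.offset.reset rules listed above, the sketch below builds the same consumer settings with plain string keys and rejects any reset value other than earliest, latest, or none before a consumer is ever created. It uses only java.util. The class ConsumerPropsSketch and its build method are hypothetical helpers rather than Kafka API, and Kafka still performs its own validation of this setting when the consumer is constructed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds consumer settings and
// checks the reset policy early.
final class ConsumerPropsSketch {

    // The three values described in the list above
    private static final List<String> RESET_POLICIES = Arrays.asList("earliest", "latest", "none");

    static Properties build(String bootstrapServers, String groupId, String offsetReset) {
        if (!RESET_POLICIES.contains(offsetReset)) {
            throw new IllegalArgumentException(
                    "auto.offset.reset must be one of " + RESET_POLICIES + ", got: " + offsetReset);
        }
        // Fully qualified name of the deserializer class imported in the consumer
        String deserializer = "org.apache.kafka.common.serialization.StringDeserializer";

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.deserializer", deserializer);
        properties.setProperty("value.deserializer", deserializer);
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", offsetReset);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("127.0.0.1:9092", "my-fourth-application", "earliest");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The string keys used here are the same names that the ConsumerConfig constants resolve to, so the resulting Properties object could be passed to a KafkaConsumer in place of the one built in step 4.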

5. Now, create the consumer with the properties we initialized:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

6. Subscribe the consumer to our topic:

consumer.subscribe(Collections.singleton("first_topic"));

7. To read data in real time, we use a simple while loop that calls the poll method. Each call returns a batch of records, and we print each record's value with the logger:

// Create the logger once, before the loop; the name passed here is only an example.
Logger logger = LoggerFactory.getLogger("consumer");

while (true) {
    // Wait up to 100 ms for new records
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        logger.info(" value: " + record.value());
    }
}

8. Now run the consumer class and do a simple test by producing some data:

  1. Start your ZooKeeper and Kafka servers from the CLI.
  2. Run your consumer Java class so it starts consuming new messages.
  3. Run your producer Java class to produce the “Hello World!” message.
  4. Go back to the consumer; you will find the produced message in its output.
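
The consumer's polling loop runs until the process is killed, so the consumer is never closed. One common way to stop it cleanly is sketched below, under the same assumptions as before (kafka-clients on the classpath, a broker on 127.0.0.1:9092, a topic named first_topic). The class ConsumerShutdownSketch and the method pollUntilWoken are hypothetical names. A shutdown hook calls wakeup(), which makes the blocked poll() throw a WakeupException, and the finally block then closes the consumer.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Hypothetical class name; a sketch of a poll loop that can be stopped cleanly.
class ConsumerShutdownSketch {

    // Polls until another thread calls consumer.wakeup(); returns the number of records seen.
    static long pollUntilWoken(Consumer<String, String> consumer, String topic) {
        long seen = 0;
        try {
            consumer.subscribe(Collections.singleton(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("value: " + record.value());
                    seen++;
                }
            }
        } catch (WakeupException e) {
            // Expected: wakeup() was called to interrupt poll()
        } finally {
            consumer.close();
        }
        return seen;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        Thread mainThread = Thread.currentThread();

        // On Ctrl+C, interrupt poll() and wait for the main thread to close the consumer.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        pollUntilWoken(consumer, "first_topic");
    }
}
```

wakeup() is used here because it is the one consumer method that is safe to call from a different thread than the one polling.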

You can replace the example data here with data from a database table, a file, or any other stream.
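
For instance, to use a text file as the data source, each non-empty line could become one message value. The sketch below shows only the reading side and uses just the Java standard library. The class FileSourceSketch, the default file name messages.txt, and the sink parameter are all hypothetical. In a real pipeline the sink would be something like value -> producer.send(new ProducerRecord<>("first_topic", value)).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

// Hypothetical helper: turns the lines of a text file into message values.
final class FileSourceSketch {

    // Passes every non-empty, trimmed line to the sink and returns how many were passed on.
    static int forwardLines(Path file, Consumer<String> sink) throws IOException {
        int forwarded = 0;
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String value = line.trim();
            if (!value.isEmpty()) {
                sink.accept(value);
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the file path comes from the first argument, else messages.txt
        Path file = Paths.get(args.length > 0 ? args[0] : "messages.txt");
        // Placeholder sink that only prints; swap in a producer.send(...) call to publish
        int count = forwardLines(file, value -> System.out.println("would send: " + value));
        System.out.println(count + " lines forwarded");
    }
}
```

Files.readAllLines loads the whole file into memory, which keeps the sketch short but is only suitable for small files.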

Mohamed Tarek

Mohamed Tarek is a Data Engineer with a great passion for data and all the technologies around the data ecosystem. He specializes in data integration and data streaming technologies and is part of a development team that delivers end-to-end solutions including data integration, data model designs, and data analytics.
