Spring Boot | How to consume JSON messages using Apache Kafka

Apache Kafka is a stream processing system which lets you send messages between processes, applications, and servers. In this article, we will see how to publish JSON messages on the console of a Spring boot application using Apache Kafka.
In order to learn how to create a Spring boot project, refer to this article.
Working Steps:Â
Â
- Go to Spring initializer and create a starter project with following dependency:Â
- Spring for Apache Kafka
- Open the project in an IDE and sync the dependencies. In this article, we would be creating a student model where we would be posting the student details. Therefore, create a model class Student. Add data members and create constructor and override the toString method to see the messages in JSON format. The following is the implementation of the student class:
Â
Student Model
// Java program to implement a// student classÂ
// Creating a student classpublic class Student {Â
    // Data members of the class    int id;    String firstName;    String lastName;Â
    // Constructor of the student    // class    public Student()    {    }Â
    // Parameterized constructor of    // the student class    public Student(int id, String firstName,                   String lastName)    {        this.id = id;        this.firstName = firstName;        this.lastName = lastName;    }Â
    @Override    public String toString()    {        return "Student{"            + "id = " + id            + ", firstName = '" + firstName + "'"            + ", lastName = '" + lastName + "'"            + "}";    }} |
- Â
- Create a new class Config and add annotations @Configuration and @EnableKafka. Now create beans ConsumerFactory and ConcurrentKafkaListenerContainerFactory with Student class object.
Â
Config clas
@EnableKafka@Configurationpublic class Config {Â
    // Function to establish a connection    // between Spring application    // and Kafka server    @Bean    public ConsumerFactory<String, Student>    studentConsumer()    {Â
        // HashMap to store the configurations        Map<String, Object> map            = new HashMap<>();Â
        // put the host IP in the map        map.put(ConsumerConfig                    .BOOTSTRAP_SERVERS_CONFIG,                "127.0.0.1:9092");Â
        // put the group ID of consumer in the map        map.put(ConsumerConfig                    .GROUP_ID_CONFIG,                "id");        map.put(ConsumerConfig                    .KEY_DESERIALIZER_CLASS_CONFIG,                StringDeserializer.class);        map.put(ConsumerConfig                    .VALUE_DESERIALIZER_CLASS_CONFIG,                JsonDeserializer.class);Â
        // return message in JSON formate        return new DefaultKafkaConsumerFactory<>(            map, new StringDeserializer(),            new JsonDeserializer<>(Student.class));    }Â
    @Bean    public ConcurrentKafkaListenerContainerFactory<String,                                                   Student>    studentListner()    {        ConcurrentKafkaListenerContainerFactory<String,                                                Student>            factory            = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(studentConsumer());        return factory;    }} |
- Â
- Create a class KafkaService with @Service annotation. This class will contain the listener method to publish the message on the console.
Â
KafkaService Class
@Servicepublic class KafkaService {Â
    // Annotation required to listen    // the message from Kafka server    @KafkaListener(topics = "JsonTopic",                   groupId = "id", containerFactory                                   = "studentListner")    public void    publish(Student student)    {        System.out.println("New Entry: "                           + student);    }} |
- Â
- Start zookeeper and Kafka server. Now we need to create a new topic with the name JsonTopic. To do so, open a new command prompt window and change directory to the Kafka directory.
- Now create a new topic using the command given below:
Â
bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for mac and linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for windowsÂ
Â
- Â
- Now to run Kafka producer console, use the command below:
Â
bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // for mac and linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // for windowsÂ
Â
- Â
- Run the application and and type message on Kafka producer and press enter.



