Spring for Apache Kafka (spring-kafka) provides a high-level abstraction for Kafka-based messaging solutions. In the previous post, we developed a Spring Kafka application using the auto-configuration supported by SpringBoot (from version 1.5). But what if we need to configure the Kafka factories (Kafka Producer and Kafka Consumer) explicitly for development? In this tutorial, JavaSampleApproach introduces an alternative solution: manually configuring the Kafka factories to build a Spring Kafka application.
Related Articles:
– How to start Apache Kafka
– How to start Spring Apache Kafka Application with SpringBoot Auto-Configuration
– How to use Spring Kafka JsonSerializer(JsonDeserializer) to produce/consume Java Object messages
– How to create Spring RabbitMQ Producer/Consumer application with SpringBoot
I. Technologies
– Java 8
– Maven build
– Spring Boot
– Spring Kafka
– Apache Kafka
– Spring Tool Suite editor
II. Overview
We will explicitly implement a ProducerFactory and ConsumerFactory with customized properties:
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    ...
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    ...
    return new DefaultKafkaConsumerFactory<>(props);
}
Then use the ProducerFactory to build a KafkaTemplate, and the ConsumerFactory to build a ConcurrentKafkaListenerContainerFactory, which will later handle methods annotated with @KafkaListener:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
...
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
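For orientation, here is a minimal sketch of the kind of listener such a container factory drives (DemoListener and the topic name are illustrative only; the actual KafkaConsumer service is built in section III.3):

package com.javasampleapproach.apachekafka.services;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// hypothetical listener: the container created by kafkaListenerContainerFactory()
// invokes this method for every record polled from the topic
@Component
public class DemoListener {

    @KafkaListener(topics = "jsa-kafka-topic")
    public void listen(String message) {
        System.out.println("received: " + message);
    }
}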
III. Practice
We create a SpringBoot project with 2 main services: KafkaProducer and KafkaConsumer, for sending messages to and receiving messages from an Apache Kafka cluster.
We also export 2 RestAPIs {‘/producer’, ‘/consumer’} for interaction.
Steps to do:
– Create a SpringBoot project
– Create Kafka Factories (ProducerFactory & ConsumerFactory)
– Create Services (Producer and Consumer)
– Export some RestAPIs
– Deployment
1. Create a SpringBoot project
Use SpringToolSuite to create a SpringBoot project, then add dependencies {spring-kafka, spring-boot-starter-web}:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
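The project also needs a standard SpringBoot entry point. A minimal sketch is shown below (the class name ApacheKafkaApplication is an assumption; any @SpringBootApplication main class generated by Spring Tool Suite works the same way):

package com.javasampleapproach.apachekafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ApacheKafkaApplication {

    // bootstraps the Spring context, including the Kafka configuration defined in the next step
    public static void main(String[] args) {
        SpringApplication.run(ApacheKafkaApplication.class, args);
    }
}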
2. Create Kafka Factories (ProducerFactory & ConsumerFactory)
Open application.properties and add the Kafka configuration:
jsa.kafka.bootstrap-servers=localhost:9092
jsa.kafka.consumer.group-id=jsa-group
jsa.kafka.topic=jsa-kafka-topic
– jsa.kafka.bootstrap-servers is used to indicate the Kafka cluster address.
– jsa.kafka.consumer.group-id is used to indicate the consumer group id.
– jsa.kafka.topic is used to define the Kafka topic name to produce and receive messages on.
2.1 Create ProducerFactory and KafkaTemplate
package com.javasampleapproach.apachekafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${jsa.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
2.2 Create ConsumerFactory and KafkaListenerContainerFactory
package com.javasampleapproach.apachekafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${jsa.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${jsa.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@EnableKafka is used to enable detection of the @KafkaListener annotation.
3. Create Services (Producer and Consumer)
– Create a KafkaProducer service:
package com.javasampleapproach.apachekafka.services;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${jsa.kafka.topic}")
    String kafkaTopic = "jsa-test";

    public void send(String data) {
        log.info("sending data='{}'", data);
        kafkaTemplate.send(kafkaTopic, data);
    }
}
– Create a KafkaConsumer service:
package com.javasampleapproach.apachekafka.services;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.javasampleapproach.apachekafka.storage.MessageStorage;

@Component
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired
    MessageStorage storage;

    @KafkaListener(topics = "${jsa.kafka.topic}")
    public void processMessage(String content) {
        log.info("received content = '{}'", content);
        storage.put(content);
    }
}
MessageStorage is an additional component that buffers Kafka messages after they are received. See the implementation of MessageStorage below:
package com.javasampleapproach.apachekafka.storage;

import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

@Component
public class MessageStorage {
    private List<String> storage = new ArrayList<>();

    public void put(String message) {
        storage.add(message);
    }

    public String toString() {
        StringBuffer info = new StringBuffer();
        storage.forEach(msg -> info.append(msg).append("\n"));
        return info.toString();
    }

    public void clear() {
        storage.clear();
    }
}
4. Export some RestAPIs
Create a Web Controller that exports the 2 RestAPIs {‘/producer’, ‘/consumer’}:
package com.javasampleapproach.apachekafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.javasampleapproach.apachekafka.services.KafkaProducer;
import com.javasampleapproach.apachekafka.storage.MessageStorage;

@RestController
@RequestMapping(value = "/jsa/kafka")
public class WebRestController {

    @Autowired
    KafkaProducer producer;

    @Autowired
    MessageStorage storage;

    @GetMapping(value = "/producer")
    public String producer(@RequestParam("data") String data) {
        producer.send(data);
        return "Done";
    }

    @GetMapping(value = "/consumer")
    public String getAllRecievedMessage() {
        String messages = storage.toString();
        storage.clear();
        return messages;
    }
}
– /producer is used to send messages from the browser to the KafkaProducer service.
– /consumer is used to get all received messages that are buffered in MessageStorage.
5. Deployment
Start Apache Kafka Cluster:
– Start ZooKeeper:
C:\kafka_2.12-0.10.2.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
– Start the Apache Kafka server:
.\bin\windows\kafka-server-start.bat .\config\server.properties
>>> More details at: How to start Apache Kafka
– Build and run the SpringBoot project with the command lines:
mvn clean install
mvn spring-boot:run
– Make a producer request: http://localhost:8080/jsa/kafka/producer?data=Hello World
-> Logs:
...
2017-06-08 13:49:47.111 INFO 12240 --- [io-8080-exec-10] c.j.apachekafka.services.KafkaProducer : sending data='Hello World'
...
2017-06-08 13:49:47.248 INFO 12240 --- [ntainer#0-0-L-1] c.j.apachekafka.services.KafkaConsumer : received content = 'Hello World'
– Make another producer request: http://localhost:8080/jsa/kafka/producer?data=This is a SpringBoot Kafka Application
-> Logs:
2017-06-08 13:51:34.909 INFO 12240 --- [nio-8080-exec-7] c.j.apachekafka.services.KafkaProducer : sending data='This is a SpringBoot Kafka Application'
2017-06-08 13:51:34.913 INFO 12240 --- [ntainer#0-0-L-1] c.j.apachekafka.services.KafkaConsumer : received content = 'This is a SpringBoot Kafka Application'
– Make a consumer request: http://localhost:8080/jsa/kafka/consumer, result: