SpringBoot RabbitMQ Topic Exchange

In the past post, we had introduced about RabbitMQ Publish/Subcribe pattern with fanout exchange. Today, JavaSampleApproach will show you how to work with SpringBoot RabbitMQ Topic Exchange.

Related posts:
RabbitMQ – How to create Spring RabbitMQ Producer/Consumer applications with SpringBoot
RabbitMQ – How to send/receive Java object messages with Spring RabbitMq | SpringBoot
RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot
SpringBoot RabbitMq Headers Exchange
SpringBoot RabbitMq Exchange to Exchange Topology

I. Technologies

– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMQ

II. RabbitMq Topic Exchange

routing_key of messages sent to a topic exchange must be a list of words, delimited by dots, example:

#.error
*.prod.*
sys.#

Note:
* (star) must be an exactly one word.
# (hash) can be zero or more words.

springboot rabbitmq topic - architecture

With the above topic exchange design,
– when we send a message with routing key: sys.dev.info, it will just be delivered to Q1.
– when we send a message with routing key: app.prod.error, it will just be delivered to Q2.
– when we send a message with routing key: sys.test.error, it will be delivered to both queues {Q1, Q2}.

Topic exchange is strong tool and it can act as other exchanges as below:

– When a queue is bound with “#” (hash) binding key – it is as an fanout exchange.
– When don’t use * & # in bindings, it will behave as a direct exchange.

III. Practices

In the tutorial, we create 2 SpringBoot project as below:

springboot rabbitmq topic -project structure

Step to do:
– Create SpringBoot projects
– Define data model
– Implement RabbitMq Producer
– Implement RabbitMq consumer
– Run and check results

1. Create SpringBoot projects

Using SpringToolSuite, create 2 SpringBoot projects, then add need dependency spring-boot-starter-amqp:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Define data model

Create Log data model for both projects:


package com.javasampleapproach.rabbitmq.model;

public class Log {
	private String content;
	private String routingKey;
	
	public Log(){};
	
	public Log(String content, String routingKey){
		this.content = content;
		this.routingKey = routingKey;
	}
	
	public String getContent(){
		return this.content;
	}
	
	public void setContent(String content){
		this.content = content;
	}
	
	public String getRoutingKey(){
		return this.routingKey;
	}
	
	public void setRoutingKey(String routingKey){
		this.routingKey = routingKey;
	}
	
	@Override
	public String toString() {
		return String.format("{content = %s, routingKey = %s}", content, routingKey);
	}
}

3. Implement RabbitMq producer

3.1 Configure RabbitMq producer


package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
	
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
jsa.rabbitmq.queue=jsa.queue
jsa.rabbitmq.routingkey=jsa.routingkey

3.2 Implement RabbitMq producer


package com.javasampleapproach.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.javasampleapproach.rabbitmq.model.Log;

@Component
public class Producer {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	@Value("${jsa.rabbitmq.exchange}")
	private String exchange;
	
	public void produce(Log logs){
		String routingKey = logs.getRoutingKey();
		amqpTemplate.convertAndSend(exchange, routingKey, logs);
		System.out.println("Send msg = " + logs);
	}
}

3.3 Implement RabbitMqProducer client


package com.javasampleapproach.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.model.Log;
import com.javasampleapproach.rabbitmq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqProducerApplication  implements CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
	}
	
	@Autowired
	Producer producer;

	@Override
	public void run(String... args) throws Exception {
		
		/**
		 *  1
		 */
		String content = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
		String routingKey = "sys.dev.info";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
		
		/**
		 *  2
		 */
		content = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
		routingKey = "sys.test.error";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
		
		/**
		 *  3
		 */
		content = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
		routingKey = "app.prod.error";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
	}
}

4. Implement RabbitMq consumer

4.1 Configure RabbitMq consumer


package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
	
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.logs.sys
#jsa.rabbitmq.queue=jsa.logs.prod.error

4.2 Implement consumer


package com.javasampleapproach.rabbitmq.consumer;

import org.apache.commons.logging.Log;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
	
	@RabbitListener(queues="${jsa.rabbitmq.queue}", containerFactory="jsaFactory")
    public void recievedMessage(Log logs) {
        System.out.println("Recieved Message: " + logs);
    }
}

5. Run and check results

5.1 Setup RabbitMq exchange, queues

Enable rabbitmq_management by cmd: rabbitmq-plugins enable rabbitmq_management --online
Then go to: http://localhost:15672 -> login with user/password: guest/guest

springboot rabbitmq topic - connection

Add exchange:
Go to http://localhost:15672/#/exchanges, add exchange: jsa.exchange.logs

springboot rabbitmq topic - create exchange

Add queue:
Go to http://localhost:15672/#/queues, add 2 queues: jsa.logs.sys, jsa.logs.prod.error.

springboot rabbitmq topic - 2 queue

– Binding the queues with above exchange:

springboot rabbitmq topic - exchange bindling

5.2 Run & check results

– Run SpringBootRabbitMqProducer with commandline mvn spring-boot:run,

-> Console’s logs:


Send msg = {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Send msg = {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Send msg = {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

-> See queues’ status:

springboot rabbitmq topic - message queue after sending

– Run SpringBootRabbitMqConsumer which listen to jsa.logs.sys queue with configuration: jsa.rabbitmq.queue=jsa.logs.sys:

-> Console’s logs:


Recieved Message: {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}

-> See queues’s status:

springboot rabbitmq topic - aftern consume in jsa.logs.sys

– Run SpringBootRabbitMqConsumer which listen to jsa.logs.prod.error queue with configuration: jsa.rabbitmq.queue=jsa.logs.prod.error:

-> Console’s logs:


Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Recieved Message: {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

-> See queues’s status:

IV. Sourcecode

SpringRabbitMqProducer
SpringRabbitMqConsumer

25 thoughts on “SpringBoot RabbitMQ Topic Exchange”

  1. Whe n i try to run the code, i am gtting the following error :
    Bean [com.javasampleapproach.rabbitmq.consumer.Consumer@3b9e56db]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:848) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

    1. import com.javasampleapproach.rabbitmq.model.Log;
      instead of
      import org.apache.commons.logging.Log;
      in class Consumer{}

  2. Hi, I want to ask why the queue name of producer is “jsa.rabbitmq.queue=jsa.queue”, but queue name of consumer is “jsa.rabbitmq.queue=jsa.logs.sys”? I’m thinking that they (producer & consumer) have to use a same queue to communicate, is that correct, or they communicate by 2 separate queues?
    Thank so much,
    Lucas

  3. I have a use case where the client exposed their topic exchange and routingKeys, I just need to listen the messages and have no rights to create queues. In this example, we are binding queues to routingKeys, but in my case, I have only topic exchange and routingKeys. Please suggest how I can make the connection and get the data?

  4. 792687 475399Youre so cool! I dont suppose Ive read anything like this before. So nice to search out any individual with some original thoughts on this topic. realy thank you for starting this up. this web site is 1 thing thats wanted on the web, somebody with a bit of originality. helpful job for bringing something new towards the internet! 107523

  5. If you happen to excited about eco items, sometimes be tough shock to anyone them recognise that to help make unique baskets just for this quite liquids carry basic steps liters associated ceiling fan oil producing. dc free mommy blog giveaways family trip home gardening house power wash baby laundry detergent

  6. After study a few of the websites for your internet site now, and I genuinely as if your technique for blogging. I bookmarked it to my bookmark internet site list and will be checking back soon. Pls look at my website likewise and let me know what you think.

  7. Of course, things are balanced out by some great characterisation work and developing relationships and the movie does well to mix the drama and development in amongst the spectacle but it still feels as if perhaps it could have added a little bit more to reinforce the scale of things and to impress on a simplistically entertaining level.

  8. This is certainly my initial stop by and I really like what I’m seeing. Your weblog is so much fun to look over, quite compelling as well as informative. I’ll undoubtedly recommend it to my friends. Nevertheless, I did have some problem with the commenting. It kept giving me an problem whenever I clicked on publish comment. I hope, that can be fixed. Many thanks

  9. Hi! I know this is kinda off topic but I was wondering which blog platform are you using for this site? I’m getting tired of WordPress because I’ve had issues with hackers and I’m looking at alternatives for another platform. I would be great if you could point me in the direction of a good platform.

Leave a Reply

Your email address will not be published. Required fields are marked *