ActiveMq – How to work with Spring JMS ActiveMq Topic (Publisher-Subscribers pattern) using SpringBoot

In the previous tutorials, we learned how to create Spring JMS applications that work with an ActiveMq Queue. In this tutorial, JavaSampleApproach will guide you through creating Spring JMS applications that work with an ActiveMq Topic.

Related articles:
How to use Spring JMS with ActiveMQ – JMS Consumer and JMS Producer | Spring Boot
Spring Jms ActiveMq – How to send Java object messages to ActiveMQ server (specially with Bi-Directional relationship Java objects)
ActiveMq – Explicitly configure Spring ActiveMq ConnectionFactory with SpringBoot
How to resolve Json Infinite Recursion problem when working with Jackson


I. Spring Jms ActiveMq Topic

ActiveMq supports the Publish-Subscribe pattern (pub-sub) for building distributed JMS messaging systems.
How does it work? -> When you publish a message, every active subscriber receives a copy of it.

[Figure: Spring ActiveMq publisher-subscriber]
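To make the delivery rule concrete, here is a minimal plain-Java sketch of topic fan-out. InMemoryTopic is a hypothetical stand-in written for this illustration; it is not an ActiveMQ or JMS class, and it only models non-durable subscribers, which receive messages published while they are registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class InMemoryTopicDemo {

    /** Hypothetical in-memory topic (assumption for illustration, not a broker API). */
    static class InMemoryTopic<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Fan-out: every subscriber registered at publish time gets the message.
        void publish(T message) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryTopic<String> topic = new InMemoryTopic<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        // Only the first subscriber is active for the first two messages.
        topic.subscribe(first::add);
        topic.publish("Apple");
        topic.publish("Samsung");

        // The second subscriber joins later and misses what was already published.
        topic.subscribe(second::add);
        topic.publish("Apple");
        topic.publish("Samsung");

        System.out.println("first subscriber:  " + first);  // 4 messages
        System.out.println("second subscriber: " + second); // 2 messages
    }
}
```

A queue would instead hand each message to exactly one consumer, which is the behavior pubSubDomain switches away from.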

In a SpringBoot application, we need to enable pubSubDomain (.setPubSubDomain(true)) on 2 beans {JmsTemplate, JmsListenerContainerFactory}:

@Bean
public JmsListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    ...
    factory.setPubSubDomain(true);
    ...
    return factory;
}

@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate template = new JmsTemplate();
	...
    template.setPubSubDomain(true);
    return template;
}

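Also set spring.jms.pub-sub-domain=true in the application.properties file, so that the beans Spring Boot configures from that property (including any factory passed through DefaultJmsListenerContainerFactoryConfigurer) use topics as well.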

II. Practice

In this tutorial, we use SpringBoot to create 2 applications: Publisher and Subscriber.

Technologies
– Java 8
– Maven 3.6.1
– Spring Tool Suite: Version 3.8.4.RELEASE
– Spring Boot: 1.5.4.RELEASE
– Apache ActiveMQ 5.14.0

[Figure: Spring JMS ActiveMq Topic project structure]

Steps to do:
– Create SpringBoot projects
– Create Java message models
– Configure ConnectionFactory
– Create Jms Publisher/Subscriber
– Implement Client for Publisher
– Run and check results

1. Create SpringBoot projects

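Use SpringToolSuite to create 2 SpringBoot projects: one for the Publisher, one for the Subscriber. Then add these dependencies to both of them:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
	<groupId>org.json</groupId>
	<artifactId>json</artifactId>
</dependency>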

2. Create Java message models

Create 2 Java message models, Company & Product, with a one-to-many relationship:

– Company

package com.javasampleapproach.activemq.models;

import java.util.List;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
 
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Company.class)
public class Company {
    private String name;
 
    private List<Product> products;
	
    public Company(){
    }
    
    public Company(String name, List<Product> products){
    	this.name = name;
    	this.products = products;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setProducts(List<Product> products){
    	this.products = products;
    }
    
    public List<Product> getProducts(){
    	return this.products;
    }
 
	/**
	 * Returns a JSON view of the company and its product names.
	 */
	@Override
	public String toString(){
		JSONObject jsonInfo = new JSONObject();
		
		try {
			jsonInfo.put("name", this.name);
 
			JSONArray productArray = new JSONArray();
			if (this.products != null) {
				this.products.forEach(product -> {
					JSONObject subJson = new JSONObject();
					try {
						subJson.put("name", product.getName());
					} catch (JSONException e) {}
					
					productArray.put(subJson);
				});
			}
			jsonInfo.put("products", productArray);
		} catch (JSONException e1) {}
		return jsonInfo.toString();
	}
 
}

– Product

package com.javasampleapproach.activemq.models;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class)
public class Product {
    private String name;
    
    private Company company;
	
    public Product(){
    }
    
    public Product(String name){
    	this.name = name;
    }
    
    public Product(String name, Company company){
    	this.name = name;
    	this.company = company;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // company
    public void setCompany(Company company){
    	this.company = company;
    }
    
    public Company getCompany(){
    	return this.company;
    }
}

Note: We use the @JsonIdentityInfo annotation to avoid the Infinite Recursion problem when Jackson serializes objects with a Bi-Directional relationship.
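The idea behind identity-based serialization can be sketched in plain Java without Jackson. The classes and the naive/withIds methods below are hypothetical and written only for this illustration. They are simplified: only companies get an id here, and this is not Jackson's implementation. The first time an object is written it gets an "@id"; later occurrences are written as that id, which breaks the company -> product -> company cycle.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class IdentityRefDemo {

    // Simplified stand-ins for the tutorial's models (assumption for illustration).
    static class Company {
        final String name;
        final List<Product> products = new ArrayList<>();
        Company(String name) { this.name = name; }
    }

    static class Product {
        final String name;
        final Company company;
        Product(String name, Company company) {
            this.name = name;
            this.company = company;
            company.products.add(this); // wire both directions
        }
    }

    /** Naive writer: follows company -> product -> company and never terminates. */
    static String naive(Company c) {
        StringBuilder sb = new StringBuilder("{\"name\":\"" + c.name + "\",\"products\":[");
        for (Product p : c.products) {
            sb.append("{\"name\":\"").append(p.name)
              .append("\",\"company\":").append(naive(p.company)).append('}');
        }
        return sb.append("]}").toString();
    }

    /** Identity-aware writer: a company already seen is written as its id only. */
    static String withIds(Company c, Map<Object, Integer> seen) {
        Integer known = seen.get(c);
        if (known != null) {
            return String.valueOf(known); // back-reference instead of recursion
        }
        int id = seen.size() + 1;
        seen.put(c, id);
        StringBuilder sb = new StringBuilder();
        sb.append("{\"@id\":").append(id)
          .append(",\"name\":\"").append(c.name).append("\",\"products\":[");
        for (int i = 0; i < c.products.size(); i++) {
            Product p = c.products.get(i);
            if (i > 0) sb.append(',');
            sb.append("{\"name\":\"").append(p.name)
              .append("\",\"company\":").append(withIds(p.company, seen)).append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        Company apple = new Company("Apple");
        new Product("Iphone 7", apple);
        new Product("IPadPro", apple);

        System.out.println(withIds(apple, new IdentityHashMap<>()));
        try {
            naive(apple);
        } catch (StackOverflowError e) {
            System.out.println("naive writer: StackOverflowError");
        }
    }
}
```

An IdentityHashMap is used so that objects are tracked by reference rather than by equals(), which matches the notion of "the same object appearing again".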

3. Configure ConnectionFactory

We create ConnectionFactoryConfig for both Publisher and Subscriber.

package com.javasampleapproach.activemq.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
@EnableJms
public class ConnectionFactoryConfig {
	@Value("${jsa.activemq.broker.url}")
	String brokerUrl;
	
	@Value("${jsa.activemq.borker.username}")
	String userName;
	
	@Value("${jsa.activemq.borker.password}")
	String password;

	/*
	 * Initial ConnectionFactory
	 */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    
	@Bean // Serialize message content to json using TextMessage
	public MessageConverter jacksonJmsMessageConverter() {
	    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
	    converter.setTargetType(MessageType.TEXT);
	    converter.setTypeIdPropertyName("_type");
	    return converter;
	}
    
	...
}

We use the jacksonJmsMessageConverter bean to serialize Java object messages to JSON.
>>> See more at: How to send Java object messages to ActiveMQ server.
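The @Value placeholders in ConnectionFactoryConfig need matching keys in each application's application.properties. The fragment below is a sketch under assumptions: the broker URL is ActiveMQ's default OpenWire address on a local machine, the admin/admin credentials are placeholder values to replace with your broker's own, and jsa-topic is the topic name used in the run-through. The key names (including the "borker" spelling) are copied verbatim from the @Value annotations so that they resolve.

```properties
# Broker connection (assumed: local broker on the default port)
jsa.activemq.broker.url=tcp://localhost:61616
# Credentials (assumed placeholder values)
jsa.activemq.borker.username=admin
jsa.activemq.borker.password=admin
# Topic shared by the Publisher and the Subscriber
jsa.activemq.topic=jsa-topic
# Make Spring Boot's auto-configured JMS beans use topics
spring.jms.pub-sub-domain=true
```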

In the Publisher application, we configure an additional jmsTemplate bean in the ConnectionFactoryConfig file as below:

@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setMessageConverter(jacksonJmsMessageConverter());
    template.setPubSubDomain(true);
    return template;
}

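In the Subscriber application, we configure an additional jsaFactory bean in the ConnectionFactoryConfig file as below:

// Additional imports needed in ConnectionFactoryConfig:
// org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
// org.springframework.jms.config.DefaultJmsListenerContainerFactory
// org.springframework.jms.config.JmsListenerContainerFactory
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // Apply Spring Boot's defaults first: configure() also copies spring.jms.pub-sub-domain
    // from application.properties, so anything set before this call could be overwritten.
    configurer.configure(factory, connectionFactory);
    factory.setMessageConverter(jacksonJmsMessageConverter());
    factory.setPubSubDomain(true);
    return factory;
}

4. Create Jms Publisher/Subscriber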

In the Publisher application, create a JmsPublisher component as below:

package com.javasampleapproach.activemq.jms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.javasampleapproach.activemq.models.Company;

@Component
public class JmsPublisher {
	@Autowired
	JmsTemplate jmsTemplate;
	
	@Value("${jsa.activemq.topic}")
	String topic;
	
	// Converts the Company to a JSON TextMessage and publishes it to the topic
	public void send(Company company){
		jmsTemplate.convertAndSend(topic, company);
	}
}

In the Subscriber application, create a JmsSubcriber component as below:

package com.javasampleapproach.activemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import com.javasampleapproach.activemq.models.Company;


@Component
public class JmsSubcriber {
	
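	// Name the factory explicitly; otherwise the listener uses the default
	// "jmsListenerContainerFactory" bean instead of jsaFactory.
	@JmsListener(destination = "${jsa.activemq.topic}", containerFactory = "jsaFactory")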
	public void receive(Company msg){
		System.out.println("Recieved Message: " + msg);
	}
}

5. Implement Client for Publisher

In the main class SpringActiveMqTopicProducerApplication, we implement the CommandLineRunner interface to:
– Initialize 2 Company object messages {apple, samsung}
– Send them to the ActiveMQ Topic.

package com.javasampleapproach.activemq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.activemq.jms.JmsPublisher;
import com.javasampleapproach.activemq.models.Company;
import com.javasampleapproach.activemq.models.Product;

@SpringBootApplication
public class SpringActiveMqTopicProducerApplication implements CommandLineRunner {

	@Autowired
	JmsPublisher publisher;
	
	public static void main(String[] args) {
		SpringApplication.run(SpringActiveMqTopicProducerApplication.class, args);
	}
	
	@Override
	public void run(String... args) throws Exception {
		/*
		 * Apple company & products
		 */
		// initial company and products 
		Product iphone7 = new Product("Iphone 7");
		Product iPadPro = new Product("IPadPro");
		
		List<Product> appleProducts = new ArrayList<>(Arrays.asList(iphone7, iPadPro));
		
		Company apple = new Company("Apple", appleProducts);
		
		iphone7.setCompany(apple);
		iPadPro.setCompany(apple);
		
		// send message to ActiveMQ
		publisher.send(apple);
        
        /*
         * Samsung company and products
         */
		Product galaxyS8 = new Product("Galaxy S8");
		Product gearS3 = new Product("Gear S3");
		
		List<Product> samsungProducts = new ArrayList<>(Arrays.asList(galaxyS8, gearS3));
		
		Company samsung = new Company("Samsung", samsungProducts);
		
		galaxyS8.setCompany(samsung);
		gearS3.setCompany(samsung);
		
        /*
         * send message to ActiveMQ
         */
		publisher.send(samsung);
	}
}

6. Run and check results

– Start the ActiveMQ server from the command line: C:\apache-activemq-5.14.5>.\bin\activemq start
– Build and run the SpringBoot applications from the command line {mvn clean install, mvn spring-boot:run} in the following order:

6.1 Enable one active subscriber

– Start an instance of the SpringBoot Subscriber on the ActiveMQ topic: jsa-topic
– Then start an instance of the SpringBoot Publisher.

-> We receive 2 Company messages {apple, samsung}. See the subscriber’s console logs:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}

6.2 Enable two active subscribers

Next, we start another instance of the SpringBoot Subscriber on the ActiveMQ topic: jsa-topic.
-> Now there are 2 active subscribers on the jsa-topic topic.

Start the SpringBoot Publisher application again to send messages.

-> Results: each Subscriber receives 2 new Company messages.
So the first Subscriber has received 4 Company messages in total:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}
Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}

And the second (newer) Subscriber has received only 2 Company messages, because it was not yet subscribed when the first two were published:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}

The Spring Jms applications (Publisher – Subscribers) are working fine with the ActiveMq server!

III. Source code

SpringActiveMqTopicPublisher
SpringActiveMqTopicSubcriber

4 thoughts on “ActiveMq – How to work with Spring JMS ActiveMq Topic (Publisher-Subscribers pattern) using SpringBoot”

  1. You can get into trouble with configurer.configure(factory, connectionFactory), because it applies spring.jms.pub-sub-domain from application.properties to the factory. If spring.jms.pub-sub-domain=false, for example, the default configurer resets any pubSubDomain value set before the call to false, and the listener is then unable to consume from the topic.
