Java 9 Flow API example – Publisher and Subscriber


The previous post covered Reactive Streams in general and the components and behaviour of the Java 9 Flow API. This tutorial walks through an example that implements a Publisher and a Subscriber for reactive programming.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Processor
Java 9 Flow SubmissionPublisher – A Concrete Publisher

I. Technologies

– Java 9
– Eclipse Oxygen (4.7) with Java 9 support

II. Project Overview


– We will create a Publisher with two Subscribers.
– The Publisher maintains a list of Subscriptions, one for each Subscriber.
– The Publisher uses a Subscription to push items to its Subscriber through the Subscriber::onNext() method.
– A Subscriber uses its Subscription to request items from the Publisher through the Subscription::request() method.
– The Publisher defines an Executor for multi-threading, so request() returns without waiting for delivery and onNext() runs on pool threads. Items are therefore produced for each Subscriber asynchronously.
– After receiving all the items it requested, a Subscriber either requests more data or cancels its Subscription, chosen at random.
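
The request()/onNext() handshake in the overview above can be tried in isolation before writing a custom Publisher. The following is a minimal sketch, not the implementation used in this tutorial: it assumes the JDK's SubmissionPublisher as a ready-made Publisher, and the names HandshakeSketch and publishAndCollect are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

class HandshakeSketch {

    // Hypothetical helper: publishes the given items and returns what one subscriber received.
    static List<Integer> publishAndCollect(List<Integer> items) throws InterruptedException {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Subscriber<Integer>() {
                private Subscription subscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for the first item
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ask for the next item only after handling this one
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            // Items are buffered by the publisher and handed over as demand arrives.
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items have been delivered

        done.await(); // delivery happens on another thread, so wait for the terminal signal
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndCollect(List.of(1, 2, 3)));
    }
}
```

The subscriber here never holds more than one outstanding request, which is the simplest form of back-pressure. The Subscriber built later in this tutorial requests items in batches instead.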

III. Practice

To understand how Publisher, Subscriber and Subscription behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams

1. Create implementation of Publisher


package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPublisher implements Publisher<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

	final ExecutorService executor = Executors.newFixedThreadPool(4);
	private final List<MySubscription> subscriptions = Collections.synchronizedList(new ArrayList<>());

	private final CompletableFuture<Void> terminated = new CompletableFuture<>();

	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) {
		MySubscription subscription = new MySubscription(subscriber, executor);

		subscriptions.add(subscription);

		subscriber.onSubscribe(subscription);
	}

	public void waitUntilTerminated() throws InterruptedException {
		try {
			terminated.get();
		} catch (ExecutionException e) {
			System.out.println(e);
		}
	}

	private class MySubscription implements Subscription {

		private final ExecutorService executor;

		private final Subscriber<? super Integer> subscriber;
		private final AtomicInteger value;
		private final AtomicBoolean isCanceled;

		public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
			this.subscriber = subscriber;
			this.executor = executor;

			value = new AtomicInteger();
			isCanceled = new AtomicBoolean(false);
		}

		@Override
		public void request(long n) {
			if (isCanceled.get())
				return;

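			// Flow.Subscription.request: a non-positive demand (zero included) is an error
			if (n <= 0)
				executor.execute(() -> subscriber.onError(new IllegalArgumentException("non-positive request: " + n)));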
			else
				publishItems(n);
		}

		@Override
		public void cancel() {
			isCanceled.set(true);

			synchronized (subscriptions) {
				subscriptions.remove(this);
				if (subscriptions.size() == 0)
					shutdown();
			}
		}

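		private void publishItems(long n) {
			// long counter: an int would overflow for demands above Integer.MAX_VALUE
			for (long i = 0; i < n; i++) {

				executor.execute(() -> {
					// skip items whose subscription was cancelled after they were scheduled
					if (isCanceled.get())
						return;

					int v = value.incrementAndGet();
					log("publish item: [" + v + "] ...");
					subscriber.onNext(v);
				});
			}
		}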

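		private void shutdown() {
			log("Shut down executor...");
			executor.shutdown();
			// signal termination from a separate thread, then shut that executor down
			// as well so its idle worker thread does not delay JVM exit
			ExecutorService notifier = newSingleThreadExecutor();
			notifier.submit(() -> {

				log("Shutdown complete.");
				terminated.complete(null);
			});
			notifier.shutdown();
		}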

	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}
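
The demand check inside request() is easier to follow without the Executor. The following is a simplified, single-threaded sketch of that rule, not the MySubscription class above: a non-positive request produces onError, and nothing is delivered afterwards. The names DemandCheckSketch, CountingSubscription and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class DemandCheckSketch {

    // Hypothetical subscription that emits 1, 2, 3, ... synchronously on demand.
    // It is not thread-safe and does not guard against re-entrant request() calls.
    static final class CountingSubscription implements Subscription {
        private final Subscriber<? super Integer> subscriber;
        private int value;
        private boolean cancelled;

        CountingSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (cancelled) {
                return; // requests after cancellation are ignored
            }
            if (n <= 0) {
                cancelled = true; // onError is terminal: no further signals
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            for (long i = 0; i < n && !cancelled; i++) {
                subscriber.onNext(++value);
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    // Issues the given requests in order and records every signal the subscriber sees.
    static List<String> run(long... requests) {
        List<String> signals = new ArrayList<>();
        Subscriber<Integer> recorder = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                signals.add("onSubscribe");
            }

            @Override
            public void onNext(Integer item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError(" + t.getClass().getSimpleName() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        };

        CountingSubscription subscription = new CountingSubscription(recorder);
        recorder.onSubscribe(subscription);
        for (long n : requests) {
            subscription.request(n);
        }
        return signals;
    }

    public static void main(String[] args) {
        // request(2) delivers two items, request(0) fails, request(5) is then ignored
        System.out.println(run(2, 0, 5));
    }
}
```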

2. Create implementation of Subscriber


package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements Subscriber<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Subscriber %s >> [%s] %s%n";

	private static final int DEMAND = 3;
	private static final Random RANDOM = new Random();

	private String name;
	private Subscription subscription;

	private int count;
	
	public MySubscriber(String name) {
		this.name = name;
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		log("Subscribed");
		this.subscription = subscription;

		count = DEMAND;
		requestItems(DEMAND);
	}

	private void requestItems(int n) {
		log("Requesting %d new items...", n);
		subscription.request(n);
	}

	@Override
	public void onNext(Integer item) {
		if (item != null) {
			log(item.toString());

			// the publisher may call onNext from several pool threads at once, so guard the counter
			synchronized (this) {
				count--;

				if (count == 0) {
					if (RANDOM.nextBoolean()) {
						count = DEMAND;
						requestItems(count);
					} else {
						count = 0;
						log("Cancelling subscription...");
						subscription.cancel();
					}
				}
			}
		} else {
			log("Null Item!");
		}
	}

	@Override
	public void onComplete() {
		log("Complete!");
	}

	@Override
	public void onError(Throwable t) {
		log("Subscriber Error >> %s", t);
	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, this.name, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}
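
The batch logic in onNext() (count down the outstanding items, then either request another batch or cancel) can be made reproducible by replacing the random choice with a fixed limit. The following sketch does that under a few assumptions: it uses the JDK's SubmissionPublisher instead of MyPublisher, and BatchDemandSketch, BatchSubscriber and run are hypothetical names. SubmissionPublisher delivers signals to a given subscriber one at a time, so no synchronized block is needed here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

class BatchDemandSketch {

    // Hypothetical subscriber: requests batchSize items at a time and
    // cancels after maxBatches batches instead of deciding at random.
    static final class BatchSubscriber implements Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        private final int batchSize;
        private final int maxBatches;
        private Subscription subscription;
        private int outstanding; // items requested but not yet received
        private int batches;     // batches requested so far

        BatchSubscriber(int batchSize, int maxBatches) {
            this.batchSize = batchSize;
            this.maxBatches = maxBatches;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        private void requestBatch() {
            batches++;
            outstanding = batchSize;
            subscription.request(batchSize);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--outstanding > 0) {
                return; // the current batch is not complete yet
            }
            if (batches < maxBatches) {
                requestBatch();
            } else {
                subscription.cancel();
                finished.countDown();
            }
        }

        @Override
        public void onError(Throwable t) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    // Publishes 1..itemCount and returns what the batch subscriber received.
    static List<Integer> run(int batchSize, int maxBatches, int itemCount) throws InterruptedException {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize, maxBatches);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i);
            }
        }
        subscriber.finished.await(); // either the subscriber cancelled or the publisher completed
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        // two batches of three items, then cancel; the remaining items are never delivered
        System.out.println(run(3, 2, 10));
    }
}
```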

3. Create Test Class


package com.javasampleapproach.java9flow.pubsub;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {
		
		MyPublisher publisher = new MyPublisher();
		MySubscriber subscriberA = new MySubscriber("A");
		MySubscriber subscriberB = new MySubscriber("B");
		
		publisher.subscribe(subscriberA);
		publisher.subscribe(subscriberB);
		
		publisher.waitUntilTerminated();
	}
}
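
MainApp blocks in waitUntilTerminated() until the Publisher completes a CompletableFuture from another thread. The following is a minimal sketch of that signalling pattern on its own, assuming a timeout is acceptable. TerminationSignalSketch and runAndAwait are hypothetical names, and the 5-second limit is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TerminationSignalSketch {

    // Hypothetical helper: runs the work on a pool thread and waits for its completion signal.
    static String runAndAwait(Runnable work) throws Exception {
        CompletableFuture<String> terminated = new CompletableFuture<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.execute(() -> {
            try {
                work.run();
                terminated.complete("done");
            } catch (RuntimeException e) {
                terminated.completeExceptionally(e); // surfaces as ExecutionException in get()
            }
        });
        pool.shutdown(); // already-submitted tasks still run; worker threads exit afterwards

        // Bounded wait, so a lost signal shows up as a TimeoutException instead of a hang.
        return terminated.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndAwait(() -> System.out.println("working...")));
    }
}
```

The unbounded terminated.get() used in MyPublisher is fine for a demo. A bounded wait like the one above is one way to avoid hanging the main thread if the last Subscriber never cancels.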

4. Check Result

A sample run is shown below. The interleaving of lines and the point at which each Subscriber cancels vary between runs, because delivery is multi-threaded and the cancel decision is random.


Subscriber A >> [main] Subscribed
Subscriber A >> [main] Requesting 3 new items...
Subscriber B >> [main] Subscribed
Subscriber B >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber A >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber A >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber B >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber B >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-3] publish item: [3] ...
Subscriber A >> [pool-1-thread-3] 3
Subscriber A >> [pool-1-thread-3] Cancelling subscription...
Publisher >> [pool-1-thread-4] publish item: [3] ...
Subscriber B >> [pool-1-thread-4] 3
Subscriber B >> [pool-1-thread-4] Requesting 3 new items...
Publisher >> [pool-1-thread-1] publish item: [4] ...
Subscriber B >> [pool-1-thread-1] 4
Publisher >> [pool-1-thread-2] publish item: [5] ...
Subscriber B >> [pool-1-thread-2] 5
Publisher >> [pool-1-thread-3] publish item: [6] ...
Subscriber B >> [pool-1-thread-3] 6
Subscriber B >> [pool-1-thread-1] Cancelling subscription...
Publisher >> [pool-1-thread-1] Shut down executor...
Publisher >> [pool-2-thread-1] Shutdown complete.
