Java 9 Flow API example – Publisher and Subscriber

java-9-flow-api-example-publisher-and-subscriber-feature-image

In the previous post, we covered the basics of Reactive Streams and the components and behaviour of the Java 9 Flow API. In this tutorial, we're going to look at an example that implements a Publisher and a Subscriber for reactive programming.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Processor
Java 9 Flow SubmissionPublisher – A Concrete Publisher

I. Technologies

– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)

II. Project Overview


We will create a Publisher that is subscribed to by two Subscribers.
The Publisher maintains a list of Subscriptions; each Subscription corresponds to one of the Subscribers above.
The Publisher uses a Subscription to push items to the corresponding Subscriber through the Subscriber::onNext() method.
A Subscriber uses its Subscription to request items from the Publisher through the Subscription::request() method.
The Publisher defines an Executor for multi-threading, so request() and onNext() work asynchronously, and the data for each Subscriber is produced asynchronously through its Subscription as well.
– After receiving all requested items successfully, a Subscriber either requests new data or cancels its Subscription (chosen at random).

III. Practice

To understand how Publisher, Subscriber and Subscription behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams

1. Create implementation of Publisher


package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo {@link Publisher} that emits an increasing sequence of integers
 * (1, 2, 3, ...) to every subscriber, each subscriber having its own counter.
 *
 * <p>Items are delivered on a fixed pool of four threads, one task per item.
 * Consequently {@code onNext} calls for the same subscriber may run
 * concurrently and arrive out of order; subscribers must tolerate this.
 *
 * <p>When the last active subscription is cancelled the pool is shut down and
 * {@link #waitUntilTerminated()} returns once all queued work has finished.
 * Subscribers that arrive after that point are rejected through
 * {@code onError}.
 */
public class MyPublisher implements Publisher<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

	final ExecutorService executor = Executors.newFixedThreadPool(4);

	/** Active subscriptions; also used as the lock guarding the "last one out shuts down" decision. */
	private final List<MySubscription> subscriptions = Collections.synchronizedList(new ArrayList<>());

	/** Completed once the delivery pool has terminated. */
	private final CompletableFuture<Void> terminated = new CompletableFuture<>();

	/**
	 * Registers {@code subscriber} and hands it a new subscription.
	 *
	 * <p>If the publisher has already shut down, the subscriber still receives
	 * {@code onSubscribe} (with an already-cancelled subscription) followed by
	 * {@code onError(IllegalStateException)}.
	 *
	 * @param subscriber the subscriber to register
	 * @throws NullPointerException if {@code subscriber} is {@code null}
	 */
	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) {
		// Fail before registering anything, so a null subscriber cannot leave a
		// subscription in the list that nobody is able to cancel.
		if (subscriber == null) {
			throw new NullPointerException("subscriber must not be null");
		}

		MySubscription subscription = new MySubscription(subscriber, executor);
		boolean accepted;

		// Same lock as cancel(): once a subscription is added here, the pool
		// cannot be shut down until that subscription is cancelled.
		synchronized (subscriptions) {
			accepted = !executor.isShutdown();
			if (accepted) {
				subscriptions.add(subscription);
			}
		}

		if (!accepted) {
			// Mark as cancelled first so request()/cancel() on it are no-ops.
			subscription.isCanceled.set(true);
			subscriber.onSubscribe(subscription);
			subscriber.onError(new IllegalStateException("Publisher has already shut down"));
			return;
		}

		subscriber.onSubscribe(subscription);
	}

	/**
	 * Blocks until every subscription has been cancelled and the delivery pool
	 * has terminated.
	 *
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public void waitUntilTerminated() throws InterruptedException {
		try {
			terminated.get();
		} catch (ExecutionException e) {
			System.out.println(e);
		}
	}

	/** Link between this publisher and one subscriber; owns that subscriber's item counter. */
	private class MySubscription implements Subscription {

		private final ExecutorService executor;

		private final Subscriber<? super Integer> subscriber;
		private final AtomicInteger value;
		private final AtomicBoolean isCanceled;

		public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
			this.subscriber = subscriber;
			this.executor = executor;

			value = new AtomicInteger();
			isCanceled = new AtomicBoolean(false);
		}

		/**
		 * Schedules {@code n} further items for delivery.
		 *
		 * <p>A non-positive {@code n} is reported asynchronously through
		 * {@code onError(IllegalArgumentException)} and terminates this
		 * subscription. Calls on a cancelled subscription are ignored.
		 *
		 * @param n number of additional items requested
		 */
		@Override
		public void request(long n) {
			if (isCanceled.get()) {
				return;
			}

			if (n <= 0) {
				// Queue the error before cancelling: cancel() may shut the pool
				// down, after which execute() would be rejected.
				executor.execute(() -> subscriber
						.onError(new IllegalArgumentException("Requested a non-positive number of items: " + n)));
				cancel();
				return;
			}

			publishItems(n);
		}

		/**
		 * Cancels this subscription. Idempotent: only the first call has an
		 * effect. Cancelling the last active subscription shuts the publisher down.
		 */
		@Override
		public void cancel() {
			if (!isCanceled.compareAndSet(false, true)) {
				return;
			}

			synchronized (subscriptions) {
				subscriptions.remove(this);
				if (subscriptions.isEmpty()) {
					shutdown();
				}
			}
		}

		/** Queues one delivery task per requested item. */
		private void publishItems(long n) {
			// long counter: an int counter would overflow and never reach a large demand.
			for (long i = 0; i < n && !isCanceled.get(); i++) {

				executor.execute(() -> {
					// Drop items that were queued before the subscription was cancelled.
					if (isCanceled.get()) {
						return;
					}
					int v = value.incrementAndGet();
					log("publish item: [" + v + "] ...");
					subscriber.onNext(v);
				});
			}
		}

		/** Stops the delivery pool and completes {@code terminated} once it has drained. */
		private void shutdown() {
			log("Shut down executor...");
			executor.shutdown();

			// cancel() is usually called from a pool thread, so the wait for pool
			// termination has to happen on a different thread.
			ExecutorService notifier = newSingleThreadExecutor();
			notifier.execute(() -> {
				try {
					executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
				} catch (InterruptedException e) {
					currentThread().interrupt();
				}

				log("Shutdown complete.");
				terminated.complete(null);
			});
			// Let the helper thread exit after its single task so it cannot keep the JVM alive.
			notifier.shutdown();
		}

	}

	/**
	 * Prints a line tagged with the current thread name.
	 *
	 * @param message format string for the message text
	 * @param args    arguments referenced by {@code message}
	 */
	private void log(String message, Object... args) {
		// Format the message first so the thread name is never interpreted as a format string.
		String text = String.format(message, args);

		System.out.printf(LOG_MESSAGE_FORMAT, currentThread().getName(), text);
	}
}

2. Create implementation of Subscriber


package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

/**
 * Demo {@link Subscriber} of integers that asks for items in batches of
 * {@code DEMAND}. After each complete batch it randomly either requests
 * another batch or cancels its subscription.
 */
public class MySubscriber implements Subscriber<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Subscriber %s >> [%s] %s%n";

	private static final int DEMAND = 3;
	private static final Random RANDOM = new Random();

	private final String name;
	private Subscription subscription;

	/** Items still outstanding from the current batch. */
	private int count;

	/**
	 * @param name label printed in every log line of this subscriber
	 */
	public MySubscriber(String name) {
		this.name = name;
	}

	/**
	 * Stores the subscription and requests the first batch of {@code DEMAND} items.
	 *
	 * @param subscription the subscription used for all later requests and cancellation
	 */
	@Override
	public void onSubscribe(Subscription subscription) {
		log("Subscribed");
		this.subscription = subscription;

		count = DEMAND;
		requestItems(DEMAND);
	}

	/** Logs and forwards a demand of {@code n} items to the subscription. */
	private void requestItems(int n) {
		log("Requesting %d new items...", n);
		subscription.request(n);
	}

	/**
	 * Logs the item and, once the current batch is complete, randomly requests
	 * another batch or cancels. A {@code null} item is only logged.
	 *
	 * @param item the received item
	 */
	@Override
	public void onNext(Integer item) {
		if (item != null) {
			log(item.toString());

			// The batch counter is guarded by this lock because onNext is not
			// assumed to be called from a single thread.
			synchronized (this) {
				count--;

				if (count == 0) {
					if (RANDOM.nextBoolean()) {
						count = DEMAND;
						requestItems(count);
					} else {
						log("Cancelling subscription...");
						subscription.cancel();
					}
				}
			}
		} else {
			log("Null Item!");
		}
	}

	/** Logs completion of the stream. */
	@Override
	public void onComplete() {
		log("Complete!");
	}

	/**
	 * Logs the failure.
	 *
	 * @param t the error signalled by the publisher
	 */
	@Override
	public void onError(Throwable t) {
		log("Subscriber Error >> %s", t);
	}

	/**
	 * Prints a line tagged with this subscriber's name and the current thread name.
	 *
	 * @param message format string for the message text
	 * @param args    arguments referenced by {@code message}
	 */
	private void log(String message, Object... args) {
		// Format the message first so that neither the subscriber name nor the
		// thread name is ever interpreted as a format string.
		String text = String.format(message, args);

		System.out.printf(LOG_MESSAGE_FORMAT, this.name, currentThread().getName(), text);
	}
}

3. Create Test Class


package com.javasampleapproach.java9flow.pubsub;

/**
 * Entry point of the demo: attaches two subscribers, "A" and "B", to a single
 * publisher and blocks until the publisher reports termination.
 */
public class MainApp {

	/**
	 * Runs the demo.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while waiting
	 */
	public static void main(String[] args) throws InterruptedException {
		final MyPublisher source = new MyPublisher();

		// Subscribe in a fixed order: first "A", then "B".
		for (String label : new String[] { "A", "B" }) {
			source.subscribe(new MySubscriber(label));
		}

		source.waitUntilTerminated();
	}
}

4. Check Result


Subscriber A >> [main] Subscribed
Subscriber A >> [main] Requesting 3 new items...
Subscriber B >> [main] Subscribed
Subscriber B >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber A >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber A >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber B >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber B >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-3] publish item: [3] ...
Subscriber A >> [pool-1-thread-3] 3
Subscriber A >> [pool-1-thread-3] Cancelling subscription...
Publisher >> [pool-1-thread-4] publish item: [3] ...
Subscriber B >> [pool-1-thread-4] 3
Subscriber B >> [pool-1-thread-4] Requesting 3 new items...
Publisher >> [pool-1-thread-1] publish item: [4] ...
Subscriber B >> [pool-1-thread-1] 4
Publisher >> [pool-1-thread-2] publish item: [5] ...
Subscriber B >> [pool-1-thread-2] 5
Publisher >> [pool-1-thread-3] publish item: [6] ...
Subscriber B >> [pool-1-thread-3] 6
Subscriber B >> [pool-1-thread-1] Cancelling subscription...
Publisher >> [pool-1-thread-1] Shut down executor...
Publisher >> [pool-2-thread-1] Shutdown complete.

One thought on “Java 9 Flow API example – Publisher and Subscriber”

Leave a Reply

Your email address will not be published. Required fields are marked *