Java 9 Flow SubmissionPublisher – A Concrete Publisher


JDK 9 provides a concrete Publisher named SubmissionPublisher that acts as a compliant Reactive Streams Publisher, relying on drop handling and/or blocking for flow control. In this tutorial, we’ll take a look at SubmissionPublisher and an example that generates items for Subscribers.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Publisher and Subscriber
Java 9 Flow API example – Processor

I. Technologies

– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)

II. Overview

1. SubmissionPublisher

SubmissionPublisher is an implementation of Java 9 Flow.Publisher that asynchronously issues items to its subscribers until closing.

Depending on usage, we can specify the Executor for SubmissionPublisher in its constructor:
– If we want items delivered on separate threads and can estimate the number of subscribers, consider using Executors.newFixedThreadPool(int) with this constructor:


SubmissionPublisher(Executor executor, int maxBufferCapacity);
// maxBufferCapacity: the maximum capacity for each subscriber's buffer.

– Otherwise, just call the default constructor (no input parameter) that will use ForkJoinPool.commonPool().
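A minimal sketch of the two constructor choices described above. The class name PublisherSetup, the helper capacityOf(), the pool size 4 and the capacity 128 are assumptions made for illustration only; getMaxBufferCapacity() and Flow.defaultBufferSize() are standard JDK 9 methods.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PublisherSetup {

	// Hypothetical helper: reports the per-subscriber buffer capacity in effect.
	static int capacityOf(SubmissionPublisher<?> publisher) {
		return publisher.getMaxBufferCapacity();
	}

	public static void main(String[] args) {
		// Pool size 4 and capacity 128 are arbitrary values for illustration.
		ExecutorService executor = Executors.newFixedThreadPool(4);
		try (SubmissionPublisher<String> custom = new SubmissionPublisher<>(executor, 128);
				SubmissionPublisher<String> byDefault = new SubmissionPublisher<>()) {

			System.out.println("custom capacity:  " + capacityOf(custom));
			System.out.println("default capacity: " + capacityOf(byDefault));
			System.out.println("default equals Flow.defaultBufferSize(): "
					+ (capacityOf(byDefault) == Flow.defaultBufferSize()));
		} finally {
			// The fixed pool uses non-daemon threads, so shut it down explicitly.
			executor.shutdown();
		}
	}
}
```

The requested capacity may be rounded up to a power of two by the implementation, which is why the sketch reads the effective value back instead of assuming it.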

If a Subscriber has only one action that requests and processes all items, we can consider using consume(Consumer) method (which returns a CompletableFuture object) like this:


publisher.consume((data) -> process(data));
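As a hedged illustration of how the returned CompletableFuture can be used, the sketch below publishes a few integers and waits until the consumer has seen all of them. The class name ConsumeDemo and the method publishAndCollect() are hypothetical names for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class ConsumeDemo {

	// Publishes 1..count and returns the items the consumer received.
	static List<Integer> publishAndCollect(int count) {
		List<Integer> received = new CopyOnWriteArrayList<>();
		CompletableFuture<Void> done;

		try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
			// consume() subscribes an internal Subscriber that calls our Consumer.
			done = publisher.consume(received::add);
			for (int i = 1; i <= count; i++) {
				publisher.submit(i);
			}
		} // leaving the block calls close(), which signals onComplete

		// The future completes once onComplete has been delivered.
		done.join();
		return received;
	}

	public static void main(String[] args) {
		System.out.println(publishAndCollect(3));
	}
}
```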

There are 2 publication methods:
– submit(): asynchronously publishes the given item to each current subscriber, blocking while resources for any subscriber are unavailable.
– offer(): asynchronously publishes the given item to each current subscriber if possible, but the item may be dropped by one or more subscribers if resource limits are exceeded.
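To make the difference concrete, here is a small sketch of offer() with a drop handler. It assumes a subscriber that never requests anything, so its buffer can only fill up; OfferDemo, IdleSubscriber and countDrops() are hypothetical names, and the exact number of drops depends on the implementation's buffer sizing.

```java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class OfferDemo {

	// A subscriber that never calls request(), so nothing is ever consumed.
	static class IdleSubscriber implements Subscriber<Integer> {
		@Override public void onSubscribe(Subscription subscription) { }
		@Override public void onNext(Integer item) { }
		@Override public void onError(Throwable t) { }
		@Override public void onComplete() { }
	}

	// Offers `count` items and returns how many times the drop handler ran.
	static int countDrops(int count, int maxBufferCapacity) {
		AtomicInteger drops = new AtomicInteger();
		try (SubmissionPublisher<Integer> publisher =
				new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity)) {
			publisher.subscribe(new IdleSubscriber());
			for (int i = 1; i <= count; i++) {
				// Unlike submit(), offer() does not block when the buffer is full.
				publisher.offer(i, (subscriber, item) -> {
					drops.incrementAndGet();
					return false; // false = do not retry, drop the item
				});
			}
		}
		return drops.get();
	}

	public static void main(String[] args) {
		System.out.println("dropped: " + countDrops(10, 2));
	}
}
```

With submit() in place of offer(), the same loop would block once the idle subscriber's buffer is full.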

Some more useful methods:

  • close(): issues onComplete signals to all subscribers, and disallows subsequent attempts to publish.
  • closeExceptionally(Throwable error): issues onError signals to all subscribers with the given error, and disallows subsequent attempts to publish.
  • estimateMaximumLag(): returns an estimate of the maximum number of items produced but not yet consumed among all subscribers.
  • estimateMinimumDemand(): returns an estimate of the minimum number of items requested (via request) but not yet produced, among all subscribers.
  • getNumberOfSubscribers(): returns the number of current subscribers.
  • hasSubscribers(): returns true if this publisher has any subscribers.
  • isSubscribed(Subscriber): returns true if the given Subscriber is currently subscribed.
  • getSubscribers(): returns a list of current subscribers.
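A short sketch of some of these methods in action, assuming a single subscriber with unbounded demand. CloseExceptionallyDemo and terminalSignal() are hypothetical names; the 5-second timeout is an arbitrary safety limit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CloseExceptionallyDemo {

	// Returns how the subscriber ended: the error message, "completed" or "none".
	static String terminalSignal(String errorMessage) {
		CountDownLatch terminated = new CountDownLatch(1);
		AtomicReference<String> signal = new AtomicReference<>("none");

		Subscriber<Integer> subscriber = new Subscriber<>() {
			@Override public void onSubscribe(Subscription subscription) {
				subscription.request(Long.MAX_VALUE);
			}
			@Override public void onNext(Integer item) { }
			@Override public void onError(Throwable t) {
				signal.set(t.getMessage());
				terminated.countDown();
			}
			@Override public void onComplete() {
				signal.set("completed");
				terminated.countDown();
			}
		};

		SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
		publisher.subscribe(subscriber);
		System.out.println("hasSubscribers: " + publisher.hasSubscribers());
		System.out.println("isSubscribed:   " + publisher.isSubscribed(subscriber));

		// Sends onError instead of onComplete and rejects further publishing.
		publisher.closeExceptionally(new IllegalStateException(errorMessage));
		System.out.println("isClosed:       " + publisher.isClosed());

		try {
			terminated.await(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return signal.get();
	}

	public static void main(String[] args) {
		System.out.println("subscriber ended with: " + terminalSignal("boom"));
	}
}
```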

2. Project

We will create a Publisher (a subclass of SubmissionPublisher) with two Subscribers:
– We don’t need to define any implementation of the Subscription interface. Why?
SubmissionPublisher keeps a linked list of BufferedSubscription objects. Every time we call subscribe() with a Subscriber, a new BufferedSubscription for that Subscriber is added to the list automatically.
– Using the submit(T item) method, the Publisher periodically publishes items generated by a Supplier to its Subscribers (submit() causes each Subscriber’s onNext() method to be invoked asynchronously).
– After receiving all the items it requested, a Subscriber randomly either requests more data or cancels its Subscription.
– When the number of generated items reaches MAX_ITEM_TO_PUBLISH, we stop the Publisher by calling close() (which sends the onComplete signal to the Subscribers).

III. Practice

1. Create subclass of SubmissionPublisher


package com.javasampleapproach.java9flow.submissionpublisher;

import static java.lang.Thread.currentThread;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class MyPublisher extends SubmissionPublisher<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";
	private final int MAX_ITEM_TO_PUBLISH = 5;

	private final ScheduledFuture<?> periodicTask;
	private final ScheduledExecutorService scheduler;

	private final AtomicInteger i;

	MyPublisher(Executor executor, int maxBufferCapacity, long period, TimeUnit unit) {

		super(executor, maxBufferCapacity);

		// if using the default, normally the ForkJoinPool.commonPool(), call:
		// super();
		i = new AtomicInteger(0);

		scheduler = new ScheduledThreadPoolExecutor(1);
		periodicTask = scheduler.scheduleAtFixedRate(() -> {
			Integer item = supplier.get();
						
			log("publishing item: " + item + " ...");

			submit(item);

			log("estimateMaximumLag: " + super.estimateMaximumLag());
			log("estimateMinimumDemand: " + super.estimateMinimumDemand());

			if (item == MAX_ITEM_TO_PUBLISH) {
				close();
			}
			
		}, 0, period, unit);
	}

	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) {
		super.subscribe(subscriber);
	}

	public void close() {
		log("shutting down...");

		List<Subscriber<? super Integer>> subscribers = getSubscribers();
		for (Subscriber<? super Integer> subscriber : subscribers) {
			log("Subscriber " + subscriber.toString() + " isSubscribed(): " + isSubscribed(subscriber));
		}

		periodicTask.cancel(false);
		scheduler.shutdown();

		super.close();
	}

	Supplier<Integer> supplier = new Supplier<>() {

		@Override
		public Integer get() {

			int value = i.incrementAndGet();

			return Integer.valueOf(value);
		}
	};

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}

2. Create implementation of Subscriber

package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.currentThread;

public class MySubscriber implements Subscriber<Object> {

	private static final String LOG_MESSAGE_FORMAT = "~~ Subscriber %s >> [%s] %s%n";
	private static final Random RANDOM = new Random();

	private Subscription subscription;
	private AtomicInteger count;

	private String name;
	private int DEMAND = 0;

	public MySubscriber(String name) {
		this.name = name;
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		log("Subscribed...");
		this.subscription = subscription;

		request(DEMAND);
	}

	public void setDEMAND(int n) {
		this.DEMAND = n;
		count = new AtomicInteger(DEMAND);
	}

	private void request(int n) {
		log("request new " + n + " items...");
		subscription.request(n);
	}

	@Override
	public void onNext(Object item) {
		log("itemValue: " + item);

		if (count.decrementAndGet() == 0) {
			if (RANDOM.nextBoolean()) {
				request(DEMAND);
				count.set(DEMAND);
			} else {
				log("Cancel subscribe...");
				subscription.cancel();
			}
		}

	}

	@Override
	public void onComplete() {
		log("Complete!");
	}

	@Override
	public void onError(Throwable t) {
		log("Error: " + t.getMessage());
	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, this.name, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}

}

3. Create Test Class

package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainApp {

	public static void main(String[] args) {

		final int MAX_BUFFER_CAPACITY = 128;
		final ExecutorService executor = Executors.newFixedThreadPool(4);

		MyPublisher publisher = new MyPublisher(executor, MAX_BUFFER_CAPACITY, 200, TimeUnit.MILLISECONDS);
		
		MySubscriber subscriberA = new MySubscriber("A");
		subscriberA.setDEMAND(3);

		MySubscriber subscriberB = new MySubscriber("B");
		subscriberB.setDEMAND(6);

		publisher.subscribe(subscriberA);
		publisher.subscribe(subscriberB);
	}
}
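Note that Executors.newFixedThreadPool() creates non-daemon threads, so the JVM keeps running after the Publisher has closed unless the executor is shut down. The sketch below is one possible way to handle that, using a plain SubmissionPublisher with consume() rather than the MyPublisher and MySubscriber classes above; ShutdownDemo and runAndShutDown() are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

	// Publishes `items` integers, then returns true if the pool terminated in time.
	static boolean runAndShutDown(int items) {
		ExecutorService executor = Executors.newFixedThreadPool(4);
		SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 128);

		// Shut the pool down once the consumer has received onComplete (or an error).
		publisher.consume(item -> System.out.println("received: " + item))
				.whenComplete((ignored, error) -> executor.shutdown());

		for (int i = 1; i <= items; i++) {
			publisher.submit(i);
		}
		publisher.close();

		try {
			return executor.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

	public static void main(String[] args) {
		System.out.println("executor terminated: " + runAndShutDown(3));
	}
}
```

In the MainApp above, a comparable effect could be reached by calling executor.shutdown() after all Subscribers have received onComplete or cancelled, for example by counting down a latch in MySubscriber; that variation is an assumption and is not part of the original example.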

4. Check Result

Case 1:
– Subscriber A requests 3 items, then cancels its subscription.
– Subscriber B requests 6 items, then cancels its subscription.

Publisher >> [pool-2-thread-1] publishing item: 1 ...
~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber B >> [pool-1-thread-2] Subscribed...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: -1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-2] itemValue: 3
~~ Subscriber B >> [pool-1-thread-1] itemValue: 3
~~ Subscriber A >> [pool-1-thread-2] Cancel subscribe...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 4 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
~~ Subscriber B >> [pool-1-thread-4] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 6 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 6
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber B >> [pool-1-thread-1] Cancel subscribe...
Publisher >> [pool-2-thread-1] publishing item: 7 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 8 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 9 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 10 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 11 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 0
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0

Case 2:
– Subscriber A requests 3 items, then cancels its subscription.
– Subscriber B requests 6 items, then requests 6 more.
– Because the Publisher submits only 11 items, Subscriber B receives only 11 items (while requesting 12 in total), then receives the onComplete signal from the Publisher (via the Publisher’s close() method) while it is still subscribed.

~~ Subscriber B >> [pool-1-thread-2] Subscribed...
Publisher >> [pool-2-thread-1] publishing item: 1 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: -1
~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 3 ...
~~ Subscriber B >> [pool-1-thread-2] itemValue: 3
~~ Subscriber A >> [pool-1-thread-1] itemValue: 3
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber A >> [pool-1-thread-1] Cancel subscribe...
Publisher >> [pool-2-thread-1] publishing item: 4 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] publishing item: 6 ...
~~ Subscriber B >> [pool-1-thread-2] itemValue: 6
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
Publisher >> [pool-2-thread-1] publishing item: 7 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 7
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 5
Publisher >> [pool-2-thread-1] publishing item: 8 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-3] itemValue: 8
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 4
Publisher >> [pool-2-thread-1] publishing item: 9 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 9
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 3
Publisher >> [pool-2-thread-1] publishing item: 10 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 10
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 11 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 11
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@4278bf01 isSubscribed(): true
~~ Subscriber B >> [pool-1-thread-3] Complete!

Case 3: We change MAX_ITEM_TO_PUBLISH to 5 (Cases 1 and 2 published 11 items):
– Subscriber A requests 3 items, then requests 3 more.
– Subscriber B requests 6 items.
– Because the Publisher submits only 5 items, Subscribers A and B each receive only 5 items (while each requested 6 in total), then receive the onComplete signal from the Publisher (via the Publisher’s close() method) while they are still subscribed.


~~ Subscriber A >> [pool-1-thread-1] Subscribed...
~~ Subscriber B >> [pool-1-thread-2] Subscribed...
Publisher >> [pool-2-thread-1] publishing item: 1 ...
~~ Subscriber B >> [pool-1-thread-2] request new 6 items...
~~ Subscriber A >> [pool-1-thread-1] request new 3 items...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
~~ Subscriber B >> [pool-1-thread-2] itemValue: 1
~~ Subscriber A >> [pool-1-thread-1] itemValue: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-4] itemValue: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
~~ Subscriber A >> [pool-1-thread-3] itemValue: 2
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-1] itemValue: 3
~~ Subscriber A >> [pool-1-thread-2] itemValue: 3
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 0
~~ Subscriber A >> [pool-1-thread-2] request new 3 items...
Publisher >> [pool-2-thread-1] publishing item: 4 ...
~~ Subscriber A >> [pool-1-thread-4] itemValue: 4
~~ Subscriber B >> [pool-1-thread-3] itemValue: 4
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 2
Publisher >> [pool-2-thread-1] publishing item: 5 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
~~ Subscriber B >> [pool-1-thread-2] itemValue: 5
~~ Subscriber A >> [pool-1-thread-1] itemValue: 5
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 1
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@42d273fa isSubscribed(): true
Publisher >> [pool-2-thread-1] Subscriber com.javasampleapproach.java9flow.submissionpublisher.MySubscriber@dd73db isSubscribed(): true
~~ Subscriber B >> [pool-1-thread-3] Complete!
~~ Subscriber A >> [pool-1-thread-4] Complete!

Special case: Using consume(Consumer) method:


package com.javasampleapproach.java9flow.submissionpublisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainApp {

	public static void main(String[] args) {

		final int MAX_BUFFER_CAPACITY = 128;
		final ExecutorService executor = Executors.newFixedThreadPool(4);

		MyPublisher publisher = new MyPublisher(executor, MAX_BUFFER_CAPACITY, 200, TimeUnit.MILLISECONDS);

		publisher.consume((data) -> process(data));
	}

	static void process(Integer i) {
		System.out.println("consume() testing: " + i.toString());
	}
}

The result:


Publisher >> [pool-2-thread-1] publishing item: 1 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775806 // 0x7FFFFFFFFFFFFFFE = Long.MAX_VALUE - 1
consume() testing: 1
Publisher >> [pool-2-thread-1] publishing item: 2 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
consume() testing: 2
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775805 // 0x7FFFFFFFFFFFFFFD = Long.MAX_VALUE - 2
Publisher >> [pool-2-thread-1] publishing item: 3 ...
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
consume() testing: 3
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775804 // 0x7FFFFFFFFFFFFFFC = Long.MAX_VALUE - 3
Publisher >> [pool-2-thread-1] publishing item: 4 ...
consume() testing: 4
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775803 // 0x7FFFFFFFFFFFFFFB = Long.MAX_VALUE - 4
Publisher >> [pool-2-thread-1] publishing item: 5 ...
consume() testing: 5
Publisher >> [pool-2-thread-1] estimateMaximumLag: 1
Publisher >> [pool-2-thread-1] estimateMinimumDemand: 9223372036854775802 // 0x7FFFFFFFFFFFFFFA = Long.MAX_VALUE - 5
Publisher >> [pool-2-thread-1] shutting down...
Publisher >> [pool-2-thread-1] Subscriber java.util.concurrent.SubmissionPublisher$ConsumerSubscriber@1c39c86 isSubscribed(): true

Look at estimateMinimumDemand, which is the value returned by the estimateMinimumDemand() method.
When using the consume() method, we don’t provide our own Subscriber. The Publisher creates an internal ConsumerSubscriber (visible in the last log line) that requests Long.MAX_VALUE items, so the estimate starts from Long.MAX_VALUE and decreases by 1 with every published item.
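The same effect can be reproduced with a hand-written Subscriber that requests Long.MAX_VALUE in onSubscribe(). This is a sketch under that assumption, not the JDK's internal ConsumerSubscriber; UnboundedDemandDemo and demandAfterSubscribe() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class UnboundedDemandDemo {

	// Returns the publisher's demand estimate right after the subscriber
	// has requested Long.MAX_VALUE items and before anything is published.
	static long demandAfterSubscribe() {
		CountDownLatch subscribed = new CountDownLatch(1);
		try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
			publisher.subscribe(new Subscriber<Integer>() {
				@Override public void onSubscribe(Subscription subscription) {
					subscription.request(Long.MAX_VALUE); // effectively unbounded demand
					subscribed.countDown();
				}
				@Override public void onNext(Integer item) { }
				@Override public void onError(Throwable t) { }
				@Override public void onComplete() { }
			});

			// onSubscribe() runs asynchronously, so wait for it before measuring.
			try {
				subscribed.await(5, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			return publisher.estimateMinimumDemand();
		}
	}

	public static void main(String[] args) {
		long demand = demandAfterSubscribe();
		System.out.println("estimateMinimumDemand: " + demand);
		System.out.println("equals Long.MAX_VALUE: " + (demand == Long.MAX_VALUE));
	}
}
```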

3 thoughts on “Java 9 Flow SubmissionPublisher – A Concrete Publisher”

  1. Thanks for this great example!

    One issue I have is that the main app never terminates after the publisher has closed itself and the subscribers are notified that the subscription is complete. What do I do wrong or is there anything missing in the example?
