Java 9 Flow API example – Processor

java-9-flow-api-example-processor-feature-image

In previous post, we have general knowledge about Reactive Streams and Java 9 Flow API Components and Behaviour. In this tutorial, we’re gonna look at an example that implements Publisher, Subscriber with Processor as a bridge for reactive programming.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Publisher and Subscriber
Java 9 FLow SubmissionPublisher – A Concrete Publisher

I. Technologies

– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)

II. Overview

1. Processor

A Processor is a component that sits between the Publisher and Subscriber. It acts as:
+ a Subscriber when emitting a request signal to Publisher
+ a Publisher when pushing items to Subscriber.

We can create one or more Processors in chain which link a Publisher to a Subscriber.
reactive-stream-pubisher-processer-subscriber

2. Project

We will create a Publisher that is subscribed by a Processor, and that Processor will publish data to a Subscriber.

Publisher define a Subscription to work with Processor.
Processor define its own Subscription to work with Subscriber.

– Using Subscriber::onNext() method, Publisher pushes items to Processor, and Processor pushes items to Subscriber.
– Using Subscription::request() method, Processor requests items from Publisher, and Subscriber requests items from Processor.

Publisher and Processor defines an Executor for multi-threading. Then request() and onNext() method work asynchronously.

Processor has a data buffer to store items in case the demand number of items requested by Subscriber and Processor are different.

III. Practice

To understand how Publisher, Subscriber, Subscription and Processor behave and way to implementing them, please visit: Java 9 Flow API – Reactive Streams

Publisher<Integer> —— Processor<Integer, String> —— Subscriber<String>

// --------- Publisher---------
public class MyPublisher implements Publisher<Integer> {
	final ExecutorService executor = Executors.newFixedThreadPool(4);
	private MySubscription subscription;

	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) { }

	private class MySubscription implements Subscription {
		private Subscriber<? super Integer> subscriber;

		@Override
		public void request(long n) { }

		@Override
		public void cancel() { }
	}
}

// --------- Processor ---------
public class MyProcessor implements Processor<Integer, String> {
	private Subscription publisherSubscription;

	final ExecutorService executor = Executors.newFixedThreadPool(4);
	private MySubscription subscription;
	private ConcurrentLinkedQueue<String> dataItems;

	@Override
	public void subscribe(Subscriber<? super String> subscriber) { }

	@Override
	public void onSubscribe(Subscription subscription) { }

	@Override
	public void onNext(Integer item) { }

	@Override
	public void onComplete() { }

	@Override
	public void onError(Throwable t) { }

	private class MySubscription implements Subscription {

		private Subscriber<? super String> subscriber;

		@Override
		public void request(long n) { }

		@Override
		public void cancel() { }
	}
}

// --------- Subscriber ---------
public class MySubscriber implements Subscriber<String> {
	private Subscription subscription;

	@Override
	public void onSubscribe(Subscription subscription) { }

	@Override
	public void onNext(String item) { }

	@Override
	public void onComplete() { }

	@Override
	public void onError(Throwable t) { }
}

1. Create implementation of Publisher

package com.javasampleapproach.java9flow.pubprocsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPublisher implements Publisher<Integer> {

	private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

	final ExecutorService executor = Executors.newFixedThreadPool(4);
	private MySubscription subscription;

	private final CompletableFuture<Void> terminated = new CompletableFuture<>();

	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) {
		subscription = new MySubscription(subscriber, executor);

		subscriber.onSubscribe(subscription);
	}

	public void waitUntilTerminated() throws InterruptedException {
		try {
			terminated.get();
		} catch (ExecutionException e) {
			System.out.println(e);
		}
	}

	private class MySubscription implements Subscription {

		private final ExecutorService executor;

		private Subscriber<? super Integer> subscriber;
		private final AtomicInteger value;
		private AtomicBoolean isCanceled;

		public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
			this.subscriber = subscriber;
			this.executor = executor;

			value = new AtomicInteger();
			isCanceled = new AtomicBoolean(false);
		}

		@Override
		public void request(long n) {
			if (isCanceled.get())
				return;

			if (n < 0)
				executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
			else
				publishItems(n);
		}

		@Override
		public void cancel() {
			isCanceled.set(true);

			shutdown();
		}

		private void publishItems(long n) {
			for (int i = 0; i < n; i++) {

				executor.execute(() -> {
					int v = value.incrementAndGet();
					log("publish item: [" + v + "] ...");
					subscriber.onNext(v);
				});
			}
		}

		private void shutdown() {
			log("Shut down executor...");
			executor.shutdown();
			newSingleThreadExecutor().submit(() -> {

				log("Shutdown complete.");
				terminated.complete(null);
			});
		}

	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}

2. Create implementation of Processor

package com.javasampleapproach.java9flow.pubprocsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;

public class MyProcessor implements Processor<Integer, String> {

	private static final String LOG_MESSAGE_FORMAT = "Processor >> [%s] %s%n";

	private Subscription publisherSubscription;

	final ExecutorService executor = Executors.newFixedThreadPool(4);
	private MySubscription subscription;
	private long DEMAND;
	private ConcurrentLinkedQueue<String> dataItems;

	private final CompletableFuture<Void> terminated = new CompletableFuture<>();

	public MyProcessor() {
		DEMAND = 0;
		dataItems = new ConcurrentLinkedQueue<String>();
	}

	public void setDEMAND(long n) {
		this.DEMAND = n;
	}

	@Override
	public void subscribe(Subscriber<? super String> subscriber) {
		subscription = new MySubscription(subscriber, executor);

		subscriber.onSubscribe(subscription);
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		log("Subscribed...");

		publisherSubscription = subscription;

		requestItems();
	}

	private void requestItems() {
		log("Requesting %d new items...", DEMAND);
		publisherSubscription.request(DEMAND);
	}

	@Override
	public void onNext(Integer item) {

		if (null == item)
			throw new NullPointerException();

		dataItems.add("item value = " + item * 10 + " after processing");
		log("processing item: [" + item + "] ...");

	}

	@Override
	public void onComplete() {
		log("Complete!");
	}

	@Override
	public void onError(Throwable t) {
		log("Error >> %s", t);
	}

	private class MySubscription implements Subscription {

		private final ExecutorService executor;

		private Subscriber<? super String> subscriber;

		private AtomicBoolean isCanceled;

		public MySubscription(Subscriber<? super String> subscriber, ExecutorService executor) {
			this.executor = executor;
			this.subscriber = subscriber;

			isCanceled = new AtomicBoolean(false);
		}

		@Override
		public void request(long n) {
			if (isCanceled.get())
				return;

			if (n < 0)
				executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
			else if (dataItems.size() > 0)
				publishItems(n);
			else if (dataItems.size() == 0) {
				subscriber.onComplete();
			}
		}

		private void publishItems(long n) {

			int remainItems = dataItems.size();

			if ((remainItems == n) || (remainItems > n)) {
				for (int i = 0; i < n; i++) {
					executor.execute(() -> {
						subscriber.onNext(dataItems.poll());
					});
				}

				log("Remaining " + (dataItems.size() - n) + " items to be published to Subscriber!");
			} else if ((remainItems > 0) && (remainItems < n)) {
				for (int i = 0; i < remainItems; i++) {
					executor.execute(() -> {
						subscriber.onNext(dataItems.poll());
					});
				}
				
				subscriber.onComplete();
			} else {
				log("Processor contains no item!");
			}

		}

		@Override
		public void cancel() {
			isCanceled.set(true);

			shutdown();
			publisherSubscription.cancel();
		}

		private void shutdown() {
			log("Shut down executor...");
			executor.shutdown();
			newSingleThreadExecutor().submit(() -> {

				log("Shutdown complete.");
				terminated.complete(null);
			});
		}
	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}

3. Create implementation of Subscriber

package com.javasampleapproach.java9flow.pubprocsub;

import static java.lang.Thread.currentThread;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements Subscriber<String> {

	private static final String LOG_MESSAGE_FORMAT = "Subscriber >> [%s] %s%n";

	private long DEMAND = 0;

	private Subscription subscription;

	private long count;

	public void setDEMAND(long n) {
		this.DEMAND = n;
		count = DEMAND;
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		log("Subscribed");
		this.subscription = subscription;

		requestItems(DEMAND);
	}

	private void requestItems(long n) {
		log("Requesting %d new items...", n);
		subscription.request(n);
	}

	@Override
	public void onNext(String item) {
		if (item != null) {
			log(item);

			synchronized (this) {
				count--;

				if (count == 0) {
					log("Cancelling subscription...");
					subscription.cancel();
				}
			}
		} else {
			log("Null Item!");
		}
	}

	@Override
	public void onComplete() {
		log("onComplete(): There is no remaining item in Processor.");
	}

	@Override
	public void onError(Throwable t) {
		log("Error >> %s", t);
	}

	private void log(String message, Object... args) {
		String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

		System.out.printf(fullMessage, args);
	}
}

4. Check Result

We uses this class to test:


package com.javasampleapproach.java9flow.pubprocsub;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {
		
		MyPublisher publisher = new MyPublisher();
		
		MySubscriber subscriber = new MySubscriber();
		subscriber.setDEMAND(...); // MUST set number of items to be requested here!
		
		MyProcessor processor = new MyProcessor();
		processor.setDEMAND(...); // MUST set number of items to be requested here!
		
		publisher.subscribe(processor);
		processor.subscribe(subscriber);
		
		publisher.waitUntilTerminated();
		
	}
}

4.1 Subscriber DEMAND == Processor DEMAND


// ...
		subscriber.setDEMAND(3);
// ...
		processor.setDEMAND(3);
// ...

The result:

Processor >> [main] Subscribed...
Processor >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-1] publish item: [1] ...
Publisher >> [pool-1-thread-3] publish item: [3] ...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Processor >> [pool-1-thread-2] processing item: [2] ...
Processor >> [pool-1-thread-1] processing item: [1] ...
Processor >> [pool-1-thread-3] processing item: [3] ...
Subscriber >> [main] Subscribed
Subscriber >> [main] Requesting 3 new items...
Processor >> [main] Remaining 0 items to be published to Subscriber!
Subscriber >> [pool-2-thread-2] item value = 20 after processing
Subscriber >> [pool-2-thread-1] item value = 30 after processing
Subscriber >> [pool-2-thread-3] item value = 10 after processing
Subscriber >> [pool-2-thread-3] Cancelling subscription...
Processor >> [pool-2-thread-3] Shut down executor...
Publisher >> [pool-2-thread-3] Shut down executor...
Processor >> [pool-3-thread-1] Shutdown complete.
Publisher >> [pool-4-thread-1] Shutdown complete.

4.2 Subscriber DEMAND > Processor DEMAND


// ...
		subscriber.setDEMAND(5);
// ...
		processor.setDEMAND(3);
// ...

In this case, we invoke Subscriber::onComplete() method to notice Subscriber that Processor have already processed all its items and pushed them to Subscriber.
The result:


Processor >> [main] Subscribed...
Processor >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-1] publish item: [1] ...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Publisher >> [pool-1-thread-3] publish item: [3] ...
Subscriber >> [main] Subscribed
Processor >> [pool-1-thread-3] processing item: [3] ...
Subscriber >> [main] Requesting 5 new items...
Processor >> [pool-1-thread-2] processing item: [2] ...
Processor >> [pool-1-thread-1] processing item: [1] ...
Subscriber >> [main] onComplete(): There is no remaining item in Processor.
Subscriber >> [pool-2-thread-1] item value = 20 after processing
Subscriber >> [pool-2-thread-2] item value = 30 after processing
Subscriber >> [pool-2-thread-3] item value = 10 after processing

4.3 Subscriber DEMAND < Processor DEMAND


// ...
		subscriber.setDEMAND(3);
// ...
		processor.setDEMAND(5);
// ...

The result:


Processor >> [main] Subscribed...
Processor >> [main] Requesting 5 new items...
Publisher >> [pool-1-thread-1] publish item: [1] ...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Processor >> [pool-1-thread-1] processing item: [1] ...
Publisher >> [pool-1-thread-1] publish item: [3] ...
Processor >> [pool-1-thread-1] processing item: [3] ...
Processor >> [pool-1-thread-2] processing item: [2] ...
Subscriber >> [main] Subscribed
Subscriber >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-3] publish item: [4] ...
Processor >> [pool-1-thread-3] processing item: [4] ...
Publisher >> [pool-1-thread-4] publish item: [5] ...
Processor >> [pool-1-thread-4] processing item: [5] ...
Processor >> [main] Remaining 2 items to be published to Subscriber!
Subscriber >> [pool-2-thread-1] item value = 10 after processing
Subscriber >> [pool-2-thread-2] item value = 20 after processing
Subscriber >> [pool-2-thread-3] item value = 30 after processing
Subscriber >> [pool-2-thread-3] Cancelling subscription...
Processor >> [pool-2-thread-3] Shut down executor...
Publisher >> [pool-2-thread-3] Shut down executor...
Publisher >> [pool-4-thread-1] Shutdown complete.
Processor >> [pool-3-thread-1] Shutdown complete.

One thought on “Java 9 Flow API example – Processor”

Leave a Reply

Your email address will not be published. Required fields are marked *