In the previous post, we covered the basics of Reactive Streams and the components and behaviour of the Java 9 Flow API. In this tutorial, we look at an example that implements a Publisher and a Subscriber for reactive programming.
Related Articles:
– Java 9 Flow API – Reactive Streams
– Java 9 Flow API example – Processor
– Java 9 Flow SubmissionPublisher – A Concrete Publisher
I. Technologies
– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)
II. Project Overview
We will create a Publisher with two Subscribers.
– The Publisher maintains a list of Subscriptions, one for each Subscriber.
– The Publisher uses a Subscription to push items to its Subscriber through the Subscriber::onNext() method.
– A Subscriber uses its Subscription to request items from the Publisher through the Subscription::request() method.
– The Publisher defines an Executor for multi-threading, so request() and onNext() work asynchronously: request() returns immediately and each item is delivered to the Subscriber on a pool thread. Because every item is a separate task, items can reach a Subscriber out of order.
– After receiving all the items it requested, a Subscriber either requests more items or cancels its Subscription, chosen at random.
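The request()/onNext() handshake described above can be tried in isolation before building a custom Publisher. The sketch below is not part of the project: it uses the JDK's SubmissionPublisher in place of MyPublisher, and the class names HandshakeSketch and OneAtATime are made up for illustration. The Subscriber asks for one item at a time, so the Publisher never pushes more than was requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class HandshakeSketch {

    /** Hypothetical Subscriber that pulls one item at a time and records what it receives. */
    static class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1, 2, 3 and returns what the Subscriber received. */
    static List<Integer> run() {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i); // buffered until the Subscriber requests it
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling HandshakeSketch.run() should return the three submitted items in the order they were submitted, since SubmissionPublisher delivers to each Subscriber sequentially.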
III. Practice
To understand how Publisher, Subscriber and Subscription behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams
1. Create implementation of Publisher
package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPublisher implements Publisher<Integer> {

    private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

    // Shared pool: request() returns immediately and items are delivered on these threads.
    final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final List<MySubscription> subscriptions = Collections.synchronizedList(new ArrayList<>());
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        MySubscription subscription = new MySubscription(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    // Blocks the caller until the last Subscription has been cancelled.
    public void waitUntilTerminated() throws InterruptedException {
        try {
            terminated.get();
        } catch (ExecutionException e) {
            System.out.println(e);
        }
    }

    private class MySubscription implements Subscription {

        private final ExecutorService executor;
        private final Subscriber<? super Integer> subscriber;
        private final AtomicInteger value;
        private final AtomicBoolean isCanceled;

        public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
            value = new AtomicInteger();
            isCanceled = new AtomicBoolean(false);
        }

        @Override
        public void request(long n) {
            if (isCanceled.get())
                return;
            // Flow.Subscription.request() requires an onError signal for n <= 0.
            if (n <= 0)
                executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
            else
                publishItems(n);
        }

        @Override
        public void cancel() {
            isCanceled.set(true);
            synchronized (subscriptions) {
                subscriptions.remove(this);
                // The last Subscriber to cancel shuts the Publisher down.
                if (subscriptions.size() == 0)
                    shutdown();
            }
        }

        private void publishItems(long n) {
            for (long i = 0; i < n; i++) {
                executor.execute(() -> {
                    // Skip items that were queued before the Subscription was cancelled.
                    if (isCanceled.get())
                        return;
                    int v = value.incrementAndGet();
                    log("publish item: [" + v + "] ...");
                    subscriber.onNext(v);
                });
            }
        }

        private void shutdown() {
            log("Shut down executor...");
            executor.shutdown();
            // Signal termination from a short-lived thread, then release it so the JVM can exit.
            ExecutorService notifier = newSingleThreadExecutor();
            notifier.submit(() -> {
                log("Shutdown complete.");
                terminated.complete(null);
            });
            notifier.shutdown();
        }
    }

    private void log(String message, Object... args) {
        String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);
        System.out.printf(fullMessage, args);
    }
}
2. Create implementation of Subscriber
package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;

import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements Subscriber<Integer> {

    private static final String LOG_MESSAGE_FORMAT = "Subscriber %s >> [%s] %s%n";
    private static final int DEMAND = 3;
    private static final Random RANDOM = new Random();

    private final String name;
    private Subscription subscription;
    // Number of requested items that have not arrived yet.
    private int count;

    public MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        log("Subscribed");
        this.subscription = subscription;
        count = DEMAND;
        requestItems(DEMAND);
    }

    private void requestItems(int n) {
        log("Requesting %d new items...", n);
        subscription.request(n);
    }

    @Override
    public void onNext(Integer item) {
        if (item != null) {
            log(item.toString());
            // onNext() may run on several pool threads at once, so guard the counter.
            synchronized (this) {
                count--;
                if (count == 0) {
                    // All requested items arrived: randomly ask for more or cancel.
                    if (RANDOM.nextBoolean()) {
                        count = DEMAND;
                        requestItems(count);
                    } else {
                        count = 0;
                        log("Cancelling subscription...");
                        subscription.cancel();
                    }
                }
            }
        } else {
            log("Null Item!");
        }
    }

    @Override
    public void onComplete() {
        log("Complete!");
    }

    @Override
    public void onError(Throwable t) {
        log("Subscriber Error >> %s", t);
    }

    private void log(String message, Object... args) {
        String fullMessage = String.format(LOG_MESSAGE_FORMAT, this.name, currentThread().getName(), message);
        System.out.printf(fullMessage, args);
    }
}
3. Create Test Class
package com.javasampleapproach.java9flow.pubsub;

public class MainApp {

    public static void main(String[] args) throws InterruptedException {
        MyPublisher publisher = new MyPublisher();
        MySubscriber subscriberA = new MySubscriber("A");
        MySubscriber subscriberB = new MySubscriber("B");
        publisher.subscribe(subscriberA);
        publisher.subscribe(subscriberB);
        publisher.waitUntilTerminated();
    }
}
4. Check Result
Subscriber A >> [main] Subscribed
Subscriber A >> [main] Requesting 3 new items...
Subscriber B >> [main] Subscribed
Subscriber B >> [main] Requesting 3 new items...
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber A >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber A >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-1] publish item: [1] ...
Subscriber B >> [pool-1-thread-1] 1
Publisher >> [pool-1-thread-2] publish item: [2] ...
Subscriber B >> [pool-1-thread-2] 2
Publisher >> [pool-1-thread-3] publish item: [3] ...
Subscriber A >> [pool-1-thread-3] 3
Subscriber A >> [pool-1-thread-3] Cancelling subscription...
Publisher >> [pool-1-thread-4] publish item: [3] ...
Subscriber B >> [pool-1-thread-4] 3
Subscriber B >> [pool-1-thread-4] Requesting 3 new items...
Publisher >> [pool-1-thread-1] publish item: [4] ...
Subscriber B >> [pool-1-thread-1] 4
Publisher >> [pool-1-thread-2] publish item: [5] ...
Subscriber B >> [pool-1-thread-2] 5
Publisher >> [pool-1-thread-3] publish item: [6] ...
Subscriber B >> [pool-1-thread-3] 6
Subscriber B >> [pool-1-thread-1] Cancelling subscription...
Publisher >> [pool-1-thread-1] Shut down executor...
Publisher >> [pool-2-thread-1] Shutdown complete.
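In the output above, Subscriber A receives item 2 before item 1, because every item is its own task on the four-thread pool. If ordered delivery matters, one possible approach (an assumption on our part, not what MyPublisher does) is to run each Subscription's tasks on a single-threaded executor. A minimal sketch, with OrderedDeliverySketch and its publishItems helper as hypothetical names and the Subscriber reduced to a Consumer:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class OrderedDeliverySketch {

    // Mirrors the loop in MySubscription.publishItems(): one task per item.
    static void publishItems(long n, AtomicInteger value, ExecutorService executor,
                             Consumer<Integer> onNext) {
        for (long i = 0; i < n; i++) {
            executor.execute(() -> onNext.accept(value.incrementAndGet()));
        }
    }

    /** Publishes n items on a single-threaded executor and returns them in arrival order. */
    static List<Integer> run(int n) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        // One thread per subscription: tasks run one after another in submission order.
        ExecutorService perSubscription = Executors.newSingleThreadExecutor();
        publishItems(n, new AtomicInteger(), perSubscription, received::add);
        perSubscription.shutdown(); // tasks that are already queued still run
        try {
            perSubscription.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The trade-off is that a slow Subscriber now delays only its own items, at the cost of one thread per Subscription instead of a shared pool.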