Reactive Streams is an API and pattern that provides a standard for asynchronous stream processing with non-blocking back pressure. It is also being introduced in Java 9 Flow API with four simple interfaces: Publisher
, Subscriber
, Subscription
and Processor
.
But Reactive Streams API is just low level to make practical use in reactive applications. So Reactor Core provides two main implementations of Publisher: Flux and Mono. In this tutorial, we’re gonna know what they are and simple ways to create them.
I. Overview
For more details about Reactive Streams and Publish-Subscribe Pattern, please visit:
Java 9 Flow API – Reactive Streams
1. Flux & Mono
A Flux<T> is a standard Publisher<T> representing a reactive sequence of 0..N items, optionally terminated by either a success signal or an error.
While Mono<T> is a specialized Publisher<T> that emits at most single-valued-or-empty result.
2. Simple ways to create Publishers with Flux and Mono
2.1 Mono
– Empty Mono:
Mono<String> noData = Mono.empty();
– Mono with value:
Mono<String> data = Mono.just("JSA");
– Mono that emits an Exception:
Mono.error(new CustomException());
2.2 Flux
– Empty Flux:
Flux<String> noData = Flux.empty();
– Flux with items:
Flux<String> data = Flux.just("Java", "Sample", "Approach", ".com");
– Flux from Collections:
List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM"); Flux<String> sequence = Flux.fromIterable(list);
– Flux
Flux<Long> counter = Flux.interval(Duration.ofMillis(x));
– Flux that emits an Exception:
Flux.error(new CustomException());
II. Practice
0. Technology
– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.
1. Reactor installation in Maven
– First, import the BOM by adding the following to pom.xml:
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Aluminium-SR1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> </pre> - Next, add dependency: <pre class="lang:xml"> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies>
2. Write Code
package com.javasampleapproach.reactorpublisher; import java.time.Duration; import java.util.Arrays; import java.util.List; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MainApp { public static void main(String[] args) throws InterruptedException { System.out.println("=== Empty Mono ==="); Mono.empty().subscribe(System.out::println); System.out.println("=== Mono.just ==="); Mono.just("JSA") .map(item -> "Mono item: " + item) .subscribe(System.out::println); System.out.println("=== Empty Flux ==="); Flux.empty() .subscribe(System.out::println); System.out.println("=== Flux.just ==="); Flux.just("Java", "Sample", "Approach", ".com") .map(item -> item.toUpperCase()) .subscribe(System.out::print); System.out.println("\n=== Flux from List ==="); List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM"); Flux.fromIterable(list) .map(item -> item.toLowerCase()) .subscribe(System.out::print); System.out.println("\n=== Flux emits increasing values each 100ms ==="); Flux.interval(Duration.ofMillis(100)) .map(item -> "tick: " + item) .take(10) .subscribe(System.out::println); Thread.sleep(1500); System.out.println("=== Mono emits an Exception ==="); Mono.error(new CustomException("Mono")) .doOnError(e -> {System.out.println("inside Mono doOnError()");}) .subscribe(System.out::println); System.out.println("=== Flux emits an Exception ==="); Flux.error(new CustomException("Flux")) .subscribe(System.out::println); } }
package com.javasampleapproach.reactorpublisher; public class CustomException extends RuntimeException { private static final long serialVersionUID = -5970845585469454688L; public CustomException(String type) { System.out.println(type + ": throw CustomException!"); } }
3. Run & Check Result
=== Empty Mono === === Mono.just === Mono item: JSA === Empty Flux === === Flux.just === ozenero.com === Flux from List === ozenero.com === Flux emits increasing values each 100ms === tick: 0 tick: 1 tick: 2 tick: 3 tick: 4 tick: 5 tick: 6 tick: 7 tick: 8 tick: 9 === Mono emits an Exception === Mono: throw CustomException! inside Mono doOnError() === Flux emits an Exception === Flux: throw CustomException! Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactorpublisher.CustomException Caused by: com.javasampleapproach.reactorpublisher.CustomException at com.javasampleapproach.reactorpublisher.MainApp.main(MainApp.java:50)