Reactor – Simple Ways to create Flux/Mono

Reactive Streams is an API and pattern that provides a standard for asynchronous stream processing with non-blocking back pressure. Java 9 adopts it as the Flow API (java.util.concurrent.Flow), which has four simple interfaces: Publisher, Subscriber, Subscription and Processor.
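
To make the four interfaces more concrete, here is a minimal sketch that uses only the JDK (Java 9 or later). The class FlowSketch, the subscriber CollectingSubscriber and the helper publishAndCollect are hypothetical names made up for this illustration. SubmissionPublisher is the JDK's own Flow.Publisher implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowSketch {

	/** Hypothetical subscriber: requests one item at a time and records what it receives. */
	static class CollectingSubscriber implements Flow.Subscriber<String> {
		final List<String> received = new CopyOnWriteArrayList<>();
		final CountDownLatch done = new CountDownLatch(1);
		private Flow.Subscription subscription;

		@Override
		public void onSubscribe(Flow.Subscription subscription) {
			this.subscription = subscription;
			subscription.request(1); // back pressure: ask for a single item
		}

		@Override
		public void onNext(String item) {
			received.add(item);
			subscription.request(1); // ask for the next item only after handling this one
		}

		@Override
		public void onError(Throwable throwable) {
			done.countDown();
		}

		@Override
		public void onComplete() {
			done.countDown();
		}
	}

	/** Publishes the given items and returns what the subscriber received. */
	static List<String> publishAndCollect(String... items) {
		CollectingSubscriber subscriber = new CollectingSubscriber();
		try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
			publisher.subscribe(subscriber);
			for (String item : items) {
				publisher.submit(item);
			}
		} // close() sends onComplete once pending items are delivered
		try {
			subscriber.done.await(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return subscriber.received;
	}

	public static void main(String[] args) {
		System.out.println(publishAndCollect("Java", "Sample", "Approach"));
	}
}
```

Even this small example has to manage the Subscription and every signal by hand, which is the kind of plumbing a library such as Reactor takes over.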

On its own, however, the Reactive Streams API is too low-level for practical use in reactive applications. Reactor Core therefore provides two main implementations of Publisher: Flux and Mono. In this tutorial, we'll look at what they are and at simple ways to create them.

I. Overview

For more details about Reactive Streams and Publish-Subscribe Pattern, please visit:
Java 9 Flow API – Reactive Streams

1. Flux & Mono

A Flux<T> is a standard Publisher<T> representing a reactive sequence of 0..N items, optionally terminated by either a success signal or an error.

A Mono<T> is a specialized Publisher<T> that emits at most one item (a single value or nothing), optionally terminated by either a success signal or an error.
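
The difference can be seen by draining each publisher into an ordinary Java value. The following is only a sketch: the class FluxMonoSketch and its helpers allItems and singleItem are hypothetical names, and block() is used here purely to make the result easy to inspect, not as a recommended way to consume a publisher.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMonoSketch {

	/** A Flux can emit any number of items, so they are collected into a List. */
	static List<String> allItems(Flux<String> flux) {
		return flux.collectList().block();
	}

	/** A Mono emits one item or none; block() returns null when it completes empty. */
	static Optional<String> singleItem(Mono<String> mono) {
		return Optional.ofNullable(mono.block());
	}

	public static void main(String[] args) {
		System.out.println(allItems(Flux.just("Java", "Sample")));
		System.out.println(singleItem(Mono.just("JSA")));
		System.out.println(singleItem(Mono.empty()));
	}
}
```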

2. Simple ways to create Publishers with Flux and Mono

2.1 Mono

– Empty Mono:

Mono<String> noData = Mono.empty();

– Mono with value:

Mono<String> data = Mono.just("JSA");

– Mono that emits an Exception:

Mono.error(new CustomException());

2.2 Flux

– Empty Flux:

Flux<String> noData = Flux.empty();

– Flux with items:

Flux<String> data = Flux.just("Java", "Sample", "Approach", ".com");

– Flux from Collections:

List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM");
Flux<String> sequence = Flux.fromIterable(list);

– Infinite Flux that emits an increasing Long, starting from 0, every x milliseconds:

Flux<Long> counter = Flux.interval(Duration.ofMillis(x));

– Flux that emits an Exception:

Flux.error(new CustomException());
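
Because Flux.interval never completes by itself and emits on a background thread, a caller usually has to bound it and wait for the result. A minimal sketch, assuming a hypothetical helper firstTicks that limits the sequence with take() and waits with block():

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class IntervalSketch {

	/** Returns the first `count` ticks of an interval that fires every `periodMillis` ms. */
	static List<Long> firstTicks(int count, long periodMillis) {
		return Flux.interval(Duration.ofMillis(periodMillis))
			.take(count)     // turn the infinite sequence into a finite one
			.collectList()   // gather the ticks into a Mono<List<Long>>
			.block();        // wait on the calling thread until the list is ready
	}

	public static void main(String[] args) {
		System.out.println(firstTicks(3, 100)); // prints [0, 1, 2]
	}
}
```

Without take(), collectList() would never complete, since the interval keeps ticking indefinitely.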

II. Practice

0. Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.

1. Reactor installation in Maven

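– First, import the Reactor BOM (groupId io.projectreactor, artifactId reactor-bom) in the dependencyManagement section of pom.xml, using type pom and scope import.

– Next, add the reactor-core dependency (groupId io.projectreactor, artifactId reactor-core). The BOM supplies its version.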

2. Write Code

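package com.javasampleapproach.reactorpublisher;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {

		System.out.println("=== Empty Mono ===");
		// completes without emitting, so nothing is printed
		Mono.empty()
			.subscribe(System.out::println);

		System.out.println("=== Mono.just ===");
		Mono.just("JSA")
			.map(item -> "Mono item: " + item)
			.subscribe(System.out::println);

		System.out.println("=== Empty Flux ===");
		Flux.empty()
			.subscribe(System.out::println);

		System.out.println("=== Flux.just ===");
		// print() without a newline: the items appear on one line
		Flux.just("Java", "Sample", "Approach", ".com")
			.map(item -> item.toUpperCase())
			.subscribe(System.out::print);

		System.out.println("\n=== Flux from List ===");
		List<String> list = Arrays.asList("JAVA", "SAMPLE", "APPROACH", ".COM");
		Flux.fromIterable(list)
			.map(item -> item.toLowerCase())
			.subscribe(System.out::print);

		System.out.println("\n=== Flux emits increasing values each 100ms ===");
		// interval() ticks on a background thread; take(10) stops after ticks 0..9
		Flux.interval(Duration.ofMillis(100))
			.map(item -> "tick: " + item)
			.take(10)
			.subscribe(System.out::println);

		// keep the main thread waiting until the ten ticks have been printed
		Thread.sleep(1500);

		System.out.println("=== Mono emits an Exception ===");
		// doOnError() runs as a side effect when the error signal passes through
		Mono.error(new CustomException("Mono"))
			.doOnError(e -> {System.out.println("inside Mono doOnError()");})
			.subscribe();

		System.out.println("=== Flux emits an Exception ===");
		// no error consumer is passed to subscribe(), so the error is left unhandled
		Flux.error(new CustomException("Flux"))
			.subscribe(System.out::println);
	}
}
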
package com.javasampleapproach.reactorpublisher;

public class CustomException extends RuntimeException {

	private static final long serialVersionUID = -5970845585469454688L;

	public CustomException(String type) {
		// printed when the exception is constructed, before any subscriber sees it
		System.out.println(type + ": throw CustomException!");
	}
}

3. Run & Check Result

=== Empty Mono ===
=== Mono.just ===
Mono item: JSA
=== Empty Flux ===
=== Flux.just ===
JAVASAMPLEAPPROACH.COM
=== Flux from List ===
javasampleapproach.com
=== Flux emits increasing values each 100ms ===
tick: 0
tick: 1
tick: 2
tick: 3
tick: 4
tick: 5
tick: 6
tick: 7
tick: 8
tick: 9
=== Mono emits an Exception ===
Mono: throw CustomException!
inside Mono doOnError()
=== Flux emits an Exception ===
Flux: throw CustomException!
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactorpublisher.CustomException
Caused by: com.javasampleapproach.reactorpublisher.CustomException
	at com.javasampleapproach.reactorpublisher.MainApp.main(
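
The ErrorCallbackNotImplemented in the output above appears because subscribe was given only a consumer for items. One way to avoid it is to pass an error consumer as the second argument of subscribe. The sketch below is an illustration rather than part of the sample project: the class ErrorHandlingSketch and the method run are hypothetical, and IllegalStateException stands in for CustomException so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class ErrorHandlingSketch {

	/** Subscribes with both an item consumer and an error consumer, recording each signal. */
	static List<String> run() {
		List<String> log = new ArrayList<>();
		Flux.<String>error(new IllegalStateException("boom"))
			.subscribe(
				item -> log.add("item: " + item),                  // never called here
				error -> log.add("error: " + error.getMessage())); // receives the error signal
		return log;
	}

	public static void main(String[] args) {
		System.out.println(run()); // prints [error: boom]
	}
}
```

With the second argument in place, the error is delivered to the consumer instead of being rethrown on the subscribing thread.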