Reactor – How to Create Flux (Publisher) with Interval

In this tutorial, JavaSampleApproach introduces simple ways to create Publisher (Flux) that emits items every specified Interval of time.

I. Overview

1. Steps to do

Our goal is to create Flux for (data1, data2, data3, …) that emits items every specified Interval of time.

– First, we need to produce a Flux<Long> that is infinite and emits regular ticks from a clock -> use Flux.interval(Duration).
Now we have Flux<Long>: (i1, i2, i3, …)

– Then we have several ways to convert each Long item (i) to each data item (data) that we want to publish:
+ transform i to data -> use map(i -> data).
+ zip i with data together -> use zipWith(publisher,combinator) or zipWithIterable(collection). Then separate Zip(i,data) to get data.

2. Methods’ details
1. Flux#interval

This method creates a new Flux that emits incrementing Long starting with 0 every period.

static Flux interval(Duration period);
2. Flux.zipWithIterable

Pairwise combines as Tuple2 elements of the Flux and an Iterable sequence.

Flux> zipWithIterable(Iterable iterable)

To get the content of Tuple2 in Type-safe way:

Tuple2 tuple2 = ...; // tuple2(object1,object2)

T object1 = tuple2.getT1();
T2 object2 = tuple2.getT2();
3. Flux.zipWith

The operator will forward all combinations produced by the combinator from the most recent items emitted by source<T> (that invokes zipWith() method) and source2<T2> (that contains data item to be zipped with source item):

final Flux zipWith(Publisher source2, final BiFunction combinator);

II. Practice

0. Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.

1. Reactor installation in Maven

– First, import the BOM by adding the following to pom.xml:


	
		
			io.projectreactor
			reactor-bom
			Aluminium-SR1
			pom
			import
		
	

– Next, add dependency:


	
		io.projectreactor
		reactor-core
	

2. Code
2.1 Using map()
List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
Flux intervalFlux1 = Flux
							.interval(Duration.ofMillis(500))
							.map(tick -> {
								if (tick < data.size())
									return "item " + tick + ": " + data.get(tick.intValue());
								return "Done (tick == data.size())";
							});

intervalFlux1.take(data.size() + 1).subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}
Done (tick == data.size())
2.2 Using zipWithIterable()
List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
Flux intervalFlux2 = Flux
							.interval(Duration.ofMillis(500))
							.zipWithIterable(data)
							.map(source -> "item " + source.getT1() + ": " + source.getT2());

intervalFlux2.subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}
2.3 Using zipWith()
Flux flux = Flux.just("{A}", "{B}", "{C}");
Flux intervalFlux3 = Flux
							.interval(Duration.ofMillis(500))
							.zipWith(flux, (i, item) -> "item " + i + ": " + item);
		
intervalFlux3.subscribe(System.out::println);
Thread.sleep(3000);

Result:

item 0: {A}
item 1: {B}
item 2: {C}

III. Source Code

package com.javasampleapproach.reactor.publisherinterval;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

public class MainApp {

	public static void main(String[] args) throws InterruptedException {

		System.out.println("=== from Collection using map() ===");
		List data = new ArrayList(Arrays.asList("{A}", "{B}", "{C}"));
		Flux intervalFlux1 = Flux
									.interval(Duration.ofMillis(500))
									.map(tick -> {
										if (tick < data.size())
											return "item " + tick + ": " + data.get(tick.intValue());
										return "Done (tick == data.size())";
									});
		
		intervalFlux1.take(data.size() + 1).subscribe(System.out::println);
		Thread.sleep(3000);
		
		System.out.println("=== from Collection using zipWithIterable() and map() ===");
		Flux intervalFlux2 = Flux
									.interval(Duration.ofMillis(500))
									.zipWithIterable(data)
									.map(source -> "item " + source.getT1() + ": " + source.getT2());

		intervalFlux2.subscribe(System.out::println);
		Thread.sleep(3000);
		
		System.out.println("=== from Flux using zipWith() ===");
		Flux flux = Flux.just("{A}", "{B}", "{C}");
		Flux intervalFlux3 = Flux
									.interval(Duration.ofMillis(500))
									.zipWith(flux, (i, item) -> "item " + i + ": " + item);
		
		intervalFlux3.subscribe(System.out::println);
		Thread.sleep(3000);

	}
}

Run and check Results:

=== from Collection using map() ===
item 0: {A}
item 1: {B}
item 2: {C}
Done (tick == data.size())
=== from Collection using zipWithIterable() and map() ===
item 0: {A}
item 1: {B}
item 2: {C}
=== from Flux using zipWith() ===
item 0: {A}
item 1: {B}
item 2: {C}

11 thoughts on “Reactor – How to Create Flux (Publisher) with Interval”

  1. Hi there would you mind letting me know which hosting company you’re utilizing?
    I’ve loaded your blog in 3 different internet browsers and
    I must say this blog loads a lot quicker then most. Can you recommend a good
    web hosting provider at a fair price? Thanks, I appreciate it!

  2. Very nice post. I just stumbled upon your blog and wished to say that I have really
    enjoyed browsing your blog posts. After all I will be subscribing to your feed and I hope you write again very soon!

  3. I blog quite often and I genuinely appreciate your content.
    Your article has really peaked my interest. I will bookmark your site
    and keep checking for new information about once a
    week. I subscribed to your RSS feed as well.

  4. Excellent pieces. Keep posting such kind of information on your page.

    Im really impressed by your site.
    Hey there, You’ve done an incredible job. I will certainly digg
    it and individually recommend to my friends. I am confident they’ll
    be benefited from this site.

  5. I am really enjoying the theme/design of your web site. Do you ever run into any browser compatibility issues? A couple of my blog readers have complained about my site not working correctly in Explorer but looks great in Safari. Do you have any ideas to help fix this problem?

  6. There are actually quite a lot of details like that to take into consideration. That is a great level to deliver up. I provide the thoughts above as common inspiration but clearly there are questions like the one you deliver up the place a very powerful thing will be working in sincere good faith. I don?t know if greatest practices have emerged around issues like that, but I’m certain that your job is clearly recognized as a good game. Each girls and boys feel the influence of only a moment抯 pleasure, for the remainder of their lives.

Leave a Reply

Your email address will not be published. Required fields are marked *