In this tutorial, JavaSampleApproach introduces ways to handle errors in a Reactor Flux: catching an exception and falling back, recovering from errors, and dealing with backpressure errors.
I. Ways to handle Errors
0. Declare & Initialize Flux
Flux<String> flux = Flux
        // .<String>error(new IllegalArgumentException());
        .range(1, 4)
        .map(item -> {
            if (item <= 3) {
                return "item: " + item;
            } else {
                System.out.println(">> Exception occurs on map()");
                throw new RuntimeException();
            }
        });
1. Catching Exception
1.1 Then do something
flux.doOnError(e -> System.out.println("doOnError: " + e))
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    doOnError: java.lang.RuntimeException
*/
1.2 Then fall back to a default value
flux.onErrorReturn("onErrorReturn: Value!")
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    onErrorReturn: Value!
*/
1.3 Then re-throw a new Exception
// mapError is the Reactor 3.0.x name; Reactor 3.1 renamed it to onErrorMap
flux.mapError(e -> new CustomException("mapError"))
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    mapError: throw CustomException!
    Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactor.handleerror.CustomException
    Caused by: com.javasampleapproach.reactor.handleerror.CustomException
        at com.javasampleapproach.reactor.handleerror.MainApp.lambda$6(MainApp.java:115)
        at reactor.core.publisher.Flux.lambda$mapError$23(Flux.java:4189)
        ...
*/

public class CustomException extends RuntimeException {

    private static final long serialVersionUID = -5970845585469454688L;

    // Prints a message so the moment the exception is created shows up in the output
    public CustomException(String type) {
        System.out.println(type + ": throw CustomException!");
    }
}
2. Recover from Errors
2.1 By falling back to another Flux
// onErrorResumeWith is the Reactor 3.0.x name; Reactor 3.1 renamed it to onErrorResume
flux.onErrorResumeWith(e -> {
        System.out.println("-> inside onErrorResumeWith()");
        return Flux.just(1, 2)
                   .map(item -> "-> new Flux item: " + item);
    })
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    -> inside onErrorResumeWith()
    -> new Flux item: 1
    -> new Flux item: 2
*/
2.2 By retrying
flux.retry(1)
    .doOnError(System.out::println)
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    java.lang.RuntimeException
*/
With Predicate:
// RANDOM is a java.util.Random instance, declared in the full code in II.3
flux.retry(1, e -> {
        boolean shouldRetry = RANDOM.nextBoolean();
        System.out.println("shouldRetry? -> " + shouldRetry);
        return shouldRetry;
    })
    .doOnError(System.out::println)
    .subscribe(System.out::println);
/**
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    shouldRetry? -> true
    item: 1
    item: 2
    item: 3
    >> Exception occurs on map()
    java.lang.RuntimeException
*/
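The outputs above show items 1 to 3 twice because a retry subscribes to the source again, so the sequence restarts from the first element instead of resuming where it failed. The sketch below models that behaviour in plain Java without Reactor. RetryModel, subscribeOnce and retry are hypothetical names made up for this illustration; they are not part of the Reactor API and only approximate what the operator does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of retry semantics; all names are hypothetical, not Reactor API.
class RetryModel {

    // Stand-in for one subscription to the tutorial's source:
    // pushes "item: 1".."item: 3" into the sink, then fails on the 4th element.
    static void subscribeOnce(List<String> sink) {
        for (int i = 1; i <= 4; i++) {
            if (i > 3) {
                throw new RuntimeException("failed on element " + i);
            }
            sink.add("item: " + i);
        }
    }

    // Re-runs the source from the start after a failure, at most maxRetries times,
    // and only while shouldRetry accepts the error.
    static List<String> retry(int maxRetries, Predicate<Throwable> shouldRetry) {
        List<String> received = new ArrayList<>();
        int retriesLeft = maxRetries;
        while (true) {
            try {
                subscribeOnce(received);
                return received; // completed without error
            } catch (RuntimeException e) {
                if (retriesLeft > 0 && shouldRetry.test(e)) {
                    retriesLeft--; // start over: items 1..3 are emitted again
                } else {
                    received.add("error: " + e.getMessage());
                    return received; // the error finally reaches the subscriber
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(retry(1, e -> true));
        System.out.println(retry(1, e -> false));
    }
}
```

With a predicate that always accepts, the first call prints the three items twice followed by the error. With a predicate that always rejects, the second call prints the three items once followed by the error, which corresponds to the case where shouldRetry is false in the snippet above.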
3. Deal with Backpressure Errors
3.1 By throwing a special Exception
When the receiver is overrun by more signals than it requested:
// Publisher emits 3-4 items, but Subscriber requests only 1
flux.onBackpressureError()
    .doOnError(System.out::println)
    .subscribe(new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscriber > request only 1 item...");
            request(1);
        }

        @Override
        protected void hookOnNext(String value) {
            System.out.println("Subscriber > process... [" + value + "]");
        }
    });
/**
    Subscriber > request only 1 item...
    Subscriber > process... [item: 1]
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
*/
3.2 By dropping excess values
// Publisher emits 3 items, but Subscriber requests only 1
flux.onBackpressureDrop(item -> System.out.println("Drop: [" + item + "]"))
    .doOnError(System.out::println)
    .subscribe(new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscriber > request only 1 item...");
            request(1);
        }

        @Override
        protected void hookOnNext(String value) {
            System.out.println("Subscriber > process... [" + value + "]");
        }
    });
/**
    Subscriber > request only 1 item...
    Subscriber > process... [item: 1]
    Drop: [item: 2]
    Drop: [item: 3]
    >> Exception occurs on map()
    java.lang.RuntimeException
*/
3.3 By buffering excess values
BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscriber > request only 1 item...");
        request(1);
    }

    @Override
    protected void hookOnNext(String value) {
        System.out.println("Subscriber > process... [" + value + "]");
    }
};

// Publisher emits 3 items, but Subscriber requests only 1 [item 1]
// Buffer 2 items [item 2, item 3]
flux.onBackpressureBuffer(2, item -> System.out.println("Buffer: [" + item + "]"))
    .doOnError(System.out::println)
    .subscribe(subscriber);

System.out.println("Subscriber > request more items:");
subscriber.request(1); // receive one more item (from Buffer)
/**
    Subscriber > request only 1 item...
    Subscriber > process... [item: 1]
    >> Exception occurs on map()
    Subscriber > request more items:
    Subscriber > process... [item: 2]
*/
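The three backpressure examples differ only in what happens to an item that arrives when the subscriber has no outstanding demand. The sketch below models that decision in plain Java, without Reactor, to make the three outputs easier to compare. BackpressureModel, Strategy and run are hypothetical names invented for this illustration, and the model is a simplification under stated assumptions rather than Reactor's implementation: it is synchronous, and it leaves out the RuntimeException that the tutorial's map() throws on the 4th element.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the three overflow strategies; all names are hypothetical, not Reactor API.
class BackpressureModel {

    enum Strategy { ERROR, DROP, BUFFER }

    private final Strategy strategy;
    private final int bufferSize;                      // only used by BUFFER
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private long demand;                               // outstanding request(n) from the subscriber
    private boolean terminated;

    BackpressureModel(Strategy strategy, int bufferSize, long initialDemand) {
        this.strategy = strategy;
        this.bufferSize = bufferSize;
        this.demand = initialDemand;
    }

    // Called by the source for every item, whether or not the subscriber asked for it.
    void onNext(String item) {
        if (terminated) {
            return;
        }
        if (demand > 0) {
            demand--;
            log.add("process " + item);
        } else if (strategy == Strategy.DROP) {
            log.add("drop " + item);
        } else if (strategy == Strategy.BUFFER && buffer.size() < bufferSize) {
            buffer.add(item);                          // kept until the subscriber requests more
        } else {
            terminated = true;                         // ERROR strategy, or the buffer is full
            log.add("overflow error");
        }
    }

    // Called by the subscriber to ask for more; buffered items are delivered first.
    void request(long n) {
        if (terminated) {
            return;
        }
        demand += n;
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            log.add("process " + buffer.poll());
        }
    }

    // Mirrors the tutorial: demand of 1, three items pushed, then one more request.
    static List<String> run(Strategy strategy) {
        BackpressureModel model = new BackpressureModel(strategy, 2, 1);
        for (int i = 1; i <= 3; i++) {
            model.onNext("item: " + i);
        }
        model.request(1);
        return model.log;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + run(s));
        }
    }
}
```

Running main prints `ERROR -> [process item: 1, overflow error]`, `DROP -> [process item: 1, drop item: 2, drop item: 3]` and `BUFFER -> [process item: 1, process item: 2]`, which lines up with the order of the "process" and "Drop" lines in the three outputs above.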
II. Source Code
1. Technology
– Java 8
– Maven 3.6.1
– Reactor Core 3.0.4, with the Aluminium release train.
2. Reactor installation in Maven
– First, import the BOM by adding the following to pom.xml:
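<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Aluminium-SR1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>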
– Next, add dependency:
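<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
</dependencies>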
3. Code
package com.javasampleapproach.reactor.handleerror;

import java.util.Random;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class MainApp {

    public static void main(String[] args) {

        final Random RANDOM = new Random();

        Flux<String> flux = Flux
                // .<String>error(new IllegalArgumentException());
                .range(1, 4)
                .map(item -> {
                    if (item <= 3) {
                        return "item: " + item;
                    } else {
                        System.out.println(">> Exception occurs on map()");
                        throw new RuntimeException();
                    }
                });

        System.out.println("=== do when error ===");
        flux.doOnError(e -> System.out.println("doOnError: " + e))
            .subscribe(System.out::println);

        System.out.println("=== fall back to a default value ===");
        flux.onErrorReturn("onErrorReturn: Value!")
            .subscribe(System.out::println);

        System.out.println("=== fall back to another Flux ===");
        flux.onErrorResumeWith(e -> {
                System.out.println("-> inside onErrorResumeWith()");
                return Flux.just(1, 2)
                           .map(item -> "-> new Flux item: " + item);
            })
            .subscribe(System.out::println);

        System.out.println("=== retry ===");
        flux.retry(1)
            .doOnError(System.out::println)
            .subscribe(System.out::println);

        System.out.println("=== retry with Predicate ===");
        flux.retry(1, e -> {
                boolean shouldRetry = RANDOM.nextBoolean();
                System.out.println("shouldRetry? -> " + shouldRetry);
                return shouldRetry;
            })
            .doOnError(System.out::println)
            .subscribe(System.out::println);

        System.out.println("=== deal with backpressure Error ===");
        flux.onBackpressureError()
            .doOnError(System.out::println)
            .subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println("Subscriber > request only 1 item...");
                    request(1);
                }

                @Override
                protected void hookOnNext(String value) {
                    System.out.println("Subscriber > process... [" + value + "]");
                }
            });

        System.out.println("=== dropping excess values ===");
        flux.onBackpressureDrop(item -> System.out.println("Drop: [" + item + "]"))
            .doOnError(System.out::println)
            .subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println("Subscriber > request only 1 item...");
                    request(1);
                }

                @Override
                protected void hookOnNext(String value) {
                    System.out.println("Subscriber > process... [" + value + "]");
                }
            });

        System.out.println("=== buffer excess values ===");
        BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscriber > request only 1 item...");
                request(1);
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("Subscriber > process... [" + value + "]");
            }
        };

        flux.onBackpressureBuffer(2, item -> System.out.println("Buffer: [" + item + "]"))
            .doOnError(System.out::println)
            .subscribe(subscriber);

        System.out.println("Subscriber > request more items:");
        subscriber.request(1);

        System.out.println("=== catch and rethrow ===");
        flux.mapError(e -> new CustomException("mapError"))
            .subscribe(System.out::println);
    }
}
4. Results
=== do when error ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
doOnError: java.lang.RuntimeException
=== fall back to a default value ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
onErrorReturn: Value!
=== fall back to another Flux ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
-> inside onErrorResumeWith()
-> new Flux item: 1
-> new Flux item: 2
=== retry ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
=== retry with Predicate ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
shouldRetry? -> true
item: 1
item: 2
item: 3
>> Exception occurs on map()
java.lang.RuntimeException
=== deal with backpressure Error ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
=== dropping excess values ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
Drop: [item: 2]
Drop: [item: 3]
>> Exception occurs on map()
java.lang.RuntimeException
=== buffer excess values ===
Subscriber > request only 1 item...
Subscriber > process... [item: 1]
>> Exception occurs on map()
Subscriber > request more items:
Subscriber > process... [item: 2]
=== catch and rethrow ===
item: 1
item: 2
item: 3
>> Exception occurs on map()
mapError: throw CustomException!
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: com.javasampleapproach.reactor.handleerror.CustomException
Caused by: com.javasampleapproach.reactor.handleerror.CustomException
    at com.javasampleapproach.reactor.handleerror.MainApp.lambda$6(MainApp.java:115)
    at reactor.core.publisher.Flux.lambda$mapError$23(Flux.java:4189)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:88)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:132)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
    at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:119)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:97)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1476)
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:68)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67)
    at reactor.core.publisher.FluxResume.subscribe(FluxResume.java:47)
    at reactor.core.publisher.Flux.subscribe(Flux.java:5780)
    at reactor.core.publisher.Flux.subscribe(Flux.java:5747)
    at reactor.core.publisher.Flux.subscribe(Flux.java:5679)
    at com.javasampleapproach.reactor.handleerror.MainApp.main(MainApp.java:116)