Introduction to RxJS – Extensions for JavaScript Reactive Streams

Reactive Extensions for JavaScript (RxJS) is an alternative to callback-based or Promise-based libraries. It treats any source of events in exactly the same manner, whether that is reading a file, making an HTTP call, clicking a button, or moving the mouse. RxJS is built on the pillars of functional and reactive programming, as well as a few popular design patterns such as Observer and Iterator.

Everything – just like a Stream

To work with RxJS, we should think in terms of streams. The image below shows a simple stream (or pipeline) approach to handling data:

[Figure: Producer, Pipeline, and Consumer]

The Producer is a data source that produces data in various forms to be consumed.
The Pipeline is a series of logic blocks that are executed in order when data becomes available. The data (a stream of asynchronous data) is filtered and processed in different ways so that the Consumer can consume it more easily.
The Consumer subscribes to (or listens for) the Producer’s events and does something with (consumes) the received data.

Any data point that holds one or more values, from a single integer to bytes of data, can be applied to the concept of a stream. Streams originate from a Producer, where data flows through a pipeline, arriving at a Consumer.

For example, here is a set of operations (filter, map) that occur between the creation of the Producer of the stream (the array) and the Consumer (the function that logs to the console):

[Figure: sample pipeline from an array Producer through filter and map to a console-logging Consumer]
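The Producer, Pipeline, and Consumer roles can be sketched with a plain array before any RxJS is involved. This is only an illustration: the values, the `received` array, and the `consumer` function are assumptions made up for the example.

```javascript
// Producer: a static data source (a plain array, no RxJS needed for this sketch).
const producer = [1, 2, 3, 4, 5, 6];

// Consumer: records what it receives and logs it to the console.
const received = [];
const consumer = value => {
  received.push(value);
  console.log(value);
};

// Pipeline: keep the even numbers, then multiply each one by 10.
producer
  .filter(n => n % 2 === 0)
  .map(n => n * 10)
  .forEach(consumer);
// logs 20, 40, 60
```

The Consumer never sees the raw data, only what the pipeline lets through.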

We can create streams from static data sources: numbers (or strings), sequences, or arrays. But the power of RxJS is that it can deal with dynamic data sources in exactly the same way.

Components of an Rx Stream

Producer

A stream must always have a Producer, a source of data. It is the starting point for any logic performed in RxJS.

In practice, a Producer is created from something that generates events independently (a single value, an array, mouse clicks, a stream of bytes from a file). In RxJS, we call it an Observable (as it can be observed).

An Observable is only responsible for pushing notifications: it emits events and doesn’t care how they are consumed.

Consumer

We also need a Consumer to accept events from the Producer and process them in some specific way. When a Consumer begins listening (subscribing) to a Producer for events, we have a Stream. In RxJS, the Consumer is called an Observer.

With RxJS, streams travel only from the Producer to the Consumer, not the other way around. This means that streams always flow from an Observable to an Observer. In addition, both components are loosely coupled:

[Figure: Observable and Observer]

Once the Observer begins receiving events from the Observable, what can we do with the data?
That is the job of the Data Pipeline.

Data Pipeline

RxJS lets us manipulate and edit data as it passes from the Producer to the Consumer through a list of methods (Observable operators). This means that we can adapt the output of the Producer to match the expectations of the Consumer.

This design principle is typically extremely hard to accomplish in large-scale JavaScript applications, but RxJS facilitates this model of design.

Time

Time is the implicit factor behind everything. There’s always an underlying concept of time when manipulating streams. We can build streams that run slower or faster depending on requirements.

[Figure: components of an Rx stream]

Data sources with Rx.Observable

Types of Data

Emitted data

Emitted data is created as a result of some interaction with the system, such as a mouse click, a key press, or a file read.

Some of these produce at most one event: we request data and then receive a response.
=> A Promise can be a good solution.

Others are part of a continuous process.
=> We must treat them as event emitters that produce multiple discrete events at future times.

Static data

This is data that already exists in the system (in memory), for instance an array or a string.

Interacting with static data is usually like iterating through it. We could think of associative arrays or maps as unordered static data.

Generated data

This data is created periodically or on demand, such as a bell ringing every hour or a Fibonacci sequence produced by an ES6 generator.

This kind of sequence can be infinite, so it is too large to store in memory.
=> Each value should be generated dynamically for the client as needed. We can pace the values with setTimeout() or setInterval(), or use take(number) to limit the quantity.
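Generated data can be sketched with an ES6 generator that produces Fibonacci numbers only when asked. The `takeFrom` helper below is a hypothetical stand-in for the idea behind take(number), written for this example; it is not the RxJS operator.

```javascript
// Generated data: an infinite Fibonacci sequence, produced one value at a time.
function* fibonacci() {
  let [previous, current] = [0, 1];
  while (true) {
    yield previous;
    [previous, current] = [current, previous + current];
  }
}

// Hypothetical helper (not the RxJS operator): pull at most `count` values.
function takeFrom(iterable, count) {
  const taken = [];
  if (count <= 0) return taken;
  for (const value of iterable) {
    taken.push(value);
    if (taken.length === count) break;
  }
  return taken;
}

const firstEight = takeFrom(fibonacci(), 8);
console.log(firstEight); // [0, 1, 1, 2, 3, 5, 8, 13]
```

Nothing beyond the eight requested values is ever computed, which is why the infinite sequence never has to be held in memory.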

Create Observable

Rx.Observable.create is an alias for the Observable constructor, which takes a subscribe function as its argument. So we can create an Observable with the constructor, using new:


// generate a string every second
const observable = new Rx.Observable(function subscribe(observer) {
  const id = setInterval(() => {
    observer.next('ozenero.com');
  }, 1000);
});

Or with the create() method:


// generate a string every second
const observable = Rx.Observable.create(function subscribe(observer) {
  const id = setInterval(() => {
    observer.next('ozenero.com');
  }, 1000);
});

An Observable can also be created with methods such as of(), from(), fromEvent(), and interval():


const strEmitter = Rx.Observable.of('foo', 'bar');

const numSource = Rx.Observable.from([1, 2, 3, 4, 5]);

const btnClick = Rx.Observable.fromEvent(document.querySelector('button'), 'click');

const interval = Rx.Observable.interval(1000);

Consuming data with Observer

Data that’s emitted and processed through an Observable needs a destination, and that destination is an Observer object.

Subscribe to Observable

Subscribing to an Observable is like calling the subscribe() function that was passed to the Observable’s constructor:


const observable = Rx.Observable.create(
function subscribe(observer) {...}
);

observable.subscribe(x => console.log(x));

Each call to observable.subscribe() triggers its own independent setup: it starts an Observable execution and delivers values or events to the Observer of that execution.
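To make the "independent setup" concrete, here is a minimal sketch that does not use RxJS at all. `TinyObservable` is a hypothetical class invented for this example: it only stores the subscribe function and runs it again on every subscribe() call.

```javascript
// Hypothetical stand-in for Rx.Observable: it only stores the subscribe function.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  // Every call runs the stored function again, with its own observer.
  subscribe(next) {
    this.subscribeFn({ next });
  }
}

let executions = 0;
const source = new TinyObservable(observer => {
  executions += 1;
  observer.next(`value from execution #${executions}`);
});

// Creating the Observable runs nothing yet.
const executionsBeforeSubscribe = executions;

const received = [];
source.subscribe(value => received.push(`A: ${value}`));
source.subscribe(value => received.push(`B: ${value}`));

console.log(executionsBeforeSubscribe); // 0
console.log(received);
// ['A: value from execution #1', 'B: value from execution #2']
```

Each subscriber triggers its own run of the subscribe function, so the two subscribers do not share an execution.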

Execute Observable

The code inside Observable.create(function subscribe(observer) {...}) represents an Observable execution.

The execution produces multiple values over time. There are three types of values:
– “Next” notification: actual data being delivered to an Observer, such as a Number, a String, or an Object.
– “Error” notification: a JavaScript error or exception.
– “Complete” notification: a signal that the stream has finished, so the Observer can run any final logic.

Nothing else can be delivered once an Error or Complete notification has been delivered, and only one of the two can occur. An Observable is also a lazy computation, because its execution only starts when an Observer subscribes.

So, what is an Observer?
An Observer is just an object with three callbacks, one for each type of notification (next, error, complete) that an Observable may deliver.

[Figure: notifications delivered from an Observable to an Observer]

For example:


const observable = Rx.Observable.create((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // never delivered: the stream has already completed
});

const observer = {
  next: val => console.log(val),
  error: err => console.log(err),
  complete: () => console.log('done')
}

observable.subscribe(observer);
// result
1
2
"done"

If we don’t provide one of the callbacks, the execution will still happen normally, except some types of notifications will be ignored:


const observer = {
  next: val => console.log(val),
  error: err => console.log(err)
  // no 'complete' callback, so the Complete notification is ignored
}
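The two rules above (nothing is delivered after Error or Complete, and missing callbacks are simply skipped) can be sketched as a small wrapper around an Observer. The `guard` function is a hypothetical helper written for this illustration; it is not how RxJS names or implements this internally.

```javascript
// Hypothetical helper: wraps a (possibly partial) Observer so that nothing
// is delivered after the first error or complete notification.
function guard(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed && observer.next) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
    }
  };
}

const log = [];
// A partial Observer: it has no 'error' callback.
const safeObserver = guard({
  next: value => log.push(value),
  complete: () => log.push('done')
});

safeObserver.next(1);
safeObserver.next(2);
safeObserver.complete();
safeObserver.next(3);                  // ignored: already completed
safeObserver.error(new Error('late')); // ignored: already completed

console.log(log); // [1, 2, 'done']
```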

When subscribing to an Observable, we don’t need to attach callbacks to an Observer object, then pass this object to observable.subscribe(). We can just provide them as arguments:


// don't need to create Observer: const observer = { ... }
// observable.subscribe(observer)

observable.subscribe(
  value => console.log(value),
  err => console.log(err),
  () => console.log('done')
);

// or
observable.subscribe(value => console.log(value));

Dispose Observable Execution with Subscription

An Observable execution may be infinite, so we need a way to cancel it to avoid wasting computation power or memory on data we no longer need. That way is the unsubscribe() method.

When observable.subscribe(observer) is called, the observer gets attached to the Observable execution. The call also returns a Subscription object which represents the ongoing execution.
=> Call subscription.unsubscribe() to cancel the execution.
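The idea of a Subscription as a handle for cancelling delivery can be sketched without timers. `createTicker` is a hypothetical producer driven by hand through `tick()`, an assumption made so the example runs synchronously; it is not part of RxJS.

```javascript
// Hypothetical producer: ticks are triggered by hand instead of setInterval,
// so the example runs synchronously.
function createTicker() {
  const listeners = new Set();
  return {
    tick(value) {
      listeners.forEach(listener => listener(value));
    },
    subscribe(listener) {
      listeners.add(listener);
      // The returned object plays the role of a Subscription.
      return {
        unsubscribe() {
          listeners.delete(listener);
        }
      };
    },
    listenerCount() {
      return listeners.size;
    }
  };
}

const ticker = createTicker();
const seen = [];
const subscription = ticker.subscribe(value => seen.push(value));

ticker.tick(0);
ticker.tick(1);
subscription.unsubscribe(); // release the resource
ticker.tick(2);             // no longer delivered

console.log(seen); // [0, 1]
```

With RxJS, the same pattern looks like this: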


const observable = Rx.Observable.create(function subscribe(observer) {
  
  let value = 0;
  // Keep track of the interval resource
  const intervalID = setInterval(() => {
    observer.next(value++);
  }, 1000);

  // dispose the interval resource
  return function unsubscribe() {
    clearInterval(intervalID);
    console.log('done')
  };
});

const subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

// result (run in about 5s)
0
1
2
3
4
"done"

We can add one Subscription to another, so that calling unsubscribe() on the parent Subscription also unsubscribes its child Subscriptions:


const foo = Rx.Observable.interval(1000);
const bar = Rx.Observable.interval(1000);

const parentSubscription = foo.subscribe(x => console.log('parent: ' + x));
const childSubscription = bar.subscribe(x => console.log('child: ' + x));

parentSubscription.add(childSubscription);

setTimeout(() => {
  // unsubscribe BOTH
  parentSubscription.unsubscribe();
}, 1500);

// result
"parent: 0"
"child: 0"

Observable Operators for Data Pipeline

Operators allow us to inject logic into an Observable’s pipeline. An operator is a pure, higher-order function: it never changes the existing Observable instance, but returns a new Observable that continues the chain.

[Figure: the Operator concept]

These Operators can be used to inspect, alter, create, or delay events after they leave the Data Source but before they reach the Consumer. In other words, anything in your business logic pipeline is handled by the combination of one or more Operators.

RxJS Operators are also lazily evaluated: nothing runs until an Observer subscribes.
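Purity and laziness can both be seen in a small sketch that avoids the RxJS library entirely. `createObservable` and `keepIf` are hypothetical names invented here; the sketch assumes an Observable is nothing more than an object with a subscribe method.

```javascript
// Minimal stand-in for an Observable (hypothetical, not the RxJS class).
function createObservable(subscribeFn) {
  return { subscribe: observer => subscribeFn(observer) };
}

// Hypothetical operator: returns a NEW observable and leaves the input untouched.
function keepIf(observable, predicate) {
  return createObservable(observer => {
    observable.subscribe({
      next: x => { if (predicate(x)) observer.next(x); },
      complete: () => observer.complete()
    });
  });
}

let sourceRuns = 0;
const numbers = createObservable(observer => {
  sourceRuns += 1;
  [1, 2, 3, 4, 5].forEach(n => observer.next(n));
  observer.complete();
});

const evens = keepIf(numbers, n => n % 2 === 0);
const runsBeforeSubscribe = sourceRuns; // building the chain runs nothing

const output = [];
evens.subscribe({
  next: x => output.push(`next: ${x}`),
  complete: () => output.push('done')
});

console.log(runsBeforeSubscribe); // 0
console.log(output); // ['next: 2', 'next: 4', 'done']
```

The source only starts producing when `evens` is subscribed to, and `numbers` itself is never modified.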

For example, this Operator takes an Observable and a Function as arguments:


function jsamap(observable, fn) {
  const output = Rx.Observable.create(observer => {
    observable.subscribe({
      next: x => observer.next(fn(x)),
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
  });
  
  return output;
}

Using the Operator above, we can create a new Observable:


const emitter = Rx.Observable.interval(500).take(5);

const newObservable = jsamap(emitter, val => val * 5);

const observer = {
  next: x => console.log('next: ' + x),
  error: err => console.log('error: ' + err),
  complete: () => console.log('done')
};

newObservable.subscribe(observer);

// result
"next: 0"
"next: 5"
"next: 10"
"next: 15"
"next: 20"
"done"

Instance Operators vs Static Operators

Instance Operator

An Instance Operator is a method on an Observable instance. For example, if the jsamap operator above were an official instance operator, it would look like this:


// 'this' refers to the input Observable the operator is called on
Rx.Observable.prototype.jsamap = function jsamap(fn) {
  const observable = this;
  const output = Rx.Observable.create(observer => {
    observable.subscribe({
      next: x => observer.next(fn(x)),
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
  });
  
  return output;
}

const emitter = Rx.Observable.interval(500).take(5);

const newObservable = emitter.jsamap(val => val * 5);

const observer = {
  next: x => console.log('next: ' + x),
  error: err => console.log('error: ' + err),
  complete: () => console.log('done')
};

newObservable.subscribe(observer);

Static Operator

A Static Operator is a method attached directly to the Observable class. We usually use it to create a new Observable from scratch (from non-Observable arguments).

Common Static Operators include of, from, interval, fromPromise, empty, and merge:


const observable1 = Rx.Observable.interval(1000);
const observable2 = Rx.Observable.interval(2000);

const merged = Rx.Observable.merge(observable1, observable2);

Operator Categories

Creation Operators

We have seen that the create operator can be used to create an Observable. There are other Creation Operators for common cases.

of: emits the values given as arguments, immediately one after the other, and then emits a complete notification:


const foo = Rx.Observable.of(100);
foo.subscribe(x => console.log(x));
// result
100

const bar = Rx.Observable.of(1,2,3);
bar.subscribe(x => console.log(x));
// result
1
2
3

Similar Operators:
range: emits a sequence of numbers within a specified range.
interval: emits sequential numbers at a specified time interval.
timer: like interval, but we can specify when the emissions start.
throw: emits an error notification.
empty: emits a complete notification.
never: emits no items to the Observer.

fromEventPattern: converts any addHandler/removeHandler API to an Observable. The addHandler is called when the output Observable is subscribed, and removeHandler is called when the Subscription is unsubscribed.


function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

var clicks = Rx.Observable.fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

fromPromise: returns an Observable that emits the Promise’s resolved value as a next notification, then completes.


function getInfo() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve({
        site: 'ozenero.com',
        post: 'Reactive Programming with RxJS'
      });
    }, 200);
  });
}

const source = Rx.Observable.fromPromise(getInfo());
source.subscribe(val => console.log(val));

// result
[object Object] {
  post: "Reactive Programming with RxJS",
  site: "ozenero.com"
}

from: creates an Observable from an Array, an array-like object, a Promise, an Iterable object, or an Observable-like object.


// from Array:
const array = [1, 2, 3];
const source = Rx.Observable.from(array);
source.subscribe(x => console.log(x));
// result
1
2
3

// from Iterable:
function* generateDoubles(seed) {
  var i = seed;
  while (true) {
    yield i;
    i = 2 * i; // double it
  }
}

const iterator = generateDoubles(2);
const result = Rx.Observable.from(iterator).take(5);
result.subscribe(x => console.log(x));
// result
2
4
8
16
32

repeat: repeats the stream of items emitted by the source Observable at most count times.


const array = [1, 2, 3];
const source = Rx.Observable.from(array).repeat(2);

source.subscribe(x => console.log(x));

// result
1
2
3
1
2
3

Transformation Operators

Transformation Operators transform the items emitted by an Observable and emit the results as a new Observable.

map: passes each source value through a transformation function to get corresponding output values.


const source = Rx.Observable.from([1,2,3]);
const example = source.map(val => val*10); // multiply each value by 10

example.subscribe(val => console.log(val));
// result
10
20
30

// from HTTP Response object
this.http.get(API_ENDPOINT) // return Observable
  .map(res => res.json());

scan: scan(accumulator: function, seed: any): Observable
It emits the current accumulation whenever the source emits a value.


const array = Rx.Observable.from([1,2,3]);
const source = array.scan((acc, current) => acc + current, 10);

source.subscribe(val => console.log(val));

//result
11 (= 10 + 1)
13 (= 2 + 11)
16 (= 3 + 13)
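The difference between scan and a plain reduce can be sketched on an array: scan emits every intermediate accumulation, while reduce only yields the final one. `scanArray` is a hypothetical helper for this illustration, not part of RxJS.

```javascript
// Hypothetical array version of scan: returns every intermediate accumulation.
function scanArray(values, accumulator, seed) {
  const emitted = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulator(acc, value);
    emitted.push(acc);
  }
  return emitted;
}

const runningTotals = scanArray([1, 2, 3], (acc, current) => acc + current, 10);
const finalTotal = [1, 2, 3].reduce((acc, current) => acc + current, 10);

console.log(runningTotals); // [11, 13, 16]
console.log(finalTotal);    // 16
```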

buffer: collects output values as an array, and emits that array only when another Observable emits.


// emit every second; each emission closes the current buffer
const interval = Rx.Observable.interval(1000);
// simulated click-action every 300ms
const click = Rx.Observable.interval(300);
// collect clicks every second
const buffer = click.buffer(interval);

buffer.subscribe(val => console.log(val));

// result
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
...

bufferTime: collects output values as an array, similar to buffer, and emits the array periodically at a given time interval.

The example code for buffer above becomes:


// click-action every 300ms
const click = Rx.Observable.interval(300);
// collect clicks every second (1000ms)
const buffer = click.bufferTime(1000);

buffer.subscribe(val => console.log(val));

// result
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11, 12]
[13, 14, 15]
...

Filtering Operators

Filtering Operators provide techniques for picking values from an Observable and dealing with back-pressure (preventing a fast data source from overwhelming the stream destination).

filter: filter(select: Function, thisArg: any): Observable
It only emits a value from the source if it passes the condition in the select Function.


const source = Rx.Observable.from([1, 2, 3, 4, 5]);
const filter = source.filter(num => num >= 3);

filter.subscribe(val => console.log(val));

// result
3
4
5

take: emits only the first n values from the source, where n is the given count, then completes.


const source = Rx.Observable.of(1,2,3,4,5);
const take = source.take(3);

take.subscribe(val => console.log(val));

// result
1
2
3

takeUntil: emits the values from the source Observable until a notifier Observable emits a value, then completes. It is helpful for us to auto-unsubscribe.


const source = Rx.Observable.interval(1000); // emit value every second
const timer = Rx.Observable.timer(3500); // after 3.5 seconds, emit value
const takeUntil = source.takeUntil(timer);

takeUntil.subscribe(val => console.log(val));

// result
0
1
2

takeWhile: takeWhile(predicate: function(value, index): boolean): Observable
It takes values from the source only while they pass the condition given (predicate returns true). If predicate returns false, it completes.


const source = Rx.Observable.interval(500);
const takeWhile = source.takeWhile(val => val < 4);

takeWhile.subscribe(val => console.log(val));

// result
0
1
2
3

skip: skips the first n values from the source, where n is the given count.


const source = Rx.Observable.interval(200); // emit 0,1,2,3,4,5,6,...
const skip = source.skip(3);

skip.subscribe(val => console.log(val));
// result
3
4
5
6
...

Just as takeUntil and takeWhile build on take, skipUntil and skipWhile build on skip.

first: first(predicate: function, select: function)
It emits only the first value, or the first value for which the predicate returns true.


const source = Rx.Observable.from([2,4,6,8]);

const first = source.first();
first.subscribe(val => console.log(val));
// result
2

const firstCon = source.first(x => x % 3 === 0);
firstCon.subscribe(val => console.log(val));
// result
6

const firstConIdx = source.first(
  x => x % 3 === 0,
  (result, index) => `Found '${result}' at index: ${index}`
);
firstConIdx.subscribe(val => console.log(val));
// result
"Found '6' at index: 2"

debounceTime: emits a value from the source Observable only after a particular time span has passed without another source emission. Values followed by another emission within that time span are discarded.

For example:


const clicks = Rx.Observable.fromEvent(document, 'click');
const result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));

[Figure: debounceTime]
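The debouncing rule can be sketched on a list of timestamped events, without any real timers. `debounceEvents` and the sample `clickLog` are hypothetical and exist only for this illustration. The sketch assumes the events are sorted by time and treats a gap exactly equal to the span as "not quiet", which is an arbitrary choice.

```javascript
// events: [{ time, value }] sorted by time (ms); span: required quiet period (ms).
// Hypothetical simulation of the debounce rule, not the RxJS implementation.
function debounceEvents(events, span) {
  const emitted = [];
  events.forEach((event, index) => {
    const next = events[index + 1];
    // An event survives if it is the last one, or if a quiet gap longer
    // than `span` follows it.
    if (!next || next.time - event.time > span) {
      emitted.push(event.value);
    }
  });
  return emitted;
}

const clickLog = [
  { time: 0, value: 'a' },
  { time: 200, value: 'b' },
  { time: 400, value: 'c' },  // followed by 1600ms of silence
  { time: 2000, value: 'd' },
  { time: 2500, value: 'e' }  // last event
];

const debounced = debounceEvents(clickLog, 1000);
console.log(debounced); // ['c', 'e']
```

Only the last value of each burst survives, which is why debounceTime is a common fit for search boxes and rapid clicks.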

distinctUntilChanged: only emits when the current value is different from the last.


const source = Rx.Observable.from([1, 1, 2, 2, 2, 3, 4, 4, 5]);
const distinct = source.distinctUntilChanged();

distinct.subscribe(val => console.log(val));
// result
1
2
3
4
5

// objects
const persons = Rx.Observable.from([
  { age: 24, name: 'Jack'},
  { age: 25, name: 'Jason'},
  { age: 32, name: 'Jason'},
  { age: 27, name: 'Adam'}
]);

const result = persons.distinctUntilChanged((p1, p2) => p1.name === p2.name);
result.subscribe(x => console.log(x));
//result
[object Object] {
  age: 24,
  name: "Jack"
}
[object Object] {
  age: 25,
  name: "Jason"
}
[object Object] {
  age: 27,
  name: "Adam"
}

// with debounceTime: log the x-position of a click only after clicking pauses and the position has changed
const clicks = Rx.Observable.fromEvent(document, 'click');

const sub = clicks
  .debounceTime(300)
  .map(e => e.clientX)
  .distinctUntilChanged()
  .subscribe(x => console.log(x));
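One detail worth making explicit is that distinctUntilChanged only compares each value with the previously emitted one; it does not remove every duplicate in the stream. A minimal array-based sketch, where `distinctUntilChangedArray` is a hypothetical helper and not part of RxJS:

```javascript
// Hypothetical array version: drop a value only when it equals the value
// emitted just before it.
function distinctUntilChangedArray(values, isEqual = (a, b) => a === b) {
  const emitted = [];
  let hasLast = false;
  let last;
  for (const value of values) {
    if (!hasLast || !isEqual(last, value)) {
      emitted.push(value);
      last = value;
      hasLast = true;
    }
  }
  return emitted;
}

const consecutive = distinctUntilChangedArray([1, 1, 2, 2, 2, 3, 4, 4, 5]);
const nonConsecutive = distinctUntilChangedArray([1, 2, 1, 1, 2]);
const byName = distinctUntilChangedArray(
  [{ age: 24, name: 'Jack' }, { age: 25, name: 'Jason' }, { age: 32, name: 'Jason' }],
  (p1, p2) => p1.name === p2.name
);

console.log(consecutive);              // [1, 2, 3, 4, 5]
console.log(nonConsecutive);           // [1, 2, 1, 2] (repeats that are not adjacent are kept)
console.log(byName.map(p => p.age));   // [24, 25]
```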

Combination Operators

Combination Operators combine data from multiple Observables.

merge: turns multiple Observables into a single Observable. It is available as both a Static Operator and an Instance Operator:
Rx.Observable.merge(observables: ...ObservableInput, concurrent: number): Observable
merge(input: Observable, concurrent: number): Observable
Here, concurrent is the maximum number of input Observables that are subscribed to concurrently.


const s1 = Rx.Observable.interval(300).take(6)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(500).take(5)
  .map(x => `[2] ${x}`);

const s3 = Rx.Observable.interval(700).take(4)
  .map(x => `[3] ${x}`);

// [1] --0--1--2--3--4--5
// [2] ----0----1----2----3----4
// [3] ------0------1------2------3
const source = Rx.Observable.merge(s1,s2,s3);
// or use instance operator with the same result
// const source = s1.merge(s2, s3);

source.subscribe(x => console.log(x));
// result
"[1] 0"
"[2] 0"
"[1] 1"
"[3] 0"
"[1] 2"
"[2] 1"
"[1] 3"
"[3] 1"
"[1] 4"
"[2] 2"
"[1] 5"
"[2] 3"
"[3] 2"
"[2] 4"
"[3] 3"

// only 2 streams run concurrently
const limited = Rx.Observable.merge(s1,s2,s3,2);
limited.subscribe(x => console.log(x));
// result
"[1] 0"
"[2] 0"
"[1] 1"
"[1] 2"
"[2] 1"
"[1] 3"
"[2] 2"
"[1] 4"
"[1] 5"
"[2] 3"
"[2] 4"
"[3] 0"
"[3] 1"
"[3] 2"
"[3] 3"

concat: emits the values of each Observable sequentially, one Observable after the other.
It is equivalent to merge(...observables, 1).


const s1 = Rx.Observable.interval(300).take(6)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(500).take(5)
  .map(x => `[2] ${x}`);

const s3 = Rx.Observable.interval(700).take(4)
  .map(x => `[3] ${x}`);

// [1] --0--1--2--3--4--5
// [2] ----0----1----2----3----4
// [3] ------0------1------2------3
const source = Rx.Observable.concat(s1,s2,s3); // ~merge(s1,s2,s3,1);
source.subscribe(x => console.log(x));

// result
"[1] 0"
"[1] 1"
"[1] 2"
"[1] 3"
"[1] 4"
"[1] 5"
"[2] 0"
"[2] 1"
"[2] 2"
"[2] 3"
"[2] 4"
"[3] 0"
"[3] 1"
"[3] 2"
"[3] 3"

startWith: emits the given value first, then emits the values from the source Observable.


const source = Rx.Observable.of(1, 2, 3);
const stream =  source.startWith(0);

stream.subscribe(val => console.log(val));
// result
0
1
2
3

combineLatest: combineLatest(observables: ...Observable, project: function): Observable
It emits the latest value from all Observables whenever any input Observable emits a value. Notice that the first emission only occurs when all Observables have emitted.


const s1 = Rx.Observable.interval(300).take(6)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(500).take(5)
  .map(x => `[2] ${x}`);

const s3 = Rx.Observable.interval(700).take(4)
  .map(x => `[3] ${x}`);

// [1] --0--1--2--3--4--5
// [2] ----0----1----2----3----4
// [3] ------0------1------2------3
const source = Rx.Observable.combineLatest(s1,s2,s3);
source.subscribe(x => console.log(x));

// result
["[1] 1", "[2] 0", "[3] 0"]
["[1] 2", "[2] 0", "[3] 0"]
["[1] 2", "[2] 1", "[3] 0"]
["[1] 3", "[2] 1", "[3] 0"]
["[1] 3", "[2] 1", "[3] 1"]
["[1] 3", "[2] 2", "[3] 1"]
["[1] 4", "[2] 2", "[3] 1"]
["[1] 5", "[2] 2", "[3] 1"]
["[1] 5", "[2] 3", "[3] 1"]
["[1] 5", "[2] 3", "[3] 2"]
["[1] 5", "[2] 4", "[3] 2"]
["[1] 5", "[2] 4", "[3] 3"]

We can use project function to transform the combined latest values into a new value:


// [1] --0--1--2--3--4--5
// [2] ----0----1----2----3----4
// [3] ------0------1------2------3
const source = Rx.Observable.combineLatest(
  s1,s2,s3,
  (a, b, c) => ('s' + a + ' --- ' + 's' + b + ' --- ' + 's' + c)
);
source.subscribe(x => console.log(x));

// result
"s[1] 1 --- s[2] 0 --- s[3] 0"
"s[1] 2 --- s[2] 0 --- s[3] 0"
"s[1] 2 --- s[2] 1 --- s[3] 0"
"s[1] 3 --- s[2] 1 --- s[3] 0"
"s[1] 3 --- s[2] 1 --- s[3] 1"
"s[1] 3 --- s[2] 2 --- s[3] 1"
"s[1] 4 --- s[2] 2 --- s[3] 1"
"s[1] 5 --- s[2] 2 --- s[3] 1"
"s[1] 5 --- s[2] 3 --- s[3] 1"
"s[1] 5 --- s[2] 3 --- s[3] 2"
"s[1] 5 --- s[2] 4 --- s[3] 2"
"s[1] 5 --- s[2] 4 --- s[3] 3"

withLatestFrom: source.withLatestFrom(other: Observable, project: Function): Observable
It emits values which are calculated from the latest values of each, but only when the source emits.


const s1 = Rx.Observable.interval(500).take(5)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(300).take(4)
  .map(x => `[2] ${x}`);

// [1] ----0----1----2----3----4
// [2] --0--1--2--3
const source = s1.withLatestFrom(s2);
source.subscribe(x => console.log(x));

//result
["[1] 0", "[2] 0"]
["[1] 1", "[2] 2"]
["[1] 2", "[2] 3"]
["[1] 3", "[2] 3"]
["[1] 4", "[2] 3"]

forkJoin: forkJoin(...observables, selector : function): Observable
It emits the last emitted value from each Observable once all of them have completed.
Note that if any of the inner Observables errors, we lose the values of all the other Observables, including those that have already completed, unless we catch the error on the inner Observable.


const s1 = Rx.Observable.interval(200).take(10)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(500).take(5)
  .map(x => `[2] ${x}`);

const source = Rx.Observable.forkJoin(
  s1, s2,
  (a, b) => (a + ' --- ' + b)
);

source.subscribe(x => console.log(x));

// result 
"[1] 9 --- [2] 4"

zip: emits values with the same index as an array, once every Observable has emitted a value at that index.
If the last parameter is a function, it is used to compute the emitted value from the input values (instead of emitting an array of values).
The process continues until at least one inner Observable completes.
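The index-by-index pairing can be sketched with plain arrays, ignoring time altogether. `zipArrays` is a hypothetical helper for illustration only. It assumes every input is already fully known, so "one input completes" simply becomes "the shortest array runs out".

```javascript
// Hypothetical array version of zip: pair values by index and stop at the
// shortest input.
function zipArrays(...sources) {
  if (sources.length === 0) return [];
  const length = Math.min(...sources.map(source => source.length));
  const result = [];
  for (let i = 0; i < length; i++) {
    result.push(sources.map(source => source[i]));
  }
  return result;
}

const first = ['[1] 0', '[1] 1', '[1] 2', '[1] 3'];
const second = ['[2] 0', '[2] 1', '[2] 2'];

const pairs = zipArrays(first, second);
const joined = pairs.map(([a, b]) => a + ' --- ' + b);

console.log(joined);
// ['[1] 0 --- [2] 0', '[1] 1 --- [2] 1', '[1] 2 --- [2] 2']
```

The value '[1] 3' is never paired because the second input has nothing left at that index. With RxJS itself, zip looks like this: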


const s1 = Rx.Observable.interval(200).take(10)
  .map(x => `[1] ${x}`);

const s2 = Rx.Observable.interval(500).take(3)
  .map(x => `[2] ${x}`);

// [1] -0-1-2-3-4-5-6-7-8-9
// [2] ----0----1----2
const source = Rx.Observable.zip(
  s1, s2,
  (a, b) => (a + ' --- ' + b)
);

source.subscribe(
  x => console.log(x),
  null,
  () => console.log('complete')
);

// result
"[1] 0 --- [2] 0"
"[1] 1 --- [2] 1"
"[1] 2 --- [2] 2"
"complete"

Error Handling Operators

These Operators provide ways to handle errors and retry logic.

catch: catches an error and handles it by returning a new Observable.


const source = Rx.Observable.from([1,2,3,4])
  .map(val => {
    if (val < 4) return val;
    throw 'This is an Exception!';
  })

const result = source.catch(
  err => Rx.Observable.of(
  `err message: ${err}`,
  'Just release a new Source!')
);

result.subscribe(val => console.log(val));

// result
1
2
3
"err message: This is an Exception!"
"Just release a new Source!"

retry: resubscribes to the source Observable a specific number of times if an error occurs. If no number is given, it retries indefinitely.
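The counting rule (one initial attempt plus the given number of retries) can be sketched with a synchronous function. `retrySync` and `flaky` are hypothetical names used only in this illustration; the sketch assumes the operation signals failure by throwing.

```javascript
// Hypothetical helper: run `operation` once, then retry up to `retries`
// more times if it throws. The last error is rethrown when retries run out.
function retrySync(operation, retries) {
  let attempt = 0;
  while (true) {
    try {
      return operation();
    } catch (err) {
      if (attempt >= retries) throw err;
      attempt += 1;
    }
  }
}

// Fails twice, then succeeds on the third call.
let calls = 0;
const flaky = () => {
  calls += 1;
  if (calls < 3) throw new Error('Exception!');
  return `ok after ${calls} calls`;
};
const recovered = retrySync(flaky, 2);

// Always fails: 1 initial attempt + 2 retries = 3 calls, then the error escapes.
let failingCalls = 0;
let finalError = null;
try {
  retrySync(() => {
    failingCalls += 1;
    throw new Error('Exception!');
  }, 2);
} catch (err) {
  finalError = err.message;
}

console.log(recovered);    // 'ok after 3 calls'
console.log(failingCalls); // 3
console.log(finalError);   // 'Exception!'
```

With RxJS itself, retry looks like this: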


const source = Rx.Observable.from([1,2,3,4])
  .map(val => {
    if (val < 4) return val;
    throw 'Exception!';
  })

const result = source.retry(2);

result.subscribe(
  val => console.log(val),
  err => console.log(`${err} - Retried 2 times then quit!`)
);
// result
1
2
3
1
2
3
1
2
3
"Exception! - Retried 2 times then quit!"

retryWhen: retryWhen(notifier: function(errors: Observable): Observable): Observable
The notifier function receives an Observable of the errors raised by the source as its only parameter.
– If the source Observable calls error -> emit the Throwable that caused the error to the Observable returned from notifier.
– If that Observable calls complete or error -> call complete or error on the child subscription.
– Otherwise -> resubscribe to the source Observable.


const source = Rx.Observable.interval(50)
  .map(val => {
    if (val < 4) return val;
    throw 'FOUR!';
  })

const result = source.retryWhen(errors =>
  errors
    // log each error as a side effect instead of subscribing to errors a second time
    .do(sourceError => console.log(`catch ${sourceError}, please wait 2s...`))
    .delay(2000)
);

result.subscribe(val => console.log(val));

// result
0
1
2
3
"catch FOUR!, please wait 2s..."
0
1
2
3
"catch FOUR!, please wait 2s..."
0
1
2
3
"catch FOUR!, please wait 2s..."
...

The code above will retry forever if the error keeps happening. To limit the number of retries, we can use the scan operator to keep track of how many retries have been made, then throw the error once the count exceeds the limit.


const source = Rx.Observable.interval(50)
    .map(val => {
    if (val < 4) return val;
    throw 'FOUR!';
  });

const result = source.retryWhen(errors => errors.delay(1000).scan((count, sourceError) => {
  console.log(`catch ${sourceError}`);
  console.log(`error count: ${count}`);
  
  if (count > 2) throw sourceError;
  return count + 1;
}, 1));

result.subscribe(
  val => console.log(val),
  err => console.error(`get an error: ${err}`),
  () => console.log('complete!')
);

// result
0
1
2
3
"catch FOUR!"
"error count: 1"
0
1
2
3
"catch FOUR!"
"error count: 2"
0
1
2
3
"catch FOUR!"
"error count: 3"
"get an error: FOUR!"

Multicasting Operators

By default, RxJS Observables are cold (or unicast). Multicasting Operators can make an Observable hot (or multicast), allowing side-effects to be shared among multiple Subscribers.

share: returns a new Observable that multicasts (shares) the original Observable among multiple Subscribers.
– As long as there is at least one Subscriber this Observable will be subscribed and emitting data.
– When all Subscribers have unsubscribed it will unsubscribe from the source Observable.


const source = Rx.Observable.interval(200).take(2);

const sub1 = source.subscribe(val => console.log(`[sub1] ${val}`));
const sub2 = source.subscribe(val => console.log(`[sub2] ${val}`));

setTimeout(() => {
  const sharedExample = source.share();
  const sub3 = sharedExample.subscribe(val => console.log(`[sub3] ${val}`));
  const sub4 = sharedExample.subscribe(val => console.log(`[sub4] ${val}`));
}, 3000);

// result
"[sub1] 0"
"[sub2] 0"
"[sub1] 1"
"[sub2] 1"
"[sub3] 0"
"[sub4] 0"
"[sub3] 1"
"[sub4] 1"

publish: returns a ConnectableObservable, which begins emitting items to the Observers that have subscribed to it only when its connect method is called.
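The "subscribe first, connect later" idea can be sketched without RxJS. `publishValues` is a hypothetical function written for this example. It assumes a fixed list of values and plain callbacks, which is far simpler than a real ConnectableObservable.

```javascript
// Hypothetical sketch of a connectable source: subscribers only register
// themselves; the single shared execution starts when connect() is called.
function publishValues(values) {
  const observers = [];
  return {
    subscribe(next) {
      observers.push(next);
    },
    connect() {
      // One execution, broadcast to every registered subscriber.
      values.forEach(value => observers.forEach(next => next(value)));
    }
  };
}

const published = publishValues([1, 2, 3]);
const log = [];

published.subscribe(val => log.push(`Subscriber One: ${val}`));
published.subscribe(val => log.push(`Subscriber Two: ${val}`));

const deliveredBeforeConnect = log.length; // nothing delivered yet

published.connect();

console.log(deliveredBeforeConnect); // 0
console.log(log);
// ['Subscriber One: 1', 'Subscriber Two: 1', 'Subscriber One: 2',
//  'Subscriber Two: 2', 'Subscriber One: 3', 'Subscriber Two: 3']
```

With RxJS itself, publish and connect look like this: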


const source = Rx.Observable.of(1, 2, 3);
const example = source.publish();

example.subscribe(val => console.log(`Subscriber One: ${val}`));
example.subscribe(val => console.log(`Subscriber Two: ${val}`));

console.log('wait 3s for connecting to Source...');
setTimeout(() => {
  example.connect();
}, 3000);

// result
"wait 3s for connecting to Source..."
"Subscriber One: 1"
"Subscriber Two: 1"
"Subscriber One: 2"
"Subscriber Two: 2"
"Subscriber One: 3"
"Subscriber Two: 3"

Utility Operators

These operators provide helpful utilities in your observable toolkit.

do: do(nextOrObserver: Observer|function, error: function, complete: function): Observable
It returns an Observable that is identical to the source. We can use it to perform actions or side-effects such as logging.


const source = Rx.Observable.of(1, 2, 3);

const result = source
  .do((val) => {
    // the return value of this callback is ignored; do only performs side effects
    console.log(`Log: ${val}`);
  })
  .map(
    val => val*10
  );

result.subscribe(val => console.log(val));

// result
"Log: 1"
10
"Log: 2"
20
"Log: 3"
30

delay: delays the emission of each item by a specified number of milliseconds.


const source = Rx.Observable.of(1, 2, 3);
const result = source.delay(1000);
result.subscribe(x => console.log(x));

There are other useful Operators such as delayWhen, timeout, toArray, toPromise that we can find at ReactiveX.io

Other Operators

– Conditional and Boolean Operators:

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

– Mathematical and Aggregate Operators:

  • count
  • max
  • min
  • reduce

For more details, please visit: ReactiveX.io
