Understanding RxJS

What is RxJS?

RxJS is the JavaScript implementation of the Reactive Extensions API.

The reactive extensions API is meant to help you manage the flow of data into your application. check the Reactive Extensions API documentation at the link: http://reactivex.io/.

RxJS stands for:

  • Rx – Reactive Extensions API
  • JS – JavaScript

RxJS is written in typescript and there is an interface for every building block of RxJS. the building blocks of RxJS are:

  • Observable
  • Observer
  • Operator
  • Subject
  • Scheduler

There are different types of data that are all processed a little differently in JavaScript:

  • Callback functions
  • Promises
  • Event handlers
  • Loops
  • Variable assignment

RxJS provide a single API that facilitates and simplifies the processing of data from multiple sources. When this sources produce data overtime it’s helpful to think of them as streams of data gradually flowing into the app. The form that streams can take are:

  • Synchronous data
  • Asynchronous data
  • Application data from a server
  • User provided data
  • Client side event data
  • Function return values

Observer Pattern and RxJS

RxJS is based upon the observer pattern. It means that there is an object that is the subject which will produce values and notify other objects that are interested in receiving those values. the subject maintain a list of the objects that want to observe those new values. Those objects are the observables. the subject pushes values to the observers over time and each observer may than react to this values by executing some code specific to that observer:


observer pattern example
observer pattern example

In RxJS the observable object pushes values to a single observer and not to multiple observers.

We can register an observer to receive the values from the observable by calling a method on the observable called subscribe. The observer that should receive the values is passed as a parameter to the subscribe method. The observable sends values and a couple of other messages to the observer by calling methods on the observer object  itself (next, error, complete). We write the code for the next, error and complete methods:

observer pattern in rxjs example
observer pattern in rxjs example

Benefits of RxJS

Besides the fact that RxJS provide a single API that facilitates and simplifies the processing of data from multiple sources, RxJS provide batter solution for asynchronous API. instead of using callbacks, promises or async-await to produce a value over time the Observables can reproduce multiple values over time (until there is an error or complete) and the code will still be simple and readable just like promise.

callback:


router.get('/albums', (req, res) => {
  handleData('data.txt', (err, result) => {
    fs.writeFile('/data/albums.txt', result, (err, output) => {
      res.send(output);
    });
  });
});

promise:


let albumPromise = getAlbumeById(10);

albumPromise
  .then(album => console.log(book.title)
  .catch(err => console.log(err))
  .finally(() => console.log('Complete'));

async-await:


async function GetAlbumByID(id) {
  let album = await GetAlbumFromServer(id);
  console.log(book.title);
  return album;
}

observables:


let albumObservable = handleAlbums();

albumObservable.subscribe(
  (book => console.log(book.title)),
  (err => console.log(err)),
  (() => console.log('complete'))
);

Another big Benefit when using RxJS is one of its building blocks, operators. The operators are functions that manipulate the data produced by an observable and return a new observable.  Because of that we can use several operators to shape the data. 


import { Observable, from } from 'rxjs';
import { filter } from 'rxjs/operators';

let numbers$ = from([10, 20, 30, 40, 50]);

let observer = {
  next: value => console.log(value),
  error: err => console.log(`ERROR: ${err}`),
  complete: () => console.log('All done.')
};

numbers$.pipe(
  filter(num => num > 30);
)
.subscribe(observer);

// 40, 50

RxJS Building Blocks

Observables and observers


import { Observable, from } from 'rxjs';

let numbers$ = from([10, 20, 30, 40, 50]);

let observer = {
  next: value => console.log(value),
  error: err => console.log(`ERROR: ${err}`),
  complete: () => console.log('All done.')
};

numbers$.subscribe(observer);
// 10, 20, 30, 40, 50

Observables can produce values to single observer (called unicast).

We can create our own custom observables when we need precise control over how data is produced. To do that we will use a second object known as subscriber that happens to implement the same interface as the observable object:


import { Observable,} from 'rxjs';

let customObservable$ = Observable.create(subscriber => {

  If (value) {
    subscriber.next(value);
  }

  If (error) {
    subscriber.error(error);

  }

  If (complete) {
    subscriber.complete(complete);
  }

});

Operators

Manipulate the values produced by observable and return a new observable.


import { Observable, from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

let numbers$ = from([10, 20, 30, 40, 50]);

let observer = {
  next: value => console.log(value),
  error: err => console.log(`ERROR: ${err}`),
  complete: () => console.log('All done.')
};

numbers$.pipe(
  filter(num => num > 30);
)
.subscribe(observer);

// 40, 50

Subjects

Similar to observables but have important additional features. 

Subjects can push data to multiple observers (called multicast).

Schedulers

Observables can be configured with schedulers to control the execution context of the observable:

  • Causes an observable to execute synchronously.
  • Causes an observable to execute asynchronously(using the js setInterval func)
  • Will execute the observable on the microtask queue