RxJS: Building Blocks

RxJS is written in TypeScript, and there is an interface for every building block. The building blocks of RxJS are:

  • Observable
  • Observer
  • Operator
  • Subject
  • Scheduler

Side note: I recommend reading my previous article, RxJS: What and Why.

Observer/Subscriber

The observer is an object that observes and responds to notifications that are specified as methods:

  • next – handle the next emitted item.
  • error – handle the error condition.
  • complete – final processing or cleanup.

Observers are a collection of callbacks that know how to listen to values delivered by the observable. The observable emits notifications with values and the observer responds to these notifications.

In RxJS, an observer is also defined as an interface with next, error, and complete methods. So any class created as an observer will implement these 3 methods.


const observer = {
  next: message => console.log(`message emitted ${message}`),
  error: err => console.log(`Error occurred ${err}`),
  complete: () => console.log('no more messages')
};

One of the classes that implements the observer interface is the subscriber. While an observer is the object we write to observe emitted values, RxJS internally converts each observer into a subscriber.

A subscriber is an observer with the additional ability to unsubscribe from an observable.
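
To make the interface idea concrete, here is a minimal sketch of a class that implements the Observer interface. The class name RecordingObserver is made up for this example, and the ‘of’ creation function (covered later in this article) is used only to have something to observe.

```typescript
import { Observer, of } from 'rxjs';

// Hypothetical class, for illustration only: it records every notification it receives.
class RecordingObserver implements Observer<string> {
  readonly log: string[] = [];

  next(message: string): void {
    this.log.push(`next: ${message}`);
  }

  error(err: unknown): void {
    this.log.push(`error: ${String(err)}`);
  }

  complete(): void {
    this.log.push('complete');
  }
}

const recordingObserver = new RecordingObserver();

// `of` emits its arguments in order and then completes.
of('message 1', 'message 2').subscribe(recordingObserver);

console.log(recordingObserver.log);
// → [ 'next: message 1', 'next: message 2', 'complete' ]
```

Because the class has all 3 methods, it can be passed to subscribe exactly like the plain object above.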

Observable

An observable is a collection of events or values emitted over time, such as user actions (key presses, selections…), application events (routing, forms…), HTTP responses, web sockets, and more.

Observables can emit any data such as:

  • Primitives
  • Events
  • Objects
  • Arrays
  • HTTP responses

Observables can be:

  • Synchronous – items are emitted immediately and in sequence, as from an array.
  • Asynchronous – items are emitted over time, as from an event.

Observables are collections of elements, just like arrays, so we can map, filter, and concat observable items the same way. And because these items can arrive over time, we can also apply delay, timeout, and other time-based operations.
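
As a rough sketch of this parallel, the example below runs the same filter and map steps on an array and on an observable built from that array. It relies on ‘from’ and pipe(), both covered later in this article, and the variable names are my own.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const numbers = [1, 2, 3, 4];

// Array version: the whole collection is transformed at once.
const fromArray = numbers.filter(n => n % 2 === 0).map(n => n * 10);

// Observable version: each item flows through the same steps as it is emitted.
const fromObservable: number[] = [];
from(numbers)
  .pipe(
    filter(n => n % 2 === 0),
    map(n => n * 10)
  )
  .subscribe(n => fromObservable.push(n));

console.log(fromArray);      // → [ 20, 40 ]
console.log(fromObservable); // → [ 20, 40 ]
```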

Let’s see how we create an observable with a constructor:


import { Observable } from 'rxjs';

// observer
const observer = {
  next: message => console.log(`message emitted ${message}`),
  error: err => console.log(`Error occurred ${err}`),
  complete: () => console.log('no more messages')
};

// observable
const messages$ = new Observable(messageSubscriber => {
  messageSubscriber.next('message 1');
  messageSubscriber.next('message 2');
  messageSubscriber.complete();
});

// This is not how we usually write this kind of code. There are better ways to create observables, and we will see them later.

In the constructor, we provide a function that is called when the observable is subscribed to. This function is given the subscriber, and within it we can use that subscriber to call next to emit a value, and so on.

After creating the observer and the observable, all that is left is to subscribe. We call subscribe on the observable and pass in the observer so the observable knows where to send its notifications. We must subscribe to an observable to start receiving emissions; otherwise, we receive nothing and have nothing to observe.

Subscribing

With RxJS, we call the subscribe method on the observable to receive notifications, and we pass in the observer so the observable knows where to send them. If we don’t subscribe to the observable, we won’t receive anything and we won’t have anything to observe.

When we subscribe to an observable, what happens next depends on how the observable is set up:

  • Some observables start emitting right away, even if there are no subscribers. This is called a hot observable because it emits values even when there are no subscriptions yet.
  • Some observables do not emit until there is a subscriber, for example, one that emits the items of an array or performs an HTTP GET. This is called a cold observable because it does not emit until there is a subscriber.

Let’s see it in code:


import { Observable } from 'rxjs';

// observer
const observer = {
  next: message => console.log(`message emitted ${message}`),
  error: err => console.log(`Error occurred ${err}`),
  complete: () => console.log('no more messages')
};

// observable
const messages$ = new Observable(messageSubscriber => {
  messageSubscriber.next('message 1');
  messageSubscriber.next('message 2');
  messageSubscriber.complete();
});

// subscribing...
const messageSubscription = messages$.subscribe(observer);


As part of the subscription process, we tell the observable how we will observe it. The subscribe method returns a subscription, which represents the execution of the observable.
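
The cold behavior described earlier can be checked with a small sketch. The counter variable executions is my own addition; it counts how many times the function passed to the constructor runs.

```typescript
import { Observable } from 'rxjs';

let executions = 0;

// The function passed to the constructor runs once per subscription.
const coldMessages$ = new Observable<string>(subscriber => {
  executions++;
  subscriber.next(`execution ${executions}`);
  subscriber.complete();
});

console.log(executions); // → 0, nothing runs before we subscribe

const received: string[] = [];
coldMessages$.subscribe(message => received.push(message));
coldMessages$.subscribe(message => received.push(message));

console.log(executions); // → 2
console.log(received);   // → [ 'execution 1', 'execution 2' ]
```

Nothing happens until the first subscribe call, and each subscription starts its own execution.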

Unsubscribing

With RxJS, to stop receiving notifications we call unsubscribe on the subscription returned from the subscribe method. Every time we subscribe, we should unsubscribe too, which prevents memory leaks in our app.

The ways to stop receiving notifications are:

  • Call complete() on the subscriber – completion automatically cancels all subscriptions.
  • Use an operator or creation function that automatically completes after emitting all of its items (completion cancels all subscriptions).
  • Throw an error – an uncaught error executes the observer’s error method and cancels the subscription. After an unhandled error occurs, the observable won’t emit any more items.
  • Call unsubscribe() on the subscription (this does not call the observer’s complete method).

Let’s see it in code:


import { Observable } from 'rxjs';

// observer
const observer = {
  next: message => console.log(`message emitted ${message}`),
  error: err => console.log(`Error occurred ${err}`),
  complete: () => console.log('no more messages')
};

// observable
const messages$ = new Observable(messageSubscriber => {
  messageSubscriber.next('message 1');
  messageSubscriber.next('message 2');
  messageSubscriber.complete();
});

// subscribing...
const messageSubscription = messages$.subscribe(observer);

// unsubscribing
messageSubscription.unsubscribe();
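
The sketch below compares two of these cases using the subscription’s closed flag. The names open$, done$, and tornDown are made up for this example. The function returned from the constructor callback is the teardown logic, which RxJS runs when the subscription ends.

```typescript
import { Observable } from 'rxjs';

let tornDown = false;

// This observable emits once and never completes on its own.
const open$ = new Observable<string>(subscriber => {
  subscriber.next('message 1');
  // The returned function is the teardown logic; it runs on unsubscribe.
  return () => {
    tornDown = true;
  };
});

const openSubscription = open$.subscribe(message => console.log(message));
const closedBeforeUnsubscribe = openSubscription.closed;
console.log(closedBeforeUnsubscribe); // → false, still listening

openSubscription.unsubscribe();
console.log(openSubscription.closed); // → true
console.log(tornDown);                // → true

// An observable that completes closes the subscription by itself.
const done$ = new Observable<string>(subscriber => {
  subscriber.next('message 1');
  subscriber.complete();
});

const doneSubscription = done$.subscribe(message => console.log(message));
console.log(doneSubscription.closed); // → true, no unsubscribe() needed
```

An explicit unsubscribe() matters most for observables that never complete on their own.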


Creation Functions

Usually, we won’t create observables with the Observable constructor. We will either use observables that Angular creates for us or create them with the built-in creation functions.


import { Observable, of, from } from 'rxjs';

// observable created with the constructor
const messagesFromConstructor$ = new Observable(messageSubscriber => {
  messageSubscriber.next('message 1');
  messageSubscriber.next('message 2');
  messageSubscriber.complete();
});

// using a built-in function called 'of' - creates an observable from one or more values
const messagesFromOf$ = of('message 1', 'message 2');

// using a built-in function called 'from' -
// creates an observable from an array or other data structure,
// including promises and observable-like objects
const messagesFromFrom$ = from(['message 1', 'message 2']);


The ‘from’ function creates an observable from an array, a promise, or anything array-like or observable-like.

The difference between ‘of’ and ‘from’ is that if we pass an array to ‘of’, it emits the whole array once, but if we pass it to ‘from’, it emits each value separately. After emitting all of their values, both observables complete, which ends the subscription automatically.
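
A short sketch of this difference, collecting the emissions into arrays (the variable names are my own):

```typescript
import { from, of } from 'rxjs';

const messages = ['message 1', 'message 2'];

const viaOf: unknown[] = [];
const viaFrom: unknown[] = [];
const viaOfSpread: unknown[] = [];

// `of` treats the array as a single value.
of(messages).subscribe(value => viaOf.push(value));

// `from` unpacks the array and emits each element.
from(messages).subscribe(value => viaFrom.push(value));

// Spreading the array into `of` behaves like `from`.
of(...messages).subscribe(value => viaOfSpread.push(value));

console.log(viaOf.length);       // → 1
console.log(viaFrom.length);     // → 2
console.log(viaOfSpread.length); // → 2
```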

Other creation functions worth mentioning are ‘fromEvent’ and ‘interval’.

The creation function ‘fromEvent’ will create an observable from any DOM event such as ‘click’. Let’s see it in code:


@ViewChild('elm') elm: ElementRef;

ngAfterViewInit() {
  // subscribe() returns a subscription, not a stream
  const clickSubscription = fromEvent(this.elm.nativeElement, 'click').subscribe(console.log);
}

The creation function ‘interval’ creates an observable that emits sequential numbers at a defined interval, given in milliseconds. It’s often used to generate an asynchronous sequence of numbers for tests or sample code.


const numbersSubscription = interval(1000).subscribe(console.log);

Operator

With observables, we pipe the emitted items through a sequence of operators to transform, filter, or process them, and since items can arrive over time, we can also apply delay, timeout, and other time-based operations.

An RxJS operator is a function that transforms and manipulates the items emitted from an observable. We can apply operators in sequence using the observable’s pipe() method.

Let’s see an example in code:


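import { of } from 'rxjs';
import { map, tap, take } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(item => item * 10),
  tap(item => console.log(item)),
  take(3)
).subscribe(item => console.log(item));

result => 10, 10, 20, 20, 30, 30 (each value is logged twice: once by tap and once by the subscriber)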

When we subscribe, the source observable starts emitting items, and each item pipes through a series of operators in sequence.

Each operator subscribes to its input observable and creates and returns an output observable.
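
To show what that means, here is a minimal sketch of a hand-written operator. The name multiplyBy is made up for this example and is not part of RxJS; in real code the built-in map operator does the same job.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator for illustration: multiplies every item by `factor`.
function multiplyBy(factor: number) {
  return (source$: Observable<number>): Observable<number> =>
    new Observable<number>(subscriber => {
      // Subscribe to the input observable and forward to the output observable.
      const subscription = source$.subscribe({
        next: value => subscriber.next(value * factor),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Unsubscribing from the output also unsubscribes from the input.
      return () => subscription.unsubscribe();
    });
}

const results: number[] = [];
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe(value => results.push(value));

console.log(results); // → [ 10, 20, 30 ]
```

An operator is just a function that takes an observable and returns a new one, which is why operators can be chained inside pipe().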

There are a lot of operators in RxJS. You can find the full list at https://rxjs.dev. In the site’s menu there is an ‘operator decision tree’ that can help you choose the operator you need through a series of questions.

Subject

The subject is a special type of observable that also implements the observer interface. It has a ‘next’ method to emit items into the observable, an ‘error’ method to emit error notifications, and a ‘complete’ method. The subject is both an observable and an observer.

As I mentioned before, the observable is read-only: we subscribe to it to react to its notifications, but we can’t emit anything into it. The observable object has no method for emitting items; only the creator of the observable can emit items into it.

For example, when we use the ‘of’ function, the function creates the observable and emits the values into it. In the Angular framework, when we make an HTTP request, Angular creates the observable and emits the returned response into it. And when we create the observable with the constructor, we pass it a function; when we subscribe, this function is called and given a subscriber, which can call ‘next’, ‘error’, and ‘complete’.

So to emit values ourselves in response to an action like a ‘click’, we need an observable with extra features: an observable that is an observer too, which is exactly what the subject is.

Let’s see how to use a subject:


import { Subject } from 'rxjs';

const actionSubject = new Subject();

// Because it's an observable, any code that wants to receive notifications can call its subscribe() (we can use pipe() too).
actionSubject.subscribe(
  item => console.log(item)
);

// Because the subject implements the observer interface, we can use next() to emit items.
// A plain subject does not replay past values, so we emit after subscribing.
actionSubject.next('magic');

A plain observable is unicast, while a subject is multicast.

Unicast – emits to a single subscriber. When there are multiple subscribers, each one gets its own independent execution of the observable.

Multicast – emits to multiple subscribers. The subscribers share the same emissions, which means we can share the actions in the app among multiple subscribers.

Let’s understand the concept by example:


import { Subject } from 'rxjs';

const numbers = new Subject();

numbers.subscribe(
  num => console.log('sub1: ', num)
);

numbers.next(1);

numbers.subscribe(
  num => console.log('sub2: ', num)
);

numbers.next(2);
numbers.next(3);

numbers.complete();

/* 
result:
sub1: 1
sub1: 2
sub2: 2
sub1: 3
sub2: 3
*/

Notice that the second subscription does not get the first emission, since it subscribed after the first emission had already happened.

We should use a subject to share notifications or state between multiple layers of the application. For example, if we have a loading event and multiple components in the application need to be aware of it, the subject represents the loading state and the components are its subscribers.
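
Because a subject is also an observer, it can subscribe to a unicast observable and share that single execution with several subscribers. Here is a minimal sketch of the idea; the names source$, shared, and executions are my own.

```typescript
import { Observable, Subject } from 'rxjs';

let executions = 0;

// A cold source: its producer function runs once per subscription.
const source$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const shared = new Subject<number>();
const first: number[] = [];
const second: number[] = [];

// Both subscribers listen to the subject, not to the source directly.
shared.subscribe(value => first.push(value));
shared.subscribe(value => second.push(value));

// The subject is also an observer, so it can subscribe to the source.
source$.subscribe(shared);

console.log(executions); // → 1, the source ran only once
console.log(first);      // → [ 1, 2 ]
console.log(second);     // → [ 1, 2 ]
```

Subscribing to source$ directly twice would have run its producer twice; going through the subject runs it once and multicasts the values.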

BehaviorSubject

The BehaviorSubject is a special type of subject that:

  • Buffers its last emitted value
  • Emits that value to any late subscribers
  • Requires a default value
  • Emits the default value if it hasn’t yet emitted any items

const sub = new BehaviorSubject(0); // 0 is the required default value


import { BehaviorSubject } from 'rxjs';

const numbers = new BehaviorSubject(0);

numbers.subscribe(
  num => console.log('sub1: ', num)
);

numbers.next(1);

numbers.subscribe(
  num => console.log('sub2: ', num)
);

numbers.next(2);
numbers.next(3);

numbers.complete();

/* 
result:
sub1: 0
sub1: 1
sub2: 1
sub1: 2
sub2: 2
sub1: 3
sub2: 3
*/

Even though the second subscriber subscribed late, it immediately receives the last value that was emitted before it subscribed (1). It does not receive older values, such as the default 0.
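
Returning to the loading example from the Subject section, here is a rough sketch of how a BehaviorSubject could hold that state. The names loading$, header, and spinner are hypothetical and stand in for two components.

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical loading flag shared across the app; `false` is the required default.
const loading$ = new BehaviorSubject<boolean>(false);

const header: boolean[] = [];
const spinner: boolean[] = [];

// The first subscriber receives the default value immediately.
loading$.subscribe(isLoading => header.push(isLoading));
loading$.next(true);

// A late subscriber receives only the latest value (true), not the earlier false.
loading$.subscribe(isLoading => spinner.push(isLoading));
loading$.next(false);

console.log(header);         // → [ false, true, false ]
console.log(spinner);        // → [ true, false ]
console.log(loading$.value); // → false, the current value can be read synchronously
```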

Scheduler

RxJS leans heavily on schedulers for time-based operations and operators.

Code can be executed synchronously or asynchronously. Synchronous code executes line by line, or task by task. Asynchronous code registers callbacks to be fired after some task completes or an event occurs; by doing this we avoid blocking the UI while we wait for resource-intensive or long-running operations.

The easiest way to demonstrate an async operation is with setTimeout. setTimeout defers the execution of its callback until the currently running synchronous code has finished:


setTimeout(() => console.log('setTimeout'));
console.log('first');
console.log('second');

/* will print 
first
second
setTimeout
*/

Another way to achieve async behavior in JavaScript is through the microtask queue. The microtask queue runs after the current task is complete and before other queued async tasks, such as a setTimeout callback. The most common example of an API that uses the microtask queue is promises.


setTimeout(() => console.log('setTimeout'));
// think of queueMicrotask as if it were a promise callback
queueMicrotask(() => console.log('microTask'));
console.log('first');
console.log('second');
/* will print 
first
second
microTask
setTimeout
*/

The requestAnimationFrame API schedules a task to run just before the browser repaints. Browsers typically repaint about 60 times per second, so this helps create smooth animations.


setTimeout(() => console.log('setTimeout'));
queueMicrotask(() => console.log('microTask'));
requestAnimationFrame(() => console.log('animation'));
console.log('first');
console.log('second');
/* will print 
first
second
microTask
animation
setTimeout
*/

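In this run, the message from requestAnimationFrame is logged after the microtask and before the setTimeout. The relative order of the requestAnimationFrame and setTimeout callbacks depends on the browser’s frame timing, so it can differ.

Schedulers, like the APIs you see above, let you define when work will be executed. For example, if you want to schedule an async task: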


import { asyncScheduler, asapScheduler, animationFrameScheduler, queueScheduler } from 'rxjs';

// Instead of this for an async task
setTimeout(() => console.log('async'));

// Use this for an async task
asyncScheduler.schedule(() => console.log('async'));

// Instead of this to schedule an async task to run as soon as possible
queueMicrotask(() => console.log('microtask'));

// Use this
asapScheduler.schedule(() => console.log('microtask'));

// Instead of this to schedule a task before the browser repaints
requestAnimationFrame(() => console.log('animation'));

// Use this
animationFrameScheduler.schedule(() => console.log('animation'));

// Use this when you need to schedule tasks inside other tasks
queueScheduler.schedule(() => console.log('schedule additional tasks'));

The schedule method of a scheduler accepts 3 arguments:

  • Work – the function to invoke on the provided schedule.
  • Delay – the time to wait before performing the work.
  • State – a value that is provided to the function.

When a delay greater than 0 is provided, every type of scheduler behaves like the asyncScheduler.

Let’s see how the examples above look when we provide these arguments:


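import { asyncScheduler, asapScheduler, animationFrameScheduler, queueScheduler } from 'rxjs';

asyncScheduler.schedule(console.log, 200, 'async');

// a delay of 0 keeps each scheduler's own timing
asapScheduler.schedule(console.log, 0, 'microtask');

animationFrameScheduler.schedule(console.log, 0, 'animation');

queueScheduler.schedule(() => console.log('schedule additional tasks'));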
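
The state argument and the “tasks inside other tasks” case can be sketched with the queueScheduler. Inside the work function, this refers to the action that is currently running, so calling this.schedule queues the next step. The order array is my own addition, used to record the sequence.

```typescript
import { queueScheduler } from 'rxjs';

const order: string[] = [];

queueScheduler.schedule(
  function (state?: number) {
    if (state !== undefined && state > 0) {
      order.push(`before ${state}`);
      // `this` is the currently running action; rescheduling queues the next step.
      this.schedule(state - 1);
      order.push(`after ${state}`);
    }
  },
  0, // no delay, so the queue scheduler runs the work synchronously
  3  // initial state handed to the function
);

console.log(order);
// → [ 'before 3', 'after 3', 'before 2', 'after 2', 'before 1', 'after 1' ]
```

The nested task does not run in the middle of the current one; it waits in the queue until the current task has finished.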

Let’s see a few simple cases of using them: