TL;DR: In this article, we will reverse engineer Observable from the RxJS library. We will also re-create a couple of operators from the library and learn about Observer and Subscription. We are also going to use TypeScript to annotate the code. In the end, we will write a very small example code to use that reverse engineered library. You can find the demo at StackBlitz.
Introduction to Reactive Programming and RxJS
Let’s keep it simple.
- Reactive Programming is programming with asynchronous data streams — Andre Staltz (Creator of cycle.js)
I am not going to give you a lengthy explanation in this post (you can find more information here) but Reactive Programming is basically a paradigm (or approach) to manage asynchronous data streams.
RxJS is a library to do Reactive Programming. It allows you to write reactive programs with a functional approach.
What is Observable?
Observable is the core element of RxJS. It’s more or less like an array, whose items arrive in the future asynchronously.
- Observable represents the idea of an invokable collection of future values or events. — RxJS Docs
From the API perspective, Observable has a subscribe method. This subscribe method is used to invoke the execution of an Observable.
In the above example, we have created an Observable named observable using some magical rxjs code and then we invoked the subscribe method by passing doSomething. An important thing to remember is when we call this subscribe method only then Observable starts working. For now, ignore how we created observable and what doSomething is.
It’s also worth noting that this subscribe method returns something called a Subscription. Basically, this Subscription allows us to unsubscribe from the Observable. In other words, it returns an Object with an unsubscribe method, which allows us to stop listening to the values sent by Observable.
What is Observer?
Observer is a collection of callback functions, which reacts to the value arriving via Observable.
- Observer is a collection of callbacks that knows how to listen to values delivered by the Observable. — RxJS Docs.
In Observable, we need callbacks for three things:
- values — future values, which Observable is going to send/push
- errors — errors which might occur while invocation of an Observable to signal when observable is done with sending values
Hence, Observer is a collection of three callback methods as shown below:
The subscribe method and Observer
There is a relationship between Observer and subscribe method. Take a look at the following example:
Here, we have created an Observable and then executed it by invoking subscribe method. And if you take a closer look, we have passed an Observer to that subscribe method.
You can write the type definition of subscribe in TypeScript as follows:
Observable.subscribe(observer:Observer):Subscription;
You can combine this pattern with any Push API.
Using Observable and Observer pattern
In the following example, we are going to wrap Observable around JavaScript’s setInterval API:
Now we can call this setIntervalObservable method with time and subscribe to it. It will fire the observer.next callback after each cycle of given time as shown below:
Reverse engineering Observable
So far you have learned about Observer, Observable, Subscription and so on. Now we are going to create Observable using TypeScript Classes and Interfaces.
Creating Observer interface
Observer, as mentioned, is a collection of callbacks. You already know about next, error and complete but there is an optional value named closed. Which you are going to use later in this tutorial :
Creating Subscription class
As mentioned above, subscribe method returns a Subscription. So basically, a subscription takes unsubscribe method as input, so that it can be invoked by the user later on:
Creating Observable class
In this section, we will create an Observable class and a constructor which takes subscribe method as an input. The subscribe method takes Observer as input and returns a Subscription:
Creating create static method on Observable class
Observable also comes with a static method named create to create new Observable. This method also takes a subscribe method and returns an Observable:
RxJS creation operators
Usually, when working with RxJS, you don’t really have to create your own custom Observable. RxJS comes with creation methods which allow you to create Observable from different types of Inputs. The input to the creation method can be anything depending upon the needs but it must return an Observable.
You can describe creation operators using TypeScript as follows:
creationOperator(input:any): Observable;
There are so many creation operators in RxJS like fromEvent and of to name a few.
setIntervalObservable (that we used earlier) is actually a creation method. We can easily re-write it using our Observable and Subscription Class as shown below:
Reverse engineering of creation operator
The of creation operator from RxJS basically takes multiple values as an input and then pushes/sends those values to the observer as shown below:
We have to do the following:
- loop over each value given as input
- fire observer.next with those values
- after that, fire observer.complete()
- return a Subscription
Here is the complete code for the of operator:
How to create a custom creation operator?
Creating custom creation operators looks something like this:
- operator can take any number or type of inputs, depending upon the need
- It must return an Observable
- send/push values by invoking observer.next
- After observable is complete, fire observer.complete()
- Don’t forget to return a Subscription from within Observable
Pipeable operators in RxJS
So far we have created Observable and subscribed to them. But there is another big element of RxJS which allows us to do functional programming with asynchronous values. So we can basically use Array’s map, filter or similar methods/operators to modify the original Observable.
To work with these operators, there is a method on Observable class named pipe. This pipe method takes single or multiple operators as an input and returns a new Observable:
Observable.pipe(...invokedOperators): Observable;
Here is an example of using a filter and map operator in RxJS:
Creating custom pipeable operators
You have to first understand the structure and anatomy of RxJS pipeable operator to write our own custom pipe method on Observable class.
The type definition of a pipeable operator using TypeScript would look something like this:
type pipeableOperator = (input) => (source:Observable) => Observable;
- operator takes an input. This input can be anything and either single or multi-value. It depends upon what kind of operator you want to make.
- the operator function returns another function. This returned function takes the source Observable as an input and returns a new Observable by modifying the input by performing the desired action based on operator’s input.
Creating filter operator
In order to create filter operator, let’s first see it’s structure:
filter(filterPredicate): (source:Observable) => Observable;
- filterPredicate is the function which returns a Boolean value. You have to apply it to the value emitted by source Observable.
- We can access the values emitted by source Observable by subscribing to it, as shown below:
- Inside the if condition shown above, emit the value for new Observable.
Here is how we can code filter operator :
- Similarly, you can create other operators like map and so on.
Creating the pipe method
Now we can reverse engineer the pipe method. But first, we have to do the following:
- Pipe method takes single or multiple inputs. So we have to loop over all of those operators. We can use JavaScript’s spread operator and forEach to do that as shown below:
-
It is important to realize that, inside pipe method, we don’t really get the pipeable operator but the invocation of it. In other words, we are basically accessing whatever is returned by the operator. It is a function which takes source Observable and returns new modified Observable.
-
We can access the source Observable via this.
-
Basically, we will start with this as first Observable and then call the first operator on it. We will use this new Observable as a source for next operator.
Here is how we will write pipe method:
Final example
Here is an example of creating and consuming an Observable using our reverse engineered library:
The fun part is, code in the above example is totally compatible with RxJS. So basically you can switch the imports to RxJS library and everything will work fine.
Conclusion
In this article, we have written a very tiny subset of RxJS Observable, creating custom creation operators and custom pipeable operators, along with reverse engineering of operator, filter operator from the RxJS. We have also learned about Observer and Subscriptions. You can check the demo at StackBlitz