Error Handling in RxJS
Take this little RxJS snippet: It looks up abilities of a given Pokémon in the PokéAPI and logs the results to the console:
```javascript
import { ajax } from 'rxjs/ajax'; // @see [RxJS](https://learnrxjs.io)
import { BehaviorSubject } from 'rxjs'; // @see [RxJS](https://learnrxjs.io)
import { switchMap } from 'rxjs/operators'; // @see [RxJS](https://learnrxjs.io)

const POKE_API = 'https://pokeapi.co/api/v2/pokemon/';

const name = new BehaviorSubject('charizard'); // --> Represents the name of the Pokémon
name.pipe(switchMap(name => ajax.getJSON(POKE_API + name + '/'))) // --> Request the API
  .subscribe(d => // --> For each response ...
    console.log(
      `${d.name}:: ` + // --> ... Log the name of the Pokemon ...
      `${d.abilities.map(a => a.ability.name).join(', ')}` // --> ... And their abilities
    )
  );
```
To use it, we can, for example, feed Pokémon names directly into the `name` subject:
```javascript
setTimeout(() => name.next('snorlax'), 1000); // --> Let's have a timeout so data is requested one by one
setTimeout(() => name.next('eevee'), 3000);

// Result:
// > charizard:: solar-power, blaze
// > snorlax:: gluttony, thick-fat, immunity
// > eevee:: anticipation, adaptability, run-away
```
Or we can bind the `name` subject to an HTML input so that we get an interface for our nice console tool:
```jsx
/** @jsx renderer.create **/
import { Renderer } from '@connectv/html'; // @see [CONNECTIVE HTML](https://github.com/CONNECT-platform/connective-html)

const renderer = new Renderer(); // --> The `Renderer` class helps us render HTML elements
renderer.render(<input _state={name}/>).on(document.body); // --> Render an input whose state is bound to `name`
```
Either way, our code snippet works fine for correct Pokémon names and logs their abilities to the console.
But what happens when we accidentally feed `name` a non-existing Pokémon name?
```javascript
setTimeout(() => name.next('snorlax'), 1000); // --> snorlax info is fetched
setTimeout(() => name.next('ZZZ'), 2000); // --> ZZZ does not exist
setTimeout(() => name.next('eevee'), 3000); // --> eevee info is also not fetched!

// Result:
// > charizard:: solar-power, blaze
// > snorlax:: gluttony, thick-fat, immunity
// No more logs
```
Nothing was logged for `'ZZZ'`, which is not unexpected: we do not have any code to handle errors, and since `'ZZZ'` is not a correct Pokémon name, the request naturally gets an error from the PokéAPI.
The problem is that when we fed `'eevee'` to `name` afterwards, we still did not get any logs on the console. So, what went wrong?
To understand the situation, let's recall what `Observable`s basically are:
An `Observable` is sorta-kinda like a function. To be more precise, it's like a push function that generates multiple values (according to RxJS's docs). In contrast, a normal function is a pull function that generates one value.
If an `Observable` is like a function, then a `Subscription` is its equivalent of a function call.
The same way you need to call a function to get its value, you need to subscribe to an observable to start getting its values:
```javascript
name.pipe(switchMap(name => ajax.getJSON(POKE_API + name + '/'))) // --> This is the `Observable`, or the function
  .subscribe(d => // --> This is the `Subscription`, or the function call
    console.log(
      `${d.name}:: ` +
      `${d.abilities.map(a => a.ability.name).join(', ')}`
    )
  );
```
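The pull/push distinction can be sketched in plain JavaScript, without RxJS. This is only a toy model: `pullName`, `pushNames`, and the shape of the observer object are hypothetical stand-ins chosen for illustration, not RxJS APIs.

```javascript
// A pull function: the caller asks for a value and gets exactly ONE back.
function pullName() {
  return 'charizard';
}

// A toy push function (a hypothetical stand-in for an Observable):
// it does nothing until it is called with an observer, and then it
// pushes as many values as it likes to that observer.
function pushNames(observer) {
  observer.next('charizard');
  observer.next('snorlax');
  observer.next('eevee');
}

// Calling the pull function yields its single value.
const pulled = pullName();

// "Subscribing" to the push function is the equivalent of calling it.
const pushed = [];
pushNames({ next: value => pushed.push(value) });

console.log(pulled); // a single value
console.log(pushed); // every value that was pushed, in order
```

Just like the real thing, nothing is pushed until someone "subscribes" by calling `pushNames`.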
When an unhandled error happens in a function call, that particular function call terminates.
Similarly, when an unhandled error happens in a `Subscription`, that `Subscription` terminates.
In our code, we have ONE subscription to `name.pipe(...)`, so when an error occurs in it, it terminates:
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);
```
- `name` emits `'charizard'`, its initial value, so our subscription receives charizard's info from the API.
- `name` emits `'snorlax'`, so our subscription receives snorlax's info from the API.
- `name` emits `'ZZZ'`, and our subscription receives an error and terminates.
- `name` doesn't even emit `'eevee'` because there are no subscriptions to emit to.
If this were imperative programming, we would simply enclose the whole thing in a try/catch block, so let's do the RxJS equivalent of that:
```javascript
import { ajax } from 'rxjs/ajax';
import { BehaviorSubject, of } from 'rxjs';
import { switchMap, catchError } from 'rxjs/operators'; // --> Also import `catchError`

const POKE_API = 'https://pokeapi.co/api/v2/pokemon/';

const name = new BehaviorSubject('charizard');
name.pipe(
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  catchError(() => of({ // --> So when there is an error ...
    name: 'unknown', // --> ... Return an `unknown` Pokemon ...
    abilities: [] // --> ... With no abilities.
  }))
).subscribe(d =>
  console.log(
    `${d.name}:: ` +
    `${d.abilities.map(a => a.ability.name).join(', ')}`
  )
);
```
Now let's run it against our failing test case again:
```javascript
setTimeout(() => name.next('snorlax'), 1000); // --> snorlax info is fetched
setTimeout(() => name.next('ZZZ'), 2000); // --> ZZZ does not exist
setTimeout(() => name.next('eevee'), 3000); // --> eevee info is also not fetched!

// Result:
// > charizard:: solar-power, blaze
// > snorlax:: gluttony, thick-fat, immunity
// > unknown::
// No more logs
```
We get the `unknown::` log, which means we are catching the error and handling it.
However, we are still not getting any response to `'eevee'`. Why is that?
First, let's recall what RxJS pipes are:
A `pipe` is simply a function that transforms one `Observable` to another. `x.pipe(a, b, c)` is basically equivalent to
```javascript
y = x.pipe(a);
z = y.pipe(b);
w = z.pipe(c);
```
So this code:
```javascript
name.pipe(switchMap(...), catchError(...)).subscribe(...)
```
is basically equivalent to this code:
```javascript
x = name.pipe(switchMap(...))
y = x.pipe(catchError(...))
y.subscribe(...)
```
Or this code:
```javascript
x = switchMap(...)(name);
y = catchError(...)(x);
y.subscribe(...);
```
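The equivalence between piping everything at once and applying operators one by one can be tried out with a small toy model in plain JavaScript. Everything here is an assumption made for illustration: an "observable" is modeled as a bare function that pushes values to an observer, and `map`, `filter`, and `pipe` are hypothetical simplified helpers, not the RxJS implementations.

```javascript
// Toy model: an "observable" is just a function that pushes values to an observer.
const numbers = observer => {
  [1, 2, 3].forEach(value => observer.next(value));
};

// Toy operators: each one takes an observable and returns a NEW observable.
const map = fn => source => observer =>
  source({ next: value => observer.next(fn(value)) });

const filter = predicate => source => observer =>
  source({ next: value => { if (predicate(value)) observer.next(value); } });

// Hypothetical `pipe` helper: feed the source through each operator in turn.
const pipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const double = map(value => value * 2);
const overTwo = filter(value => value > 2);

// All at once ...
const piped = [];
pipe(numbers, double, overTwo)({ next: value => piped.push(value) });

// ... or step by step, naming each intermediate observable.
const y = double(numbers);
const z = overTwo(y);
const stepwise = [];
z({ next: value => stepwise.push(value) });

console.log(piped, stepwise); // both receive the same values
```

Note how subscribing to `z` internally subscribes to `y`, which in turn subscribes to `numbers`: each operator sits between an inner and an outer subscription.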
When you subscribe to `y`, it internally subscribes to `x` and passes down values it receives from that inner subscription to your outer subscription.
When an error occurs in that inner subscription, it will naturally terminate.
However, `catchError()` wants to maintain the outer subscription, so it calls the factory function you provide it (`() => of(...)` in this case), and creates another inner subscription to the result of this function (which should have returned an observable), now feeding the outer subscription from this new inner subscription.
In other words, what happens here is:
```javascript
name.pipe(
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  catchError(() => of({
    name: 'unknown',
    abilities: []
  }))
).subscribe(...);

setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);
```
- You are originally subscribed to `name` (and the `switchMap()`) via `catchError()`.
- `name` emits `'ZZZ'`, `switchMap()` throws an error, and the inner subscription to it is terminated.
- To mitigate, `catchError()` calls `() => of(...)` to create a new inner subscription as the source of your outer subscription.
- Now you are basically subscribed to `of(...)`, which emits once and terminates. Note that at this point, YOU ARE NO LONGER SUBSCRIBED TO `name`.
- `name` doesn't even emit `'eevee'` as there are no subscriptions to emit to.
This might seem confusing and weird; however, this is exactly how normal functions would behave.
Basically, `catchError()` would be a function like this if we were working with normal functions instead of observables:
```javascript
function catchError(handlerFactory) {
  return function(ogFunc) {
    return (...args) => { // --> the outer function call (like the outer subscription)
      try {
        return ogFunc(...args); // --> the inner function call (like the inner subscription)
      } catch(error) {
        return handlerFactory(error)(...args); // --> the replacement inner function call (like the replacement inner subscription)
      }
    }
  }
}
```
```javascript
x = function() { ... }; // --> the original function
y = catchError(...)(x); // --> the catchError pipe
y(...); // --> the function call
```
```javascript
x = ... // --> the original observable
y = catchError(...)(x); // --> the catchError pipe
y.subscribe(...); // --> the subscription
```
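To see the function version in action, here is a small runnable sketch. It repeats the function-flavored `catchError` so that the block is self-contained, and uses a hypothetical `lookup` function backed by a hard-coded `ABILITIES` table as a stand-in for the API request (the table only contains the values already shown in the logs of this post).

```javascript
// The function-flavored `catchError` from the analogy, repeated so this runs on its own.
function catchError(handlerFactory) {
  return function (ogFunc) {
    return (...args) => {
      try {
        return ogFunc(...args);
      } catch (error) {
        return handlerFactory(error)(...args);
      }
    };
  };
}

// Hypothetical stand-in for the API: a hard-coded table instead of a request.
const ABILITIES = {
  snorlax: ['gluttony', 'thick-fat', 'immunity'],
  eevee: ['anticipation', 'adaptability', 'run-away'],
};

// The original function: throws for names it does not know.
function lookup(name) {
  if (!(name in ABILITIES)) throw new Error(`No such Pokémon: ${name}`);
  return `${name}:: ${ABILITIES[name].join(', ')}`;
}

// In case of an error, fall back to a function returning the `unknown` line.
const safeLookup = catchError(() => () => 'unknown:: ')(lookup);

const results = ['snorlax', 'ZZZ', 'eevee'].map(name => safeLookup(name));
results.forEach(line => console.log(line));
```

Each call to `safeLookup` is a fresh function call, so the failure for `'ZZZ'` has no effect on the later call for `'eevee'`.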
The difference, however, is that from a normal function, we only expect ONE value. So in case of an error, we can simply replace it with another ONE value.
Observables, on the other hand, can (and are expected to) push multiple values.
In our case, we literally expect our subscription to keep getting values in response to future events, while we are replacing it with a one-off subscription (to `of(...)`).
A neat solution would be to conduct the error handling closer to its source.
In our example, this source (fortunately) is not the long-living subscription to `name` itself, but rather the short-lived one-off subscriptions to `ajax.getJSON(...)` that we create for every emission of `name`.
Because these subscriptions are supposed to be short-lived themselves (each one is supposed to respond with ONE value), we can safely replace them with another one-off subscription in case of error:
```javascript
name.pipe(
  switchMap(
    name => ajax.getJSON(POKE_API + name + '/').pipe( // --> so `catchError()` is directly piped to `ajax.getJSON()`
      catchError(() => of({
        name: 'unknown',
        abilities: []
      }))
    )
  ),
).subscribe(d =>
  console.log(
    `${d.name}:: ` +
    `${d.abilities.map(a => a.ability.name).join(', ')}`
  )
);
```
Now this sequence would behave like this in our test case:
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);

// Result:
// > charizard:: solar-power, blaze
// > snorlax:: gluttony, thick-fat, immunity
// > unknown::
// > eevee:: anticipation, adaptability, run-away
```
What about situations where the source of the error IS a long-living subscription?
In that case, when our long-living subscription terminates due to an error, we could actually replace it with another subscription to the same long-living observable.
The `retry()` operator does exactly that:
```javascript
import { ajax } from 'rxjs/ajax';
import { BehaviorSubject } from 'rxjs';
import { switchMap, retry } from 'rxjs/operators'; // --> Now we are importing `retry` as well

const POKE_API = 'https://pokeapi.co/api/v2/pokemon/';

const name = new BehaviorSubject('charizard');
name.pipe(
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  retry(), // --> In case of error, simply retry
).subscribe(d =>
  console.log(
    `${d.name}:: ` +
    `${d.abilities.map(a => a.ability.name).join(', ')}`
  )
);
```
FUN FACT
You could actually replicate the behavior of `retry()` using `catchError()`:
```javascript
function myRetry(observable) {
  return observable.pipe(catchError(() => myRetry(observable)));
}
```
```javascript
name.pipe(switchMap(...), myRetry).subscribe(...);
```
At first glance, this approach seems to cleanly solve our issue:
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);

// > charizard:: solar-power, blaze
// > snorlax:: gluttony, thick-fat, immunity
// > eevee:: anticipation, adaptability, run-away
```
However, if we add a console log before each request, we can see that we are actually messing up pretty terribly:
```javascript
name.pipe(
  tap(value => console.log(`REQUEST FOR ${value}`)), // --> `tap` is also imported from 'rxjs/operators'
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  retry(),
).subscribe(...);
```
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);

// > REQUEST FOR charizard
// > charizard:: solar-power, blaze
// > REQUEST FOR snorlax
// > snorlax:: gluttony, thick-fat, immunity
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// ...
// ... about 100 times or more
// ...
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR eevee
// > eevee:: anticipation, adaptability, run-away
```
What happened?
Well, every time our `ajax.getJSON(...)` subscription fails, `retry()` will re-subscribe to its previous observable, which means it is indirectly re-subscribing to `name` as well.
`name` is a `BehaviorSubject`, which means when you subscribe to it, it will immediately emit its latest value to you, which in this case is `'ZZZ'`.
As a result, immediately after each time we get an error, we re-subscribe to `name`, get `'ZZZ'` again, make a request for it, fail, and repeat this cycle.
Note that after about one second, `'eevee'` is emitted by `name`, which will take us out of this loop. If it wasn't for that emission, we would be stuck in this loop indefinitely.
To break out of this cycle, we can use `retryWhen()` instead of `retry()`:
```javascript
name.pipe(
  tap(x => console.log(`REQUEST FOR ${x}`)),
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  retryWhen(() => name), // --> using `retryWhen()` instead of `retry()`
).subscribe(...);
```
`retryWhen()` basically retries when the observable returned by the function passed to it emits its next value. In our case, this means we will retry (re-subscribe) when `name` emits a value:
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);

// > REQUEST FOR charizard
// > charizard:: solar-power, blaze
// > REQUEST FOR snorlax
// > snorlax:: gluttony, thick-fat, immunity
// > REQUEST FOR ZZZ
// > REQUEST FOR ZZZ
// > REQUEST FOR eevee
// > eevee:: anticipation, adaptability, run-away
```
Note that `'ZZZ'` is still being tried two times. That is because when the first try causes an error, `retryWhen()` subscribes to `name` as its notifier, which immediately emits `'ZZZ'` once more, causing the second try.
Alternatively, we could keep using `retry()` and make `name` a `Subject` instead of a `BehaviorSubject`.
In that case, it would emit each value only once and to subscriptions present at the time, so when we re-subscribe to it after an error, we won't get the problematic value again:
```javascript
const name = new Subject();
name.pipe(
  tap(x => console.log(`REQUEST FOR ${x}`)),
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  retry(),
).subscribe(...);
```
```javascript
setTimeout(() => name.next('snorlax'), 1000);
setTimeout(() => name.next('ZZZ'), 2000);
setTimeout(() => name.next('eevee'), 3000);

// > REQUEST FOR snorlax
// > snorlax:: gluttony, thick-fat, immunity
// > REQUEST FOR ZZZ
// > REQUEST FOR eevee
// > eevee:: anticipation, adaptability, run-away
```
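The difference between the two subject types on re-subscription can be sketched with two toy classes in plain JavaScript. These are hypothetical, heavily simplified stand-ins (`ToySubject` and `ToyBehaviorSubject` are names invented here, with no error or completion handling), not the real RxJS classes; they only model what a late subscriber receives.

```javascript
// Toy stand-ins, NOT the real RxJS classes: just enough to show the
// difference in what a late subscriber receives.
class ToySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(observer => observer(value));
  }
}

class ToyBehaviorSubject extends ToySubject {
  constructor(initial) {
    super();
    this.latest = initial;
  }
  subscribe(observer) {
    super.subscribe(observer);
    observer(this.latest); // replay the latest value to every new subscriber
  }
  next(value) {
    this.latest = value;
    super.next(value);
  }
}

const plain = new ToySubject();
const behavior = new ToyBehaviorSubject('charizard');

// The problematic value is emitted BEFORE anyone (re-)subscribes ...
plain.next('ZZZ');
behavior.next('ZZZ');

// ... and then a subscriber arrives late, like `retry()` re-subscribing.
const fromPlain = [];
const fromBehavior = [];
plain.subscribe(value => fromPlain.push(value));
behavior.subscribe(value => fromBehavior.push(value));

plain.next('eevee');
behavior.next('eevee');

console.log(fromPlain);    // only 'eevee'
console.log(fromBehavior); // 'ZZZ' replayed first, then 'eevee'
```

The late subscriber to the plain subject never sees `'ZZZ'`, which is why `retry()` does not loop in that setup.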
CAREFUL THOUGH ...
Using `retryWhen()` in combination with `Subject` wouldn't actually work:
```javascript
const name = new Subject();
name.pipe(
  tap(x => console.log(`REQUEST FOR ${x}`)),
  switchMap(name => ajax.getJSON(POKE_API + name + '/')),
  retryWhen(() => name)
).subscribe(...)
```
This is due to the fact that we do not re-subscribe until `name` emits another value after the problematic `'ZZZ'`. The next value is `'eevee'`, so we retry (re-subscribe) AFTER it was emitted, meaning that we would basically miss it.
Hero Image by Sebastian Herrmann from Unsplash.
Hero Image by Mitchell Luo from Unsplash.