How RxJS groupBy() Works
When working with streams of data, it's often useful to group incoming data from a particular stream; let's call it the main stream. For example, imagine our main stream is a stream of incoming HTTP requests:
import { Router } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)

const router = new Router(); // --> this is an HTTP router that gives us streams of requests

router.all('*').pipe( // --> this is our stream
  ... // --> this is our stream
).subscribe(); // --> this is our stream

export default router.core;
If you are not familiar with RxXpress, it is a library quite like Express for handling incoming HTTP requests (it even works with Express under the hood and integrates neatly with it). In Express, you register a callback for each route, looking at one request at a time. In RxXpress, however, you get a stream of requests for each route instead, in the form of an RxJS Observable.
We can, for example, authenticate incoming requests, and then group them based on the user making the request:
import { Router, use } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)
import { groupBy } from 'rxjs/operators'; // --> use groupBy() to group incoming requests

import { authenticate } from './my-auth-code'; // --> a typical Express authentication middleware

const router = new Router(); // --> this is an HTTP router that gives us streams of requests

router.all('*').pipe( // --> get the stream of all requests
  use(authenticate), // --> this allows using Express middlewares on a stream
  groupBy(({req}) => req._.user.id) // --> group incoming requests by user id
).subscribe();

export default router.core;
For this example, we have assumed that authenticate() is a typical Express middleware, authenticating each request and populating req._.user.
This line simply allows us to use an Express middleware on a stream of requests:
use(authenticate)
And this line allows us to group incoming requests based on the user making the request:
groupBy(({req}) => req._.user.id)
Our main stream is now split into multiple user request streams (or sub-streams), each being the stream of requests by a particular user.
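How does this work exactly? Well, groupBy() does not know the users in advance: a sub-stream is only created when the first request from a user arrives, and later requests from that user are pushed to that same sub-stream.
This means it is not clear how many user request streams (sub-streams) we will have in the end. We need to deal with an unknown number of sub-streams, which will arrive at unknown points in time.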
Fortunately, there is a nice abstraction for an unknown number of X arriving at unknown points in time. For example, if we had an unknown number of HTTP requests, arriving at unknown points in time, we could simply represent them with a stream of HTTP requests. Now that we have an unknown number of sub-streams arriving at unknown points in time, we can represent them with a stream of sub-streams, or in our case a stream of user request streams.
And that is exactly what groupBy() does: it turns our main stream into a stream of sub-streams!
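To make the stream of sub-streams idea a bit more tangible, here is a minimal, dependency-free sketch of the routing logic. It is a simplified, synchronous model and not how RxJS actually implements groupBy(); the names `SubStream` and `makeGrouper` are hypothetical and only used for illustration. A sub-stream is created (and announced) the first time its key shows up, and every later value with that key goes to the existing sub-stream:

```typescript
// A simplified, synchronous model of groupBy(): NOT the RxJS implementation.
// `SubStream` and `makeGrouper` are hypothetical names used for illustration.
interface SubStream<T> {
  key: string;
  values: T[]; // values routed to this sub-stream so far
}

function makeGrouper<T>(
  keyOf: (value: T) => string,
  onNewSubStream: (sub: SubStream<T>) => void
): (value: T) => void {
  // Prototype-less object, so that keys like "constructor" are not special.
  const subStreams: Record<string, SubStream<T>> = Object.create(null);

  return (value: T): void => {
    const key = keyOf(value);
    let sub = subStreams[key];
    if (sub === undefined) {
      // First value for this key: create the sub-stream and announce it,
      // much like the main stream emitting a new GroupedObservable.
      sub = { key, values: [] };
      subStreams[key] = sub;
      onNewSubStream(sub);
    }
    sub.values.push(value); // later values reuse the existing sub-stream
  };
}

// Usage: requests from two users arrive in an arbitrary order.
interface FakeRequest { user: string; path: string; }

const announced: SubStream<FakeRequest>[] = [];
const push = makeGrouper<FakeRequest>(
  request => request.user,    // group by user
  sub => announced.push(sub)  // collect each newly created sub-stream
);

push({ user: 'alice', path: '/a' });
push({ user: 'bob', path: '/b' });
push({ user: 'alice', path: '/c' });

console.log(announced.map(sub => `${sub.key}: ${sub.values.length}`));
// [ 'alice: 2', 'bob: 1' ]
```

Three requests came in, but only two sub-streams were announced, because the third request reused the sub-stream that already existed for its user.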
To get a better grasp of the stream-of-streams nature of the groupBy() operator, let's take a look at a simpler example:
import { interval } from 'rxjs';
import { groupBy } from 'rxjs/operators';

interval(1000) // --> our main stream
.pipe(groupBy(x => x % 3)) // --> grouping by modulo of 3
.subscribe(console.log); // --> log the main stream
In this case, our main stream emits the numbers 0, 1, 2, 3, 4, ..., one per second.
We then split the stream based on each number's value modulo 3. If you run this code, you will get a console output like this:
> GroupedObservable ...
> GroupedObservable ...
> GroupedObservable ...
Each of these is logged one second after the other, and after the third log you get no further logs. Basically, each of 0, 1 and 2 has a key that was not seen before, so a new sub-stream is created for it (and logged as a GroupedObservable). When 3 arrives, since 3 % 3 === 0 % 3, the first sub-stream is used again instead of creating a new stream, and so nothing more is logged.
Now if we wanted to get the numbers logged, paired with their group, we could for example tap into this stream of streams and log the contents of each sub-stream:
import { interval } from 'rxjs';
import { groupBy, tap } from 'rxjs/operators';

interval(1000)
.pipe(
  groupBy(x => x % 3),
  tap(group => {
    group.subscribe(v => console.log(`${group.key}:: ${v}`));
  })
)
.subscribe(console.log);
Running this code would yield something like this:
> GroupedObservable ...
0:: 0
> GroupedObservable ...
1:: 1
> GroupedObservable ...
2:: 2
0:: 3
1:: 4
2:: 5
...
Notice how group.key is used to identify each group. This is the same value that identifies the data of each group, i.e. the result of x => x % 3.
The cool part of RxJS is that it also allows us to merge these split streams back into one stream.
For example, we can use mergeMap() to simply merge all of the streams:
import { interval } from 'rxjs';
import { groupBy, mergeMap, map } from 'rxjs/operators';

interval(1000)
.pipe(
  groupBy(x => x % 3),
  mergeMap(group => // --> merge all sub-streams
    group.pipe(map(v => `${group.key}:: ${v}`)) // --> turn each sub-stream to a stream of strings, also including the sub-stream key
  ) // --> merge all sub-streams
)
.subscribe(console.log);
Now we are back to a single stream of strings (instead of a stream of streams), and running the code would produce output like this:
0:: 0
1:: 1
2:: 2
0:: 3
1:: 4
2:: 5
...
Getting back to our stream of user request streams example, what is the purpose of this stream-splitting? Well, it is useful in situations where you want to do something with each particular sub-stream. For example, if you want to rate limit each user, you could simply throttle their request stream:
import { Router, use, next } from 'rxxpress';
import { groupBy, mergeMap, throttleTime } from 'rxjs/operators';

import { authenticate } from './my-auth-code';

const router = new Router();

router.all('*').pipe( // --> the request stream
  use(authenticate), // --> authenticate each request
  groupBy(({req}) => req._.user.id), // --> split the stream per-user
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually
  next(), // --> pass to the next handler
).subscribe();

export default router.core;
In this example, the next() operator simply passes requests that are yet to be responded to on to the next request handler (it is similar to calling next() inside an Express middleware).
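We are basically authenticating each request, splitting the request stream per user, throttling each user request stream individually so that it lets through at most one request every 10 seconds, merging the throttled streams back into one, and passing the surviving requests to the next handler.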
In effect, this code describes a nice per-user rate-limiting middleware that can be mounted on any Express router / app.
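To make the effect of this pipeline concrete, here is a dependency-free sketch of per-user, leading-edge throttling, which is roughly what throttleTime(10000) does with its default settings on each user request stream: the first request passes, and further requests from the same user are dropped until the 10 seconds are over. This is only a model and not how throttleTime() is implemented; `makePerUserThrottle` is a hypothetical helper, and timestamps are passed in explicitly to keep the example deterministic.

```typescript
// A simplified model of per-user throttling; NOT how throttleTime() is implemented.
// `makePerUserThrottle` is a hypothetical helper used for illustration.
function makePerUserThrottle(windowMs: number): (userId: string, now: number) => boolean {
  // Time of the last request that was let through, per user.
  const lastPassed: Record<string, number> = Object.create(null);

  return (userId: string, now: number): boolean => {
    const last = lastPassed[userId];
    if (last !== undefined && now - last < windowMs) {
      return false; // still inside this user's window: drop the request
    }
    lastPassed[userId] = now; // open a new window for this user
    return true;
  };
}

const allow = makePerUserThrottle(10000);

const decisions = [
  allow('alice', 0),     // true: first request from alice
  allow('alice', 3000),  // false: alice is still throttled
  allow('bob', 4000),    // true: bob has his own window
  allow('alice', 12000), // true: alice's window has passed
];

console.log(decisions); // [ true, false, true, true ]
```

Note that, much like the sub-streams in the pipeline, the `lastPassed` table keeps an entry for every user it has ever seen.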
The issue here is that a sub-stream is created for each user and never discarded, so slowly but surely we will fill up the memory with these sub-streams.
Fortunately, groupBy() also accepts a duration selector argument, with which you can control how long each sub-stream should be kept alive. In our example, we simply need to keep each user request stream alive for a bit more than 10 seconds after its last request, which would look like this:
import { Router, use, next } from 'rxxpress';
import { groupBy, mergeMap, throttleTime, debounceTime } from 'rxjs/operators';

import { authenticate } from './my-auth-code';

const router = new Router();

router.all('*').pipe( // --> the request stream
  use(authenticate), // --> authenticate each request
  groupBy(
    ({req}) => req._.user.id, // --> group by user id
    undefined, // --> no element selector
    group => group.pipe(debounceTime(11000)) // --> keep each group around 11 seconds after last request
  ),
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually
  next(), // --> pass to the next handler
).subscribe();

export default router.core;
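Why keep each user request stream around for a bit more than the 10-second throttle window? A small, dependency-free model can show what goes wrong otherwise. In the sketch below, a user's state is dropped once the user has been idle for `idleMs`, and a user without state starts over with a fresh throttle. This is an illustrative model under those assumptions, not RxJS internals, and `makeLimiter`, `windowMs` and `idleMs` are hypothetical names.

```typescript
// A simplified model of throttling with idle sub-stream cleanup; NOT RxJS internals.
// `makeLimiter`, `windowMs` and `idleMs` are hypothetical names used for illustration.
interface UserState {
  lastPassed: number; // time of the last request that was let through
  lastSeen: number;   // time of the last request, passed or dropped
}

function makeLimiter(windowMs: number, idleMs: number) {
  const users: Record<string, UserState> = Object.create(null);

  return {
    // Returns true when the request is let through.
    request(userId: string, now: number): boolean {
      let state: UserState | undefined = users[userId];
      if (state !== undefined && now - state.lastSeen >= idleMs) {
        delete users[userId]; // idle for too long: the sub-stream is gone
        state = undefined;
      }
      if (state === undefined) {
        // A fresh sub-stream starts with a fresh throttle.
        users[userId] = { lastPassed: now, lastSeen: now };
        return true;
      }
      state.lastSeen = now;
      if (now - state.lastPassed < windowMs) {
        return false; // inside the throttle window: drop the request
      }
      state.lastPassed = now;
      return true;
    },
  };
}

// Same user, requests at 0s and 6s, with a 10-second throttle window.
const tooShort = makeLimiter(10000, 5000);    // state dropped after 5s of silence
const longEnough = makeLimiter(10000, 11000); // state kept for 11s of silence

const tooShortResults = [tooShort.request('alice', 0), tooShort.request('alice', 6000)];
const longEnoughResults = [longEnough.request('alice', 0), longEnough.request('alice', 6000)];

console.log(tooShortResults);   // [ true, true ]  --> the limit is bypassed
console.log(longEnoughResults); // [ true, false ] --> the limit holds
```

When the idle time is shorter than the throttle window, a user only needs to wait for the sub-stream to be dropped in order to get a fresh throttle, which is why the idle time should exceed the window.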
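This is how the duration selector works: it is called once for each newly created sub-stream and returns an observable. As soon as that observable emits, the sub-stream is completed and discarded, and the next value with the same key creates a fresh sub-stream.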
To see that in action, take a look at this example:
import { interval, timer } from 'rxjs';
import { groupBy, mergeMap, tap, scan } from 'rxjs/operators';

// In this example, since each group is only kept around for 4 seconds,
// the number of emissions of each group doesn't exceed 2.

interval(1000).pipe(
  groupBy(
    v => v % 3, // --> group by modulo 3
    undefined,
    () => timer(4000) // --> keep each group around for 4 seconds
  ),
  mergeMap(
    group => group.pipe(
      scan(count => count + 1, 0), // --> count emissions of each group
      tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count
    )
  )
)
.subscribe();
Here we count the emissions of each sub-stream and log them. Each sub-stream emits a new value every 3 seconds, but it is allowed to live only for 4 seconds, so we cannot get to a count higher than 2:
0:: 1
1:: 1
2:: 1
0:: 2
1:: 2
2:: 2
0:: 1
1:: 1
2:: 1
...
However, we could make each sub-stream live for 4 seconds after its last emission, using the debounceTime() operator:
import { interval } from 'rxjs';
import { groupBy, mergeMap, tap, scan, debounceTime } from 'rxjs/operators';

// In this example, each group is kept around 4 seconds after its last
// emission. Since each group emits every 3 seconds, this means that each group
// is going to remain around indefinitely.

interval(1000).pipe(
  groupBy(
    v => v % 3, // --> group by modulo 3
    undefined,
    group => group.pipe(debounceTime(4000)) // --> keep each group around for ...
                                            // --> ... 4 seconds after last emission
  ),
  mergeMap(
    group => group.pipe(
      scan(count => count + 1, 0), // --> count emissions of each group
      tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count
    )
  )
)
.subscribe();
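The difference between the two lifetime policies can also be worked out by hand. The following dependency-free sketch replays the timeline of interval(1000) grouped by modulo 3 and counts emissions per sub-stream, once with a fixed 4-second lifetime and once with a lifetime of 4 seconds after the last emission. It is a simplified model and not the RxJS implementation; `simulateCounts`, `Lifetime` and `LiveGroup` are hypothetical names.

```typescript
// A simplified timeline model of groupBy() with a duration selector.
// NOT the RxJS implementation; all names here are hypothetical.
type Lifetime = 'fixed' | 'afterLastEmission';

interface LiveGroup {
  count: number;     // emissions seen by the current sub-stream
  expiresAt: number; // time (ms) at which the sub-stream is closed
}

function simulateCounts(emissions: number, ttlMs: number, lifetime: Lifetime): string[] {
  const groups: Record<number, LiveGroup> = {};
  const log: string[] = [];

  for (let v = 0; v < emissions; v++) {
    const now = (v + 1) * 1000; // interval(1000) emits v after (v + 1) seconds
    const key = v % 3;
    let group = groups[key];

    if (group === undefined || now >= group.expiresAt) {
      // No live sub-stream for this key: start a fresh one with a fresh count.
      group = { count: 0, expiresAt: now + ttlMs };
      groups[key] = group;
    } else if (lifetime === 'afterLastEmission') {
      // debounceTime-style lifetime: every emission postpones the expiry.
      group.expiresAt = now + ttlMs;
    }

    group.count += 1;
    log.push(`${key}:: ${group.count}`);
  }
  return log;
}

const fixedLog = simulateCounts(9, 4000, 'fixed');
const debouncedLog = simulateCounts(9, 4000, 'afterLastEmission');

console.log(fixedLog.join(', '));
// 0:: 1, 1:: 1, 2:: 1, 0:: 2, 1:: 2, 2:: 2, 0:: 1, 1:: 1, 2:: 1
console.log(debouncedLog.join(', '));
// 0:: 1, 1:: 1, 2:: 1, 0:: 2, 1:: 2, 2:: 2, 0:: 3, 1:: 3, 2:: 3
```

With the fixed lifetime, each sub-stream is closed before its third emission, so the count starts over. With the lifetime measured from the last emission, each emission arrives 3 seconds after the previous one and keeps the sub-stream alive, so the count keeps growing.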
If you are interested in rate-limiting Express apps, specifically using RxJS and RxXpress, you can also take a look at this blog post:
Hero image by Suganth from Unsplash
The RxJS groupBy() operator is a useful but nuanced tool. In the last post we used it to easily and descriptively rate-limit an API; here we dig more into its specification in the context of the same example.