How RxJS groupBy() Works

When working with streams of data, its often useful to group incoming data from a particular stream, lets call it the main stream. For example, imagine our main stream is a stream of incoming HTTP requests:

1linkimport { Router } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)

2link

3linkconst router = new Router(); // --> this is an HTTP router that gives us streams of requests

4link

5linkrouter.all('*').pipe( // --> this is our stream

6link ... // --> this is our stream

7link).subscribe(); // --> this is our stream

8link

9linkexport default router.core;

If you are not familiar with RxXpress, its a library quite like Express for handling incoming HTTP requests (it even works with Express under the hood and integrates neatly with Express). In Express, you register a callback for each route, looking at one request at a time. In RxXpress however, you get a stream of requests for each route instead, in form of an RxJS Observable.

We can, for example, authenticate incoming requests, and then group them based on the user making the request:

1linkimport { Router, use } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)

2linkimport { groupBy } from 'rxjs/operators'; // --> use groupBy() to group incoming requests

3link

4linkimport { authenticate } from './my-auth-code'; // --> a typical Express authentication middleware

5link

6linkconst router = new Router(); // --> this is an HTTP router that gives us streams of requests

7link

8linkrouter.all('*').pipe( // --> get the stream of all requests

9link use(authenticate), // --> this allows using Express middlewares on a stream

10link groupBy(({req}) => req._.user.id) // --> group incoming requests by user id

11link).subscribe();

12link

13linkexport default router.core;

For this example, we have assumed that authenticate() is a typical Express middleware, authenticating each request and populating req._.user.

This line simply allows us to use an Express middleware on a stream of requests:

1link use(authenticate)

And this line allows us to group incoming requests based on the user making the request:

1link groupBy(({req}) => req._.user.id)

Our main stream is now split into multiple user request streams (or sub-streams), each being the stream of requests by a particular user.

How does this work exactly? Well:

This means it is not clear how many user request stream or sub-stream we will have in the end. We need to deal with an unknown number of sub-streams, which will arrive at unknown points in time.

Fortunately, there is a nice abstraction for unknown number of X arriving at unknown points in time. For example, if we had unknown number of HTTP requests, arriving at unknown points in time, we could simply represent it with a stream of HTTP requests. Now that we have an unknown number of sub-streams arriving at unknown points in time, we can represnt it with a stream of sub-streams, or in our case a stream of user request streams.

And that is what groupBy() does exactly: it turns our main stream into a stream of sub-streams!


linkA Stream of Streams?

To get a better grasp of the stream of streams nature of groupBy() operator, lets take a look at a simpler example:

1linkimport { interval } from 'rxjs';

2linkimport { groupBy } from 'rxjs/operators';

3link

4link

5linkinterval(1000) // --> our main stream

6link.pipe(groupBy(x => x % 3)) // --> grouping by modulo of 3

7link.subscribe(console.log); // --> log the main stream

Try It!

In this case, our main stream is one that emits numbers 0, 1, 2, 3, 4, ... every second. We then split the stream based on their modulo of 3. If you run this code, you will get a console output like this:

1link> GroupedObservable ...

2link> GroupedObservable ...

3link> GroupedObservable ...

Each of these are being logged one second after another, and after the third log you have no further logs. Basically:

Now if we wanted to get the numbers logged, paired with their group, we could for example tap into this stream of streams, and log contents of each sub-stream:

1linkimport { interval } from 'rxjs';

2linkimport { groupBy, tap } from 'rxjs/operators';

3link

4link

5linkinterval(1000)

6link.pipe(

7link groupBy(x => x % 3),

8link tap(group => {

9link group.subscribe(v => console.log(`${group.key}:: ${v}`));

10link })

11link)

12link.subscribe(console.log);

Try It!

Running this code would yield something like this:

1link> GroupedObservable ...

2link0:: 0

3link> GroupedObservable ...

4link1:: 1

5link> GroupedObservable ...

6link2:: 2

7link0:: 3

8link1:: 4

9link2:: 5

10link...

Notice how group.key is used to identify each group. This is the same value that identifies data of each group, i.e. the result of x => x % 3.

The cool part of RxJS is that it also allows us to merge these split streams back into one stream. For example, we can use mergeMap() to simply merge all of the streams:

1linkimport { interval } from 'rxjs';

2linkimport { groupBy, mergeMap, map } from 'rxjs/operators';

3link

4link

5linkinterval(1000)

6link.pipe(

7link groupBy(x => x % 3),

8link mergeMap(group => // --> merge all sub-streams

9link group.pipe(map(v => `${group.key}:: ${v}`)) // --> turn each sub-stream to a stream of strings, also including the sub-stream key

10link ) // --> merge all sub-streams

11link)

12link.subscribe(console.log);

Try It!

Now we are back to a single stream of strings (instead of a stream of streams), and running code would look like this:

1link0:: 0

2link1:: 1

3link2:: 2

4link0:: 3

5link1:: 4

6link2:: 5

7link...


linkSub-Stream Life Cycle

Getting back to our stream of user request streams example, what is the purpose of this stream-splitting? Well it is useful for situations where you would want to do something on each particular stream. For example, if you want to rate limit each user, you could simply throttle their request stream:

1linkimport { Router, use, next } from 'rxxpress'

2linkimport { groupBy, mergeMap, throttleTime } from 'rxjs/operators';

3link

4linkimport { authenticate } from './my-auth-code';

5link

6linkconst router = new Router();

7link

8linkrouter.all('*').pipe( // --> the request stream

9link use(authenticate), // --> authenticate each request

10link groupBy(({req}) => req._.user.id), // --> split the stream per-user

11link mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually

12link next(), // --> pass to the next handler

13link).subscribe();

14link

15linkexport default router.core;

In this example, the next() operator simply passes requests that are yet to be responded to to the next request handler (it is similar to calling next() inside an Express middleware).

We are basically:

In effect, this code describes a nice per-user rate-limiting middleware that can be mounted on any Express router / app.

The issue here is that, if there is a sub-stream created for each user, slowly but surely we will fill up the memory with these sub-streams.

Fortunately, groupBy() also accepts a duration selector argument, with which you can control how long each sub-stream should be kept alive. In our example, we simply need to keep each user request stream alive for a bit more than 10 seconds after its last request, which would look like this:

1linkimport { Router, use, next } from 'rxxpress'

2linkimport { groupBy, mergeMap, throttleTime, debounceTime } from 'rxjs/operators';

3link

4linkimport { authenticate } from './my-auth-code';

5link

6linkconst router = new Router();

7link

8linkrouter.all('*').pipe( // --> the request stream

9link use(authenticate), // --> authenticate each request

10link groupBy(

11link ({req}) => req._.user.id, // --> group by user id

12link undefined,

13link group => group.pipe(debounceTime(11000)) // --> keep each group around 11 seconds after last request

14link ),

15link mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually

16link next(), // --> pass to the next handler

17link).subscribe();

18link

19linkexport default router.core;

This is how the duration selector works:

To see that in action, take a look at this example:

1linkimport { interval, timer } from 'rxjs';

2linkimport { groupBy, mergeMap, tap, scan } from 'rxjs/operators';

3link

4link

5link// In this example, since each group is only kept around for 4 seconds,

6link// the number of emissions of each group doesn't exceed 2.

7link

8linkinterval(1000).pipe(

9link groupBy(

10link v => v % 3, // --> group by modulo 3

11link undefined,

12link () => timer(4000) // --> keep each group around for 4 seconds

13link ),

14link mergeMap(

15link group => group.pipe(

16link scan(count => count + 1, 0), // --> count emissions of each group

17link tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count

18link )

19link )

20link)

21link.subscribe();

Try It!

Here we count the emissions of each sub-stream and log them. Each sub-stream emits a new value every 3 seconds, but it is allowed to live only for 4 seconds, so we cannot get to a count higher than 2:

1link0:: 1

2link1:: 1

3link2:: 1

4link0:: 2

5link1:: 2

6link2:: 2

7link0:: 1

8link1:: 1

9link2:: 1

10link...

However, we could make each sub-stream live for 4 seconds after its last emission, using the debounceTime() operator:

1linkimport { interval, timer } from 'rxjs';

2linkimport { groupBy, mergeMap, tap, scan, debounceTime } from 'rxjs/operators';

3link

4link

5link// In this example, each group is kept around 3.1 seconds after its last

6link// emissions. Since each group emits every 3 seconds, this means that each group

7link// is going to remain around indefinitely.

8link

9linkinterval(1000).pipe(

10link groupBy(

11link v => v % 3, // --> group by modulos 3

12link undefined,

13link group => group.pipe(debounceTime(4000)) // --> keep each group around for ...

14link // --> ... 4 seconds after last emission

15link ),

16link mergeMap(

17link group => group.pipe(

18link scan(count => count + 1, 0), // --> count emissions of each group

19link tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count

20link )

21link )

22link)

23link.subscribe();

Try It!

If you are interested in rate-limiting Express apps, specifically using RxJS and RxXpress, you can also take a look at this blog post:



Hero image by Suganth from Unsplash

A Stream of Streams?Sub-Stream Life Cycle

Home

On Reactive Programmingchevron_right
Yet Another Frontend Frameworkchevron_right
Other Articleschevron_right