How RxJS groupBy() Works

When working with streams of data, it's often useful to group the incoming data from a particular stream; let's call it the main stream. For example, imagine our main stream is a stream of incoming HTTP requests:

    import { Router } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)

    const router = new Router(); // --> this is an HTTP router that gives us streams of requests

    router.all('*').pipe( // --> this is our stream
      // ...
    ).subscribe();

    export default router.core;

If you are not familiar with RxXpress, it's a library much like Express for handling incoming HTTP requests (it even uses Express under the hood and integrates neatly with it). In Express, you register a callback for each route and look at one request at a time. In RxXpress, however, you get a stream of requests for each route instead, in the form of an RxJS Observable.

We can, for example, authenticate incoming requests, and then group them based on the user making the request:

    import { Router, use } from 'rxxpress'; // @see [RxXpress](https://loreanvictor.github.io/rxxpress/)
    import { groupBy } from 'rxjs/operators'; // --> use groupBy() to group incoming requests

    import { authenticate } from './my-auth-code'; // --> a typical Express authentication middleware

    const router = new Router(); // --> this is an HTTP router that gives us streams of requests

    router.all('*').pipe( // --> get the stream of all requests
      use(authenticate), // --> this allows using Express middlewares on a stream
      groupBy(({req}) => req._.user.id) // --> group incoming requests by user id
    ).subscribe();

    export default router.core;

For this example, we have assumed that authenticate() is a typical Express middleware, authenticating each request and populating req._.user.

This line simply allows us to use an Express middleware on a stream of requests:

    use(authenticate)

And this line allows us to group incoming requests based on the user making the request:

    groupBy(({req}) => req._.user.id)

Our main stream is now split into multiple user request streams (or sub-streams), each being the stream of requests by a particular user.

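How does this work exactly? Well, each time a request arrives from a user we have not seen before, groupBy() creates a new sub-stream for that user, and every later request from the same user is routed to that existing sub-stream.

This means it is not clear how many user request streams (or sub-streams) we will have in the end. We need to deal with an unknown number of sub-streams, which will arrive at unknown points in time.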

Fortunately, there is a nice abstraction for an unknown number of X arriving at unknown points in time. For example, if we had an unknown number of HTTP requests arriving at unknown points in time, we could simply represent them with a stream of HTTP requests. Now that we have an unknown number of sub-streams arriving at unknown points in time, we can represent them with a stream of sub-streams, or in our case a stream of user request streams.

And that is what groupBy() does exactly: it turns our main stream into a stream of sub-streams!
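The routing idea can be sketched without RxJS at all. The following is a simplified, synchronous illustration, not how RxJS actually implements groupBy(): the names `Group`, `groupBySketch` and the sample requests are hypothetical, and a plain array stands in for the main stream. The point to notice is that the "outer" callback only fires the first time a key is seen.

```typescript
// Library-free sketch of the routing idea behind groupBy().
// `Group`, `groupBySketch` and `requests` are hypothetical names for illustration.
interface Group<K, T> {
  key: K;
  values: T[];
}

function groupBySketch<T, K>(
  source: T[],
  keyOf: (value: T) => K,
  onNewGroup: (group: Group<K, T>) => void,
): void {
  const groups = new Map<K, Group<K, T>>();

  for (const value of source) {
    const key = keyOf(value);
    let group = groups.get(key);

    if (!group) {
      // First value with this key: create a sub-stream and announce it.
      group = { key, values: [] };
      groups.set(key, group);
      onNewGroup(group);
    }

    // Every value, first or not, is routed to the sub-stream for its key.
    group.values.push(value);
  }
}

const requests = [
  { userId: 'alice', path: '/a' },
  { userId: 'bob', path: '/b' },
  { userId: 'alice', path: '/c' },
];

const emittedGroups: Group<string, { userId: string; path: string }>[] = [];
groupBySketch(requests, r => r.userId, g => emittedGroups.push(g));

console.log(emittedGroups.map(g => `${g.key}: ${g.values.map(r => r.path).join(', ')}`));
// → [ 'alice: /a, /c', 'bob: /b' ]
```

Three requests came in, but only two groups were announced, one per user. A real Observable adds time and completion on top of this, which the sketch leaves out.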


A Stream of Streams?

To get a better grasp of the stream-of-streams nature of the groupBy() operator, let's take a look at a simpler example:

    import { interval } from 'rxjs';
    import { groupBy } from 'rxjs/operators';

    interval(1000) // --> our main stream
    .pipe(groupBy(x => x % 3)) // --> grouping by modulo of 3
    .subscribe(console.log); // --> log the main stream


In this case, our main stream emits the numbers 0, 1, 2, 3, 4, ... one per second. We then split the stream based on each number's remainder modulo 3. If you run this code, you will get a console output like this:

    > GroupedObservable ...
    > GroupedObservable ...
    > GroupedObservable ...

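Each of these is logged one second after the other, and after the third log there are no further logs. Basically, what we are logging here is the stream of groups, not the numbers themselves: 0, 1 and 2 each open a new group, which is emitted on the outer stream, while every later number belongs to one of those three existing groups, so no new group is emitted.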

Now, if we wanted to get the numbers logged, paired with their group, we could for example tap into this stream of streams and log the contents of each sub-stream:

    import { interval } from 'rxjs';
    import { groupBy, tap } from 'rxjs/operators';

    interval(1000)
    .pipe(
      groupBy(x => x % 3),
      tap(group => {
        group.subscribe(v => console.log(`${group.key}:: ${v}`));
      })
    )
    .subscribe(console.log);


Running this code would yield something like this:

    > GroupedObservable ...
    0:: 0
    > GroupedObservable ...
    1:: 1
    > GroupedObservable ...
    2:: 2
    0:: 3
    1:: 4
    2:: 5
    ...

Notice how group.key is used to identify each group. This is the same value that identifies data of each group, i.e. the result of x => x % 3.

The cool part of RxJS is that it also allows us to merge these split streams back into one stream. For example, we can use mergeMap() to simply merge all of the streams:

    import { interval } from 'rxjs';
    import { groupBy, mergeMap, map } from 'rxjs/operators';

    interval(1000)
    .pipe(
      groupBy(x => x % 3),
      mergeMap(group => // --> merge all sub-streams
        group.pipe(map(v => `${group.key}:: ${v}`)) // --> turn each sub-stream to a stream of strings, also including the sub-stream key
      )
    )
    .subscribe(console.log);


Now we are back to a single stream of strings (instead of a stream of streams), and running the code would produce output like this:

    0:: 0
    1:: 1
    2:: 2
    0:: 3
    1:: 4
    2:: 5
    ...
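One thing worth noticing about this output: merging keeps values in the order they arrived, and nothing is done to each sub-stream besides labeling, so the lines are the same ones you would get by labeling every number with its key directly. The sketch below shows this in plain TypeScript without RxJS; the array `firstValues` is an assumption standing in for the first six emissions of interval(1000).

```typescript
// Plain-TypeScript sketch, no RxJS involved.
// `firstValues` stands in for the first six numbers emitted by interval(1000).
const firstValues = [0, 1, 2, 3, 4, 5];

// Label each value with the key it would be grouped under (its remainder modulo 3).
const labeled = firstValues.map(v => `${v % 3}:: ${v}`);

console.log(labeled);
// → [ '0:: 0', '1:: 1', '2:: 2', '0:: 3', '1:: 4', '2:: 5' ]
```

In other words, splitting and immediately merging only pays off once each sub-stream gets some treatment of its own (throttling, counting, a limited lifetime) before being merged back.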


Sub-Stream Life Cycle

Getting back to our stream of user request streams example, what is the purpose of this stream-splitting? Well, it is useful for situations where you want to do something on each particular stream. For example, if you want to rate-limit each user, you could simply throttle their request stream:

    import { Router, use, next } from 'rxxpress';
    import { groupBy, mergeMap, throttleTime } from 'rxjs/operators';

    import { authenticate } from './my-auth-code';

    const router = new Router();

    router.all('*').pipe( // --> the request stream
      use(authenticate), // --> authenticate each request
      groupBy(({req}) => req._.user.id), // --> split the stream per-user
      mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually
      next(), // --> pass to the next handler
    ).subscribe();

    export default router.core;

In this example, the next() operator simply passes requests that have not been responded to yet on to the next request handler (it is similar to calling next() inside an Express middleware).

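We are basically authenticating each request, splitting the request stream per user, throttling each user's stream so that at most one request gets through every 10 seconds, merging the throttled streams back into one, and passing whatever gets through to the next handler.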

In effect, this code describes a nice per-user rate-limiting middleware that can be mounted on any Express router / app.
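To make the per-user behavior concrete, here is a library-free sketch of what "throttle each user's stream individually" amounts to. It is an illustration under assumptions, not RxJS code: `RequestEvent`, `throttlePerUser` and the sample events are hypothetical, timestamps are supplied by hand instead of coming from a clock, and the handling of a request arriving exactly at the end of the window is a choice made here rather than a statement about throttleTime().

```typescript
// Hypothetical event shape: who made the request and when (in milliseconds).
interface RequestEvent {
  userId: string;
  time: number;
}

// Leading-edge throttle per user: a user's first request passes, and that
// user's later requests are dropped until `windowMs` has elapsed since the
// last request that passed. Other users are unaffected.
function throttlePerUser(events: RequestEvent[], windowMs: number): RequestEvent[] {
  // One entry per user ever seen; note that this map never shrinks.
  const lastPassed = new Map<string, number>();
  const passed: RequestEvent[] = [];

  for (const event of events) {
    const last = lastPassed.get(event.userId);
    if (last === undefined || event.time - last >= windowMs) {
      lastPassed.set(event.userId, event.time);
      passed.push(event);
    }
  }

  return passed;
}

const passed = throttlePerUser(
  [
    { userId: 'alice', time: 0 },
    { userId: 'bob', time: 1000 },
    { userId: 'alice', time: 5000 },  // dropped: only 5s after alice's last pass
    { userId: 'bob', time: 9000 },    // dropped: only 8s after bob's last pass
    { userId: 'alice', time: 12000 }, // passes: 12s after alice's last pass
  ],
  10000,
);

console.log(passed.map(e => `${e.userId}@${e.time}`));
// → [ 'alice@0', 'bob@1000', 'alice@12000' ]
```

The per-user bookkeeping in `lastPassed` plays the role that the sub-streams play in the RxJS version.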

The issue here is that, if there is a sub-stream created for each user, slowly but surely we will fill up the memory with these sub-streams.

Fortunately, groupBy() also accepts a duration selector argument, with which you can control how long each sub-stream should be kept alive. In our example, we simply need to keep each user request stream alive for a bit more than 10 seconds after its last request, which would look like this:

    import { Router, use, next } from 'rxxpress';
    import { groupBy, mergeMap, throttleTime, debounceTime } from 'rxjs/operators';

    import { authenticate } from './my-auth-code';

    const router = new Router();

    router.all('*').pipe( // --> the request stream
      use(authenticate), // --> authenticate each request
      groupBy(
        ({req}) => req._.user.id, // --> group by user id
        undefined,
        group => group.pipe(debounceTime(11000)) // --> keep each group around 11 seconds after last request
      ),
      mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each user-stream individually
      next(), // --> pass to the next handler
    ).subscribe();

    export default router.core;

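This is how the duration selector works: groupBy() calls it once for each new sub-stream, passing it that sub-stream, and subscribes to the observable it returns. As soon as that observable emits, the sub-stream is completed and dropped. If the same key shows up again later, a fresh sub-stream is created for it.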

To see that in action, take a look at this example:

    import { interval, timer } from 'rxjs';
    import { groupBy, mergeMap, tap, scan } from 'rxjs/operators';

    // In this example, since each group is only kept around for 4 seconds,
    // the number of emissions of each group doesn't exceed 2.

    interval(1000).pipe(
      groupBy(
        v => v % 3, // --> group by modulo 3
        undefined,
        () => timer(4000) // --> keep each group around for 4 seconds
      ),
      mergeMap(
        group => group.pipe(
          scan(count => count + 1, 0), // --> count emissions of each group
          tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count
        )
      )
    )
    .subscribe();


Here we count the emissions of each sub-stream and log them. Each sub-stream emits a new value every 3 seconds, but it is allowed to live only for 4 seconds, so we cannot get to a count higher than 2:

    0:: 1
    1:: 1
    2:: 1
    0:: 2
    1:: 2
    2:: 2
    0:: 1
    1:: 1
    2:: 1
    ...

However, we could make each sub-stream live for 4 seconds after its last emission, using the debounceTime() operator:

    import { interval } from 'rxjs';
    import { groupBy, mergeMap, tap, scan, debounceTime } from 'rxjs/operators';

    // In this example, each group is kept around for 4 seconds after its last
    // emission. Since each group emits every 3 seconds, this means that each group
    // is going to remain around indefinitely.

    interval(1000).pipe(
      groupBy(
        v => v % 3, // --> group by modulo 3
        undefined,
        group => group.pipe(debounceTime(4000)) // --> keep each group around for ...
                                                // --> ... 4 seconds after last emission
      ),
      mergeMap(
        group => group.pipe(
          scan(count => count + 1, 0), // --> count emissions of each group
          tap(v => console.log(`${group.key}:: ${v}`)) // --> log that count
        )
      )
    )
    .subscribe();
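The difference between the two lifetime policies (a fixed 4 seconds from creation versus 4 seconds after the last emission) can be checked with a small simulation that does not need RxJS or real timers. This is a simplified model under assumptions: `Policy` and `countPerGroup` are hypothetical names, value `v` is taken to arrive at `(v + 1)` seconds as with interval(1000), and a value arriving exactly at the expiry time is treated as arriving too late.

```typescript
// 'fixed'   models () => timer(lifetimeMs): the clock starts when the group is created.
// 'sliding' models group => group.pipe(debounceTime(lifetimeMs)): every emission restarts the clock.
type Policy = 'fixed' | 'sliding';

function countPerGroup(valueCount: number, lifetimeMs: number, policy: Policy): string[] {
  const groups = new Map<number, { count: number; expiresAt: number }>();
  const log: string[] = [];

  for (let v = 0; v < valueCount; v++) {
    const time = (v + 1) * 1000; // when value v arrives
    const key = v % 3;
    let group = groups.get(key);

    if (!group || time >= group.expiresAt) {
      // No live group for this key: start a fresh one, with a fresh count.
      group = { count: 0, expiresAt: time + lifetimeMs };
      groups.set(key, group);
    } else if (policy === 'sliding') {
      // Live group, sliding policy: this emission pushes the expiry back.
      group.expiresAt = time + lifetimeMs;
    }

    group.count++;
    log.push(`${key}:: ${group.count}`);
  }

  return log;
}

const fixedLog = countPerGroup(9, 4000, 'fixed');
const slidingLog = countPerGroup(9, 4000, 'sliding');

console.log(fixedLog);
// → [ '0:: 1', '1:: 1', '2:: 1', '0:: 2', '1:: 2', '2:: 2', '0:: 1', '1:: 1', '2:: 1' ]
console.log(slidingLog);
// → [ '0:: 1', '1:: 1', '2:: 1', '0:: 2', '1:: 2', '2:: 2', '0:: 3', '1:: 3', '2:: 3' ]
```

With the fixed policy the counts reset after 2, matching the output of the timer() example. With the sliding policy each group is refreshed every 3 seconds, before its 4 seconds run out, so it never expires and the counts keep growing.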


If you are interested in rate-limiting Express apps, specifically using RxJS and RxXpress, you can also take a look at this blog post:



Hero image by Suganth from Unsplash
