Rate Limiting An Express App
Imagine we have an API with multiple endpoints, written in Node.js using Express, and we want to impose a per-user rate limit on it (a pretty common requirement for any API).
The general code structure would look something like this:
```ts
import * as express from 'express';

const router = express.Router();

router.all('*', (req, res, next) => {
  // TODO: apply rate limiting
});

// ...
// Rest of your API definition
// ...

export default router;
```
To apply a global 10-second rate limit, we could write the middleware code like this:
```ts
import * as express from 'express';

const router = express.Router();

let lastReq; // --> store the time of the last request

router.all('*', (req, res, next) => {
  const now = new Date();
  if (!lastReq || (now - lastReq) > 10000) { // --> if it has been 10 seconds since the last request
    lastReq = now;                           // --> update time of last request
    next();                                  // --> pass to next handler (rest of the API)
  } else {
    res.status(429).send();                  // --> otherwise, reject with 429 Too Many Requests
  }
});

// ...
// Rest of your API definition
// ...

export default router;
```
This looks a bit hacky, since a global variable is being updated by a middleware function. We could of course clean this up a bit by writing a custom middleware factory:
```ts
export function rateLimit(duration) {
  let last;

  return (req, res, next) => {
    const now = new Date();
    if (!last || (now - last) > duration * 1000) {
      last = now;
      next();
    } else {
      res.status(429).send();
    }
  };
}
```
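Before wiring this into a router, we can sanity-check its timing logic by driving an inlined copy of the factory with stub `req`/`res` objects (the stubs are hypothetical stand-ins for Express's objects, just for illustration):

```js
// Inlined copy of the rateLimit() factory above, exercised with stubs:
function rateLimit(duration) {
  let last;
  return (req, res, next) => {
    const now = new Date();
    if (!last || (now - last) > duration * 1000) {
      last = now;
      next();
    } else {
      res.status(429).send();
    }
  };
}

const limiter = rateLimit(10);
const results = [];
const stubRes = {
  status(code) { results.push(code); return this; }, // record rejections
  send() {}
};
const pass = () => results.push('ok');               // record accepted requests

limiter({}, stubRes, pass); // first request goes through
limiter({}, stubRes, pass); // immediate second request is rejected

console.log(results); // [ 'ok', 429 ]
```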
```ts
import * as express from 'express';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```
Now this seems much neater, but what we have implemented is a global rate limit for anyone accessing the API: if Bob accesses the API, then Alice cannot access it for the next 10 seconds either.
To fix that, we would need to authenticate the user making each request. For simplicity, let's assume we have an authenticate() function that does exactly that job for us, and that it populates req._.user, which has an id, req._.user.id.
Now our code would look like this:
```ts
export function rateLimit(duration) {
  const last = {}; // --> keep a list of users with the timestamp of their last request

  return (req, res, next) => {
    const now = new Date();
    if (
      !(req._.user.id in last) ||                   // --> if the user has not made a request yet
      (now - last[req._.user.id]) > duration * 1000 // --> or it has been long enough since their last request
    ) {
      last[req._.user.id] = now; // --> update user's timestamp
      next();                    // --> handle user's request
    } else {
      res.status(429).send();    // --> otherwise, reject with 429
    }
  };
}
```
```ts
import * as express from 'express';
import { authenticate } from './auth';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', authenticate(), rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```
Besides still not being particularly neat, this version has the issue that our rateLimit() middleware depends on another authentication middleware to populate the req._.user object. Brushing that off, it has a more pressing and practical problem: the last object that the middleware creates is never cleaned up. There is an entry in it for every user that ever accesses the system, and without a cleanup mechanism it keeps growing, leading to potential memory issues (or at the very least, over-usage).
To fix that, we could restructure our middleware to maintain a lock for each user and release it after a while:
```ts
export function rateLimit(duration) {
  const lock = {};
  return (req, res, next) => {
    if (req._.user.id in lock) {
      res.status(429).send();
    } else {
      next();
      lock[req._.user.id] = true;
      setTimeout(() => delete lock[req._.user.id], 1000 * duration);
    }
  };
}
```
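To see that the lock really is per-user, we can again drive an inlined copy with stub requests. The stub objects and the tiny duration (10 milliseconds, so the demo finishes quickly) are purely for illustration:

```js
// Inlined copy of the lock-based rateLimit() above:
function rateLimit(duration) {
  const lock = {};
  return (req, res, next) => {
    if (req._.user.id in lock) {
      res.status(429).send();
    } else {
      next();
      lock[req._.user.id] = true;
      setTimeout(() => delete lock[req._.user.id], 1000 * duration);
    }
  };
}

const limiter = rateLimit(0.01); // 10ms lock, just for demonstration
const results = [];
const reqFor = id => ({ _: { user: { id } } });
const stubRes = { status(code) { results.push(code); return this; }, send() {} };
const pass = () => results.push('ok');

limiter(reqFor('bob'), stubRes, pass);   // bob's first request passes
limiter(reqFor('bob'), stubRes, pass);   // bob is locked out
limiter(reqFor('alice'), stubRes, pass); // alice is unaffected by bob's lock

console.log(results); // [ 'ok', 429, 'ok' ]
```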
Now our solution works (and our middleware is also pretty small), but it still has issues. The main reason is that we are looking at one request at a time, using the middleware's closure (i.e. lock or last) as an in-memory way of communicating between handler invocations when making a decision for each request.
The nature of rate limiting, however, has more to do with streams of requests than with individual requests (it is a limit on the stream, not on any single request). What we want to do is:
- Do authentication on each incoming request (as before)
- Split the incoming stream of requests per user
- Throttle each user-request stream by a certain duration (10 seconds)
That seems simple enough, but to achieve it we would need to be able to work with streams of requests. Express, however, is based on handling one request at a time (via a callback), so unfortunately we cannot have code as neat as this simple description.
Or can we?
Well, there is a neat library for working with streams called RxJS, and a nice little library that creates request streams from Express's routers, called RxXpress. Combining the two, our rate limiting code becomes something like this:
```ts
import { Router, next, use } from 'rxxpress';
import { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

import { authenticate } from './auth';

const router = new Router();

router.all('*').pipe(
  use(authenticate()),                                // --> conduct authentication
  groupBy(({req}) => req._.user.id),                  // --> split request stream based on user
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream for 10 seconds, then merge them back together
  next()                                              // --> pass to next handler
).subscribe();

// ...
// Rest of your API definition
// ...

export default router.core;
```
OK, that DOES look like our neat stream-based description, but if you are not familiar with RxJS, that's a lot to unpack in one go. So let's break it down a bit:
```ts
router.all('*').pipe(...).subscribe();
```
With the RxXpress router, each route is a stream of requests instead of a function accepting a callback. Here we are saying that we want the stream of all requests (all methods, any path), want to pipe it through some operators (each operator is simply a function applied to the stream, pretty similar to piping shell commands), and then subscribe to it (so we actually receive those requests).
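To build intuition for what "each operator is a function applied to the stream" means, here is a minimal sketch of piping as plain function composition, with toy "operators" working over arrays instead of streams (this is an illustration of the concept, not RxJS's actual implementation):

```js
// pipe(f, g, h)(x) is just h(g(f(x))): operators compose left to right.
const pipe = (...ops) => input => ops.reduce((acc, op) => op(acc), input);

// Toy "operators": each takes configuration and returns a function on arrays.
const filter = pred => arr => arr.filter(pred);
const map = fn => arr => arr.map(fn);

const evenSquares = pipe(
  filter(x => x % 2 === 0), // keep even numbers
  map(x => x * x)           // square them
);

console.log(evenSquares([1, 2, 3, 4])); // [ 4, 16 ]
```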
```ts
router.all('*').pipe(
  use(authenticate())
  // ...
)
```
We simply use our nice authenticate() middleware on each incoming request.
```ts
router.all('*').pipe(
  // ...
  groupBy(({req}) => req._.user.id)
  // ...
)
```
The groupBy operator splits our initial stream of requests based on the user. The end result is a stream of streams: a master stream which emits user request streams (each being the stream of requests made by that user).
We are glossing over some details of the groupBy() operator here; the piece "How RxJS groupBy() Works" goes into more detail on how to use groupBy() effectively in such a situation.
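As a rough mental model, here is the same splitting idea over a plain array: requests are bucketed by user id, yielding one "sub-stream" (here, simply an array) per user. The request shapes are made up for illustration:

```js
// Hypothetical requests, tagged with the id of the user who made them:
const requests = [
  { user: 'bob', path: '/a' },
  { user: 'alice', path: '/b' },
  { user: 'bob', path: '/c' },
];

// Bucket requests by user id, like groupBy() does for stream emissions:
const groups = {};
for (const req of requests) {
  (groups[req.user] = groups[req.user] || []).push(req);
}

console.log(Object.keys(groups)); // [ 'bob', 'alice' ]
console.log(groups.bob.length);   // 2
```

The crucial difference with the real operator is that groupBy() does this lazily over time: each group is itself a live stream you can pipe further, which is exactly what the next step does.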
```ts
router.all('*').pipe(
  // ...
  mergeMap(group => group.pipe(throttleTime(10000)))
  // ...
)
```
OK, this is a two-parter, so let's first look at the inner part:
```ts
group => group.pipe(throttleTime(10000))
```
Remember how groupBy() split our stream? Each split stream ends up here, in the group variable. Each one is the stream of requests made by a single user, and we wanted to throttle it to one request per 10 seconds, which is where the throttleTime() operator becomes useful: it simply passes on the first emission and drops the rest until the given time has passed.
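The decision logic of that leading-edge throttle can be sketched in a few lines. Timestamps are fed in explicitly here so the behavior is easy to trace (a conceptual sketch, not RxJS's implementation):

```js
// Returns a function that decides, per timestamped emission, whether it passes.
function makeThrottle(windowMs) {
  let lastPassed;
  return timestamp => {
    if (lastPassed === undefined || timestamp - lastPassed >= windowMs) {
      lastPassed = timestamp; // open a new throttle window
      return true;            // emission passes through
    }
    return false;             // emission is dropped
  };
}

const throttle = makeThrottle(10000); // 10-second window
const decisions = [0, 3000, 9000, 12000].map(throttle);
console.log(decisions); // [ true, false, false, true ]
```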
Now for the outer part:
```ts
router.all('*').pipe(
  // ...
  mergeMap(...)
  // ...
)
```
Well, the rest of our code doesn't need to be aware that we have split the original stream of requests into separate per-user streams, so we need to merge those streams back together after throttling them individually. That is where mergeMap() comes into play, as it merges all those streams into a single stream of requests again.
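Over arrays, that "map each group, then flatten the results back into one sequence" shape is just a flatMap (the per-user values here are made up; with real streams the merged order would depend on emission timing rather than group order):

```js
// Two per-user "streams" of request labels, merged back into one sequence:
const perUser = [['bob:1', 'bob:2'], ['alice:1']];
const merged = perUser.flatMap(group => group);
console.log(merged); // [ 'bob:1', 'bob:2', 'alice:1' ]
```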
```ts
router.all('*').pipe(
  // ...
  next()
)
```
Every request that gets to this point (and is not dropped by throttling) should be passed to the next request handler in line, and that is precisely what next() does.
So let's look at our two solutions side by side:
```ts
import { Router, next, use } from 'rxxpress';
import { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

import { authenticate } from './auth';

const router = new Router();

router.all('*').pipe(
  use(authenticate()),                                // --> conduct authentication
  groupBy(({req}) => req._.user.id),                  // --> split request stream based on user
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream for 10 seconds, then merge them back together
  next()                                              // --> pass to next handler
).subscribe();

// ...
// Rest of your API definition
// ...

export default router.core;
```
```ts
import * as express from 'express';
import { authenticate } from './auth';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', authenticate(), rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```
```ts
export function rateLimit(duration) {
  const lock = {};
  return (req, res, next) => {
    if (req._.user.id in lock) {
      res.status(429).send();
    } else {
      next();
      lock[req._.user.id] = true;
      setTimeout(() => delete lock[req._.user.id], 1000 * duration);
    }
  };
}
```
As you can see, the streaming solution is much more precise and elegant. However, not only does it require familiarity with a library like RxJS, it generally demands thinking in terms of streams, which is unfamiliar to most of us programmers, since most of our experience is with problems we reason about imperatively, i.e. in terms of instructions executed one after another.
This different paradigm of course becomes a serious barrier to entry for a lot of people, causing them to actually prefer the naive solution over the stream-based one. And that is OK. However, I hope that seeing what's on the other side of that barrier encourages you to overcome it and enjoy the elegance of streams and reactive programming.
Hero image by Makarios Tang from Unsplash