Rate Limiting An Express App

Imagine we have an API with multiple end-points written in NodeJS and using Express. And we want to implement a rate limit on this API for each user (pretty common for any API).

The general code-structure of that would look like something this:

1linkimport * as express from 'express';

2link

3link

4linkconst router = express.Router();

5link

6linkrouter.all('*', (req, res, next) => {

7link // TODO: apply rate limiting

8link});

9link

10link// ...

11link// Rest of your API definition

12link// ...

13link

14linkexport default router;


linkNaive Solution

To apply a global 10 seconds rate limit, we could write the middleware code like this:

1linkimport * as express from 'express';

2link

3link

4linkconst router = express.Router();

5link

6linklet lastReq; // --> store the time of last request

7link

8linkrouter.all('*', (req, res, next) => {

9link const now = new Date();

10link if (!lastReq || (lastReq - now) > 10000) { // --> if it has been 10 seconds since last request

11link lastReq = now; // --> update time of last request

12link next(); // --> pass to next handler (rest of the API)

13link } else {

14link res.status(429).send(); // --> otherwise, timeout

15link }

16link});

17link

18link// ...

19link// Rest of your API definition

20link// ...

21link

22linkexport default router;

This looks a bit hacky, since there is a global variable that is being updated by a middleware function. We could of-course clean this up a bit by writing a custom middleware function:

1linkexport function rateLimit(duration) {

2link let last;

3link

4link return (req, res, next) => {

5link const now = new Date();

6link if (!last || (last - now) > duration * 1000) {

7link last = now;

8link next();

9link } else {

10link res.status(429).send();

11link }

12link }

13link}

1linkimport * as express from 'express';

2linkimport { rateLimit } from './rate-limit'; // @see tab:middleware

3link

4link

5linkconst router = express.Router();

6link

7linkrouter.all('*', rateLimit(10));

8link

9link// ...

10link// Rest of your API definition

11link// ...

12link

13linkexport default router;


Now this seems much neater, but what we have implemented is a global rate limit for anyone accessing the API. In other words, if Bob access the API, then Alice cannot access it for 10 seconds after that as well.

To fix that, we would need to authenticate the user making each request, and for simplicity lets assume we have an authenticate() function that does exactly that job for us. Also let's assume that authenticate() basically populates req._.user, which has an id, req._.user.id.

Now our code would look like this:

1linkexport function rateLimit(duration) {

2link const last = {}; // --> keep a list of users with the timestamp of their last request

3link

4link return (req, res, next) => {

5link const now = new Date();

6link if (

7link !(req._.user.id in last) || // --> if the user has not made a request

8link (last[req._.user.id] - now) > duration * 1000 // --> or it has been 10 seconds since their last request

9link ) {

10link last[req._.user.id] = now; // --> update user's timestamp

11link next(); // --> handle user's request

12link } else {

13link res.status(429).send(); // --> otherwise, timeout

14link }

15link }

16link}

1linkimport * as express from 'express';

2linkimport { authenticate } from './auth';

3linkimport { rateLimit } from './rate-limit'; // @see tab:middleware

4link

5link

6linkconst router = express.Router();

7link

8linkrouter.all('*', authenticate(), rateLimit(10));

9link

10link// ...

11link// Rest of your API definition

12link// ...

13link

14linkexport default router;

Besides still not being neat, we still have the issue that our rateLimit() middleware requires another authentication middleware that does populate req._.user object. Brushing off that, it has a more pressing and practical problem: the last object that the middleware creates is never cleaned up. There is an entry there for every user that accesses the system, and without a cleanup mechanism it would keep growing in size, leading to potential memory issues (or at least, over-usage).

To fix that, we could restructure our middleware to maintain a lock for each user and release it after a while:

1linkexport function rateLimit(duration) {

2link const lock = {};

3link return (req, res, next) => {

4link if (req._.user.id in lock) {

5link res.status(429).send();

6link } else {

7link next();

8link lock[req._.user.id] = true;

9link setTimeout(() => delete lock[req._.user.id], 1000 * duration);

10link }

11link }

12link}

Now our solution works (and our middleware is also pretty small), but it has the following issues:


linkStream-Based Solution

The main reason our approach bears those issues is that we are looking at one request at a time, using the middleware's closure (i.e. lock or last) as an in-mem way of communicating between those instances when we are making a decision for each request.

The nature of rate limiting however has more to do with streams of request than each single request (it is a limit on the stream, not each single request). What we want to do is:

That seems simple enough, but to achieve it we would need to be able to work with streams of requests. Express is however based on handling one request at a time (via a callback), so unfortunately we cannot have a code as neat as this simple description.

Or can't we?

Well, there is this neat library for working with streams called RxJS. And there is a nice little library that creates a request stream for us based on Express's routers, called RxXpress. Combining the two, our rate limiting code becomes something like this:

1linkimport { Router, next } from 'rxxpress';

2linkimport { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

3link

4linkimport { authenticate } from './auth';

5link

6linkconst router = new Router();

7link

8linkrouter.all('*').pipe(

9link use(authenticate()), // --> conduct authentication

10link groupBy(({req}) => req._.user.id), // --> split request stream based on user

11link mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream 10 seconds, then merge them together

12link next() // --> pass to next handler

13link).subscribe();

14link

15link// ...

16link// Rest of your API definition

17link// ...

18link

19linkexport default router.core;

Ok that DOES look like our neat stream-based description, but if you are not familiar with RxJS, thats a lot to unpack in one go. So lets break it down a bit:


1linkrouter.all('*').pipe(...).subscribe();

Basically with the RxXpress router, each route is a stream of requests instead of a function accepting a callback. Here we are basically saying that we want the stream of all requests (all methods, any path), and want to pipe it to some operators (each operator is simply a function that is applied on the stream, pretty similar to piping shell commands), and then subscribe to it (so we get those requests).


1linkrouter.all('*').pipe(

2link use(authenticate())

3link // ...

4link)

We simply use our nice authenticate() middleware on each incoming request.


1linkrouter.all('*').pipe(

2link // ...

3link groupBy(({req}) => req._.user.id)

4link // ...

5link)

The groupBy operator splits our initial stream of our requests based on each user. The end result is a stream of streams, i.e. a master stream, which emits user request streams (a stream of requests by that user).

We are glossing over some details of groupBy() operator here. You can checkout the following piece for more details on how to use groupBy() in such a situation effectively:


1linkrouter.all('*').pipe(

2link // ...

3link mergeMap(group => group.pipe(throttleTime(10000)))

4link // ...

5link)

Ok this is a two-parter, so lets first look at the inner part:

1linkgroup => group.pipe(throttleTime(10000))

Remember how groupBy() split our stream? Each split stream ends up here, in the group variable. Each stream is the stream of requests by each user, and we wanted to throttle it 10 seconds, so here is where throttleTime() operator becomes useful. It simply passes on the first emission and drops the rest until given time is passed.

Now for the outer part:

1linkrouter.all('*').pipe(

2link // ...

3link mergeMap(...)

4link // ...

5link)

well the rest of our code doesn't need to be aware that we have split the original stream of requests into separate per-user streams, so we need to merge those streams after throttling them individually. That is where mergeMap() comes into play, as it merges all those streams into a single stream of requests again.


1linkrouter.all('*').pipe(

2link // ...

3link next()

4link)

Well every request that gets to this point (and is not dropped by throttling), should be passed to the next request handler in line, and that is precisely what next() does.


linkPros & Cons

So lets look at our two solutions side by side:

1linkimport { Router, next } from 'rxxpress';

2linkimport { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

3link

4linkimport { authenticate } from './auth';

5link

6linkconst router = new Router();

7link

8linkrouter.all('*').pipe(

9link use(authenticate()), // --> conduct authentication

10link groupBy(({req}) => req._.user.id), // --> split request stream based on user

11link mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream 10 seconds, then merge them together

12link next() // --> pass to next handler

13link).subscribe();

14link

15link// ...

16link// Rest of your API definition

17link// ...

18link

19linkexport default router.core;

router code
1linkimport * as express from 'express';

2linkimport { authenticate } from './auth';

3linkimport { rateLimit } from './rate-limit'; // @see tab:middleware

4link

5link

6linkconst router = express.Router();

7link

8linkrouter.all('*', authenticate(), rateLimit(10));

9link

10link// ...

11link// Rest of your API definition

12link// ...

13link

14linkexport default router;

middleware code
1linkexport function rateLimit(duration) {

2link const lock = {};

3link return (req, res, next) => {

4link if (req._.user.id in lock) {

5link res.status(400).send();

6link } else {

7link next();

8link lock[req._.user.id] = true;

9link setTimeout(() => delete lock[req._.user.id], 1000 * duration);

10link }

11link }

12link}

As you can see, the streaming solution is much more precise and elegant. However, not only it requires more knowledge of a library like RxJS, it generally demands thinking in terms of streams, which is not familiar for us programmers as most of our experience is with problems that we think imparatively about, i.e. in terms of instructions that are executed one after another.

This different paradigm of course becomes a serious barrier of entry for a lot of people, causing them to actually prefer the naive solution to the stream-based one. And that is OK. However, I hope seeing whats on the other side of that barrier encourages you to try to overcome that barrier and enjoy the elegance of streams and reactive programming.



Hero image by Makarios Tang from Unsplash

Naive SolutionStream-Based SolutionPros & Cons

Home

On Reactive Programmingchevron_right
Yet Another Frontend Frameworkchevron_right
Other Articleschevron_right