Rate Limiting An Express App

Imagine we have an API with multiple endpoints, written in Node.js using Express, and we want to apply a per-user rate limit to this API (a pretty common requirement for any API).

The general code structure would look something like this:

```js
import * as express from 'express';

const router = express.Router();

router.all('*', (req, res, next) => {
  // TODO: apply rate limiting
});

// ...
// Rest of your API definition
// ...

export default router;
```


Naive Solution

To apply a global 10-second rate limit, we could write the middleware like this:

```js
import * as express from 'express';

const router = express.Router();

let lastReq; // --> store the time of the last request

router.all('*', (req, res, next) => {
  const now = new Date();
  if (!lastReq || (now - lastReq) > 10000) { // --> if it has been 10 seconds since the last request
    lastReq = now;                           // --> update time of last request
    next();                                  // --> pass to next handler (rest of the API)
  } else {
    res.status(429).send();                  // --> otherwise, reject with 429 Too Many Requests
  }
});

// ...
// Rest of your API definition
// ...

export default router;
```

This looks a bit hacky, since there is a global variable being updated by a middleware function. We could of course clean this up a bit by writing a custom middleware function:

```js
export function rateLimit(duration) {
  let last;

  return (req, res, next) => {
    const now = new Date();
    if (!last || (now - last) > duration * 1000) {
      last = now;
      next();
    } else {
      res.status(429).send();
    }
  };
}
```

```js
import * as express from 'express';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```


Now this seems much neater, but what we have implemented is a global rate limit for anyone accessing the API. In other words, if Bob accesses the API, then Alice cannot access it for 10 seconds afterwards either.
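The global blocking behavior is easy to see if we reduce the limiter to its core check. The following is a hypothetical, stripped-down pure-function version of the middleware above (timestamps in milliseconds, not real Express code):

```js
// Hypothetical reduction of the naive limiter: one shared timestamp for ALL users.
function makeGlobalLimiter(windowMs) {
  let last; // shared by every caller, regardless of who they are
  return function allow(now) {
    if (last === undefined || now - last > windowMs) {
      last = now;
      return true; // request passes, window restarts
    }
    return false; // request rejected
  };
}

const allow = makeGlobalLimiter(10000);
console.log(allow(0));    // Bob's request: true
console.log(allow(1000)); // Alice's request, 1s later: false
```

Alice is rejected purely because Bob's request reset the shared timestamp.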

To fix that, we would need to authenticate the user making each request. For simplicity, let's assume we have an authenticate() function that does exactly that job for us, and that it populates req._.user, which has an id: req._.user.id.
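For illustration only, such an authenticate() middleware might look roughly like this. The header name and the token-to-user table are made-up assumptions; the only thing the rest of the article relies on is that req._.user.id gets populated:

```js
// Hypothetical sketch of authenticate(); token table and header are invented.
function authenticate() {
  const usersByToken = {
    'token-bob': { id: 'bob' },
    'token-alice': { id: 'alice' },
  };
  return (req, res, next) => {
    const user = usersByToken[req.headers.authorization];
    if (!user) {
      res.status(401).send(); // --> unknown token: reject
      return;
    }
    req._ = req._ || {};
    req._.user = user; // --> downstream middleware reads req._.user.id
    next();
  };
}
```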

Now our code would look like this:

```js
export function rateLimit(duration) {
  const last = {}; // --> map of users to the timestamp of their last request

  return (req, res, next) => {
    const now = new Date();
    if (
      !(req._.user.id in last) ||                   // --> if the user has not made a request yet
      (now - last[req._.user.id]) > duration * 1000 // --> or it has been `duration` seconds since their last request
    ) {
      last[req._.user.id] = now; // --> update user's timestamp
      next();                    // --> handle user's request
    } else {
      res.status(429).send();    // --> otherwise, reject
    }
  };
}
```

```js
import * as express from 'express';
import { authenticate } from './auth';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', authenticate(), rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```

Besides still not being particularly neat, our rateLimit() middleware now requires another middleware to have populated the req._.user object. Brushing that aside, it has a more pressing, practical problem: the last object that the middleware creates is never cleaned up. It holds an entry for every user that has ever accessed the system, and without a cleanup mechanism it keeps growing, leading to potential memory issues (or at least excessive memory usage).
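To make the leak concrete, here is a hypothetical plain-function model of the per-user map, together with a periodic sweep that would prune stale entries (one possible cleanup direction, sketched for illustration; timestamps in milliseconds):

```js
// Hypothetical model: per-user timestamps plus a sweep() to prune old entries.
function makePerUserLimiter(windowMs) {
  const last = new Map(); // user id -> timestamp of last accepted request
  return {
    allow(userId, now) {
      const prev = last.get(userId);
      if (prev === undefined || now - prev > windowMs) {
        last.set(userId, now);
        return true;
      }
      return false;
    },
    // Without something like this (called from an interval timer, say),
    // `last` grows by one entry per user, forever.
    sweep(now) {
      for (const [id, t] of last) {
        if (now - t > windowMs) last.delete(id);
      }
    },
    size() { return last.size; },
  };
}
```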

To fix that, we could restructure our middleware to maintain a lock for each user and release it after a while:

```js
export function rateLimit(duration) {
  const lock = {};
  return (req, res, next) => {
    if (req._.user.id in lock) {
      res.status(429).send();                                        // --> user is locked out: reject
    } else {
      next();                                                        // --> handle the request
      lock[req._.user.id] = true;                                    // --> lock the user out
      setTimeout(() => delete lock[req._.user.id], 1000 * duration); // --> release the lock after `duration` seconds
    }
  };
}
```

Now our solution works (and our middleware is pretty small), but it still has issues: it depends on a separate authentication middleware having populated req._.user, and it relies on the middleware's closure as a piece of shared in-memory state, mutated as a side effect of handling each request.


Stream-Based Solution

The main reason our approach suffers from these issues is that we look at one request at a time, using the middleware's closure (i.e. lock or last) as an in-memory channel between those invocations while making a decision about each request.

The nature of rate limiting, however, has more to do with streams of requests than with any single request (it is a limit on the stream, not on each individual request). What we want to do is:

- split the incoming stream of requests into a separate stream per user,
- throttle each user's stream to at most one request per 10 seconds,
- merge the throttled streams back together and pass the result on to the rest of the API.

That seems simple enough, but to achieve it we would need to be able to work with streams of requests. Express, however, is built around handling one request at a time (via a callback), so unfortunately we cannot write code as neat as that simple description.

Or can't we?

Well, there is this neat library for working with streams called RxJS. And there is a nice little library called RxXpress that creates a request stream for us from Express's routers. Combining the two, our rate-limiting code becomes something like this:

```js
import { Router, next, use } from 'rxxpress';
import { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

import { authenticate } from './auth';

const router = new Router();

router.all('*').pipe(
  use(authenticate()),                                // --> conduct authentication
  groupBy(({req}) => req._.user.id),                  // --> split request stream based on user
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream 10 seconds, then merge them together
  next()                                              // --> pass to next handler
).subscribe();

// ...
// Rest of your API definition
// ...

export default router.core;
```

Ok, that DOES look like our neat stream-based description, but if you are not familiar with RxJS, that's a lot to unpack in one go. So let's break it down a bit:


```js
router.all('*').pipe(...).subscribe();
```

With the RxXpress router, each route is a stream of requests instead of a function accepting a callback. Here we are asking for the stream of all requests (all methods, any path), piping it through some operators (each operator is simply a function applied to the stream, much like piping shell commands), and then subscribing to it (so that we actually receive those requests).


```js
router.all('*').pipe(
  use(authenticate())
  // ...
)
```

We simply use our nice authenticate() middleware on each incoming request.


```js
router.all('*').pipe(
  // ...
  groupBy(({req}) => req._.user.id)
  // ...
)
```

The groupBy operator splits our initial stream of requests based on the user making each request. The end result is a stream of streams: a master stream that emits one request stream per user (each being the stream of requests made by that user).

We are glossing over some details of the groupBy() operator here; it is worth reading up on how to use groupBy() effectively in situations like this.


```js
router.all('*').pipe(
  // ...
  mergeMap(group => group.pipe(throttleTime(10000)))
  // ...
)
```

Ok, this is a two-parter, so let's first look at the inner part:

```js
group => group.pipe(throttleTime(10000))
```

Remember how groupBy() split our stream? Each split stream ends up here, in the group variable. Each one is the stream of requests by a particular user, and we want to throttle it to one request per 10 seconds, which is where the throttleTime() operator becomes useful: it passes on the first emission and drops the rest until the given time has passed.
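If throttleTime() is new to you, its leading-edge behavior can be sketched without RxJS. This is a hypothetical plain-function model over explicit timestamps (in milliseconds), not the real operator:

```js
// Hypothetical model of throttleTime's leading-edge behavior:
// emit the first value, drop the rest until the window has elapsed.
function throttleSketch(windowMs) {
  let lastEmit = -Infinity;
  return (t) => {
    if (t - lastEmit >= windowMs) {
      lastEmit = t;
      return true; // emitted
    }
    return false; // dropped
  };
}

const emit = throttleSketch(10000);
console.log([0, 3000, 9999, 10000, 25000].map(emit));
// [ true, false, false, true, true ]
```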

Now for the outer part:

```js
router.all('*').pipe(
  // ...
  mergeMap(...)
  // ...
)
```

Well, the rest of our code does not need to know that we split the original stream of requests into separate per-user streams, so after throttling them individually we need to merge those streams back together. That is where mergeMap() comes into play: it merges all those streams into a single stream of requests again.
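To see how the split, throttle, and merge fit together, here is a hypothetical plain-JS replay of the same logic over a recorded log of requests (no RxJS; the users and timestamps are made up for illustration):

```js
// Plain-JS model of groupBy -> throttleTime -> mergeMap over a request log.
function replayPipeline(requests, windowMs) {
  const lastEmit = new Map(); // one throttle state per grouped (per-user) stream
  const merged = [];          // what mergeMap would re-combine into one stream
  for (const r of requests) {
    const prev = lastEmit.get(r.user);
    if (prev === undefined || r.t - prev >= windowMs) {
      lastEmit.set(r.user, r.t); // this user's throttled stream emits
      merged.push(r);
    }
    // otherwise: dropped inside that user's throttled stream
  }
  return merged;
}

const log = [
  { user: 'bob', t: 0 },
  { user: 'alice', t: 1000 }, // alice is NOT blocked by bob's request
  { user: 'bob', t: 5000 },   // dropped: inside bob's 10s window
  { user: 'bob', t: 12000 },  // passes: bob's window has elapsed
];
console.log(replayPipeline(log, 10000).map(r => r.user + '@' + r.t));
// [ 'bob@0', 'alice@1000', 'bob@12000' ]
```

Note how the per-user grouping is exactly what makes Alice independent of Bob, which the naive global limiter could not do.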


```js
router.all('*').pipe(
  // ...
  next()
)
```

Every request that gets to this point (i.e. is not dropped by throttling) should be passed to the next request handler in line, and that is precisely what next() does.


Pros & Cons

So let's look at our two solutions side by side:

```js
import { Router, next, use } from 'rxxpress';
import { throttleTime, groupBy, mergeMap } from 'rxjs/operators';

import { authenticate } from './auth';

const router = new Router();

router.all('*').pipe(
  use(authenticate()),                                // --> conduct authentication
  groupBy(({req}) => req._.user.id),                  // --> split request stream based on user
  mergeMap(group => group.pipe(throttleTime(10000))), // --> throttle each split stream 10 seconds, then merge them together
  next()                                              // --> pass to next handler
).subscribe();

// ...
// Rest of your API definition
// ...

export default router.core;
```

router code
```js
import * as express from 'express';
import { authenticate } from './auth';
import { rateLimit } from './rate-limit'; // @see tab:middleware

const router = express.Router();

router.all('*', authenticate(), rateLimit(10));

// ...
// Rest of your API definition
// ...

export default router;
```

middleware code
```js
export function rateLimit(duration) {
  const lock = {};
  return (req, res, next) => {
    if (req._.user.id in lock) {
      res.status(429).send();                                        // --> user is locked out: reject
    } else {
      next();                                                        // --> handle the request
      lock[req._.user.id] = true;                                    // --> lock the user out
      setTimeout(() => delete lock[req._.user.id], 1000 * duration); // --> release the lock after `duration` seconds
    }
  };
}
```

As you can see, the streaming solution is much more precise and elegant. However, not only does it require knowledge of a library like RxJS, it generally demands thinking in terms of streams, which is unfamiliar to most of us programmers, since most of our experience is with problems we reason about imperatively, i.e. in terms of instructions executed one after another.

This different paradigm of course becomes a serious barrier to entry for a lot of people, causing them to actually prefer the naive solution to the stream-based one. And that is OK. However, I hope that seeing what's on the other side encourages you to overcome that barrier and enjoy the elegance of streams and reactive programming.



Hero image by Makarios Tang from Unsplash
