Skip to content

Commit dfebaf0

Browse files
authored
Add ratelimit example (#241)
* Add ratelimit example * Add to readmes
1 parent 1a48899 commit dfebaf0

File tree

7 files changed

+441
-0
lines changed

7 files changed

+441
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ Or have a look at the general catalog below:
6969
| <a id="event-enrichment">Event Enrichment / Joins</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#event-enrichment--joins) [<img src="https://skillicons.dev/icons?i=go" width="24" height="24">](go/patterns-use-cases/README.md#event-enrichment--joins) [<img src="https://skillicons.dev/icons?i=python&theme=light" width="24" height="24">](python/patterns-use-cases/README.md#event-enrichment--joins) [<img src="https://skillicons.dev/icons?i=java&theme=light" width="24" height="24">](java/patterns-use-cases/README.md#event-enrichment--joins) [<img src="https://skillicons.dev/icons?i=kotlin&theme=light" width="24" height="24">](kotlin/patterns-use-cases/README.md#event-enrichment--joins) |
7070
| <a id="promise-as-a-service">Durable Promises as a Service</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#durable-promises-as-a-service) |
7171
| <a id="priority-queue">Priority Queue</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#priority-queue) |
72+
| <a id="priority-queue">Rate Limiting</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#rate-limiting) |
7273

7374
#### Integrations
7475

typescript/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Common tasks and patterns implemented with Restate:
4141
#### Building coordination constructs (Advanced)
4242
- **[Durable Promises as a Service](patterns-use-cases/README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/promiseasaservice)
4343
- **[Priority Queue](patterns-use-cases/README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/priorityqueue)
44+
- **[Rate Limiting](patterns-use-cases/README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/ratelimit)
4445

4546
## Integrations
4647

typescript/patterns-use-cases/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Common tasks and patterns implemented with Restate:
2727
Use Restate to build distributed coordination and synchronization constructs:
2828
- **[Durable Promises as a Service](README.md#durable-promises-as-a-service)**: Building Promises/Futures as a service, that can be exposed to external clients and are durable across processes and failures. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/promiseasaservice)
2929
- **[Priority Queue](README.md#priority-queue)**: Example of implementing a priority queue to manage task execution order. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/priorityqueue)
30+
- **[Rate Limiting](README.md#rate-limiting)**: Example of implementing a token bucket rate limiter. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/ratelimit)
3031

3132
First, install the dependencies:
3233

@@ -841,4 +842,33 @@ As you do so, you can observe the logs; in flight requests will increase up to 1
841842
842843
You can write your own queue item selection logic in `selectAndPopItem`; doing so is outside the scope of this example.
843844
845+
</details>
846+
847+
## Rate limiting
848+
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/ratelimit)
849+
850+
An example of implementing a token bucket rate limiter using Restate state and the sleep primitive.
851+
852+
853+
<details>
854+
<summary><strong>Running the example</strong></summary>
855+
856+
Run the example with `npx tsx watch ./src/ratelimit/app.ts`.
857+
858+
Set up the limiter named `myService-expensiveMethod` with a rate limit of 1 per second:
859+
```shell
860+
curl localhost:8080/limiter/myService-expensiveMethod/setRate -H 'content-type:application/json' -d '{"newLimit": 1, "newBurst": 1}'
861+
```
862+
863+
You can send requests that are subject to the limiter like this:
864+
```shell
865+
# send one request
866+
curl localhost:8080/myService/expensiveMethod
867+
# send lots
868+
for i in $(seq 1 30); do curl localhost:8080/myService/expensiveMethod && echo "request completed"; done
869+
```
870+
871+
You should observe that only one request is processed per second. You can then try changing the limit or the burst
872+
and sending more requests.
873+
844874
</details>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { endpoint } from "@restatedev/restate-sdk";
2+
3+
import { limiter } from "./limiter";
4+
import { myService } from "./service";
5+
6+
endpoint().bind(limiter).bind(myService).listen();
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// a faithful reimplementation of https://pkg.go.dev/golang.org/x/time/rate#Limiter
2+
// using virtual object state
3+
4+
import { object, ObjectContext } from "@restatedev/restate-sdk";
5+
6+
type LimiterState = {
7+
state: LimiterStateInner;
8+
};
9+
type LimiterStateInner = {
10+
limit: number;
11+
burst: number;
12+
tokens: number;
13+
// last is the last time the limiter's tokens field was updated, in unix millis
14+
last: number;
15+
// lastEvent is the latest time of a rate-limited event (past or future), in unix millis
16+
lastEvent: number;
17+
};
18+
19+
export interface Reservation {
20+
ok: boolean;
21+
tokens: number;
22+
creationDate: number;
23+
dateToAct: number;
24+
// This is the Limit at reservation time, it can change later.
25+
limit: number;
26+
}
27+
28+
export const limiter = object({
29+
name: "limiter",
30+
handlers: {
31+
state: async (
32+
ctx: ObjectContext<LimiterState>,
33+
): Promise<LimiterStateInner> => {
34+
return getState(ctx);
35+
},
36+
tokens: async (ctx: ObjectContext<LimiterState>): Promise<number> => {
37+
// deterministic date not needed, as there is only an output entry
38+
const tokens = advance(await getState(ctx), Date.now());
39+
return tokens;
40+
},
41+
reserve: async (
42+
ctx: ObjectContext<LimiterState>,
43+
{
44+
n = 1,
45+
waitLimitMillis = Infinity,
46+
}: { n?: number; waitLimitMillis?: number },
47+
): Promise<Reservation> => {
48+
let lim = await getState(ctx);
49+
50+
if (lim.limit == Infinity) {
51+
// deterministic date is not necessary, as this is part of a response body, which won't be replayed.
52+
const now = Date.now();
53+
return {
54+
ok: true,
55+
tokens: n,
56+
creationDate: now,
57+
dateToAct: now,
58+
limit: 0,
59+
};
60+
}
61+
62+
let r: Reservation;
63+
({ lim, r } = await ctx.run(() => {
64+
const now = Date.now();
65+
let tokens = advance(lim, now);
66+
67+
// Calculate the remaining number of tokens resulting from the request.
68+
tokens -= n;
69+
70+
// Calculate the wait duration
71+
let waitDurationMillis = 0;
72+
if (tokens < 0) {
73+
waitDurationMillis = durationFromTokens(lim.limit, -tokens);
74+
}
75+
76+
// Decide result
77+
const ok = n <= lim.burst && waitDurationMillis <= waitLimitMillis;
78+
79+
// Prepare reservation
80+
const r = {
81+
ok,
82+
tokens: 0,
83+
creationDate: now,
84+
dateToAct: 0,
85+
limit: lim.limit,
86+
} satisfies Reservation;
87+
88+
if (ok) {
89+
r.tokens = n;
90+
r.dateToAct = now + waitDurationMillis;
91+
92+
// Update state
93+
lim.last = now;
94+
lim.tokens = tokens;
95+
lim.lastEvent = r.dateToAct;
96+
}
97+
98+
return { lim, r };
99+
}));
100+
101+
setState(ctx, lim);
102+
103+
return r;
104+
},
105+
setRate: async (
106+
ctx: ObjectContext<LimiterState>,
107+
{ newLimit, newBurst }: { newLimit?: number; newBurst?: number },
108+
) => {
109+
if (newLimit === undefined && newBurst === undefined) {
110+
return;
111+
}
112+
113+
let lim = await getState(ctx);
114+
115+
lim = await ctx.run(() => {
116+
const now = Date.now();
117+
const tokens = advance(lim, now);
118+
119+
lim.last = now;
120+
lim.tokens = tokens;
121+
if (newLimit !== undefined) lim.limit = newLimit;
122+
if (newBurst !== undefined) lim.burst = newBurst;
123+
124+
return lim;
125+
});
126+
127+
setState(ctx, lim);
128+
},
129+
cancelReservation: async (
130+
ctx: ObjectContext<LimiterState>,
131+
r: Reservation,
132+
) => {
133+
let lim = await getState(ctx);
134+
135+
lim = await ctx.run(() => {
136+
const now = Date.now();
137+
138+
if (lim.limit == Infinity || r.tokens == 0 || r.dateToAct < now) {
139+
return lim;
140+
}
141+
142+
// calculate tokens to restore
143+
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
144+
// after r was obtained. These tokens should not be restored.
145+
const restoreTokens =
146+
r.tokens - tokensFromDuration(r.limit, lim.lastEvent - r.dateToAct);
147+
if (restoreTokens <= 0) {
148+
return lim;
149+
}
150+
// advance time to now
151+
let tokens = advance(lim, now);
152+
// calculate new number of tokens
153+
tokens += restoreTokens;
154+
if (tokens > lim.burst) {
155+
tokens = lim.burst;
156+
}
157+
// update state
158+
lim.last = now;
159+
lim.tokens = tokens;
160+
if (r.dateToAct == lim.lastEvent) {
161+
const prevEvent =
162+
r.dateToAct + durationFromTokens(r.limit, -r.tokens);
163+
if (prevEvent >= now) {
164+
lim.lastEvent = prevEvent;
165+
}
166+
}
167+
168+
return lim;
169+
});
170+
171+
setState(ctx, lim);
172+
},
173+
},
174+
});
175+
176+
function advance(lim: LimiterStateInner, date: number): number {
177+
let last = lim.last;
178+
if (date <= last) {
179+
last = date;
180+
}
181+
182+
// Calculate the new number of tokens, due to time that passed.
183+
const elapsedMillis = date - last;
184+
const delta = tokensFromDuration(lim.limit, elapsedMillis);
185+
let tokens = lim.tokens + delta;
186+
if (tokens > lim.burst) {
187+
tokens = lim.burst;
188+
}
189+
190+
return tokens;
191+
}
192+
193+
async function getState(
194+
ctx: ObjectContext<LimiterState>,
195+
): Promise<LimiterStateInner> {
196+
return (
197+
(await ctx.get("state")) ?? {
198+
limit: 0,
199+
burst: 0,
200+
tokens: 0,
201+
last: 0,
202+
lastEvent: 0,
203+
}
204+
);
205+
}
206+
207+
async function setState(
208+
ctx: ObjectContext<LimiterState>,
209+
lim: LimiterStateInner,
210+
) {
211+
ctx.set("state", lim);
212+
}
213+
214+
function durationFromTokens(limit: number, tokens: number): number {
215+
if (limit <= 0) {
216+
return Infinity;
217+
}
218+
219+
return (tokens / limit) * 1000;
220+
}
221+
222+
function tokensFromDuration(limit: number, durationMillis: number): number {
223+
if (limit <= 0) {
224+
return 0;
225+
}
226+
return (durationMillis / 1000) * limit;
227+
}
228+
229+
export type Limiter = typeof limiter;

0 commit comments

Comments
 (0)