Skip to content

Commit 4fad0b4

Browse files
committed
feat: Support running EventQL queries.
1 parent 96b451d commit 4fad0b4

File tree

5 files changed

+202
-0
lines changed

5 files changed

+202
-0
lines changed

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,42 @@ for await (const event of client.readEvents('/books/42', {
188188
controller.abort();
189189
```
190190

191+
### Running EventQL Queries
192+
193+
To run an EventQL query, call the `runEventQlQuery` function and provide the query as argument. The function returns an asynchronous iterator, which you can use e.g. inside a `for await` loop:
194+
195+
```typescript
196+
for await (const row of client.runEventQlQuery(`
197+
FROM e IN events
198+
PROJECT INTO e
199+
`)) {
200+
// ...
201+
}
202+
```
203+
204+
#### Aborting a Query
205+
206+
If you need to abort a query use `break` or `return` within the `await for` loop. However, this only works if there is currently an iteration going on.
207+
208+
To abort the query independently of that, hand over an abort signal as second argument when calling `runEventQlQuery`, and abort the appropriate AbortController:
209+
210+
```typescript
211+
const controller = new AbortController();
212+
213+
for await (const row of client.runEventQlQuery(`
214+
FROM e IN events
215+
PROJECT INTO e
216+
`, controller.signal)) {
217+
// ...
218+
}
219+
220+
// Somewhere else, abort the controller, which will cause
221+
// reading to end.
222+
controller.abort();
223+
```
224+
225+
*Note that each row returned by the iterator matches the projection specified in your query.*
226+
191227
### Observing Events
192228

193229
To observe all events of a subject, call the `observeEvents` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `false`. This ensures that only events of the given subject are returned, not events of nested subjects.

src/Client.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { isStreamCloudEvent } from './stream/isStreamCloudEvent.js';
1111
import { isStreamError } from './stream/isStreamError.js';
1212
import { isStreamEventType } from './stream/isStreamEventType.js';
1313
import { isStreamHeartbeat } from './stream/isStreamHeartbeat.js';
14+
import { isStreamRow } from './stream/isStreamRow.js';
1415
import { isStreamSubject } from './stream/isStreamSubject.js';
1516
import { hasShapeOf } from './types/hasShapeOf.js';
1617

@@ -190,6 +191,74 @@ class Client {
190191
})();
191192
}
192193

194+
public runEventQlQuery(query: string, signal?: AbortSignal): AsyncGenerator<unknown, void, void> {
195+
const url = this.#getUrl('/api/v1/run-eventql-query');
196+
const apiToken = this.#apiToken;
197+
198+
return (async function* () {
199+
const internalAbortController = new AbortController();
200+
const combinedSignal = signal ?? internalAbortController.signal;
201+
const shouldAbortInternally = !signal;
202+
203+
let removeAbortListener: (() => void) | undefined;
204+
205+
if (signal && !signal.aborted) {
206+
const onAbort = () => internalAbortController.abort();
207+
signal.addEventListener('abort', onAbort, { once: true });
208+
removeAbortListener = () => signal.removeEventListener('abort', onAbort);
209+
}
210+
211+
try {
212+
const response = await fetch(url, {
213+
method: 'post',
214+
headers: {
215+
authorization: `Bearer ${apiToken}`,
216+
'content-type': 'application/json',
217+
},
218+
body: JSON.stringify({ query }),
219+
signal: combinedSignal,
220+
});
221+
222+
if (response.status !== 200) {
223+
throw new Error(
224+
`Failed to run EventQL query, got HTTP status code '${response.status}', expected '200'.`,
225+
);
226+
}
227+
if (!response.body) {
228+
throw new Error('Failed to run EventQL query.');
229+
}
230+
231+
for await (const line of readNdJsonStream(response.body, combinedSignal)) {
232+
if (isStreamHeartbeat(line)) {
233+
continue;
234+
}
235+
if (isStreamError(line)) {
236+
throw new Error(`${line.payload.error}.`);
237+
}
238+
if (isStreamRow(line)) {
239+
const row = line.payload;
240+
yield row;
241+
continue;
242+
}
243+
244+
throw new Error('Failed to run EventQL query.');
245+
}
246+
} catch (error) {
247+
if (error instanceof DOMException && error.name === 'AbortError') {
248+
return;
249+
}
250+
throw error;
251+
} finally {
252+
if (removeAbortListener) {
253+
removeAbortListener();
254+
}
255+
if (shouldAbortInternally) {
256+
internalAbortController.abort();
257+
}
258+
}
259+
})();
260+
}
261+
193262
public observeEvents(
194263
subject: string,
195264
options: ObserveEventsOptions,

src/stream/StreamRow.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
interface StreamRow {
2+
type: 'row';
3+
payload: unknown;
4+
}
5+
6+
export type { StreamRow };

src/stream/isStreamRow.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { hasShapeOf } from '../types/hasShapeOf.js';
2+
import type { StreamRow } from './StreamRow.js';
3+
4+
const blueprint: Omit<StreamRow, 'payload'> = {
5+
type: 'row',
6+
};
7+
8+
const isStreamRow = (line: unknown): line is StreamRow => {
9+
if (!hasShapeOf(line, blueprint)) {
10+
return false;
11+
}
12+
if (line.type !== 'row') {
13+
return false;
14+
}
15+
if (!('payload' in line)) {
16+
return false;
17+
}
18+
19+
return true;
20+
};
21+
22+
export { isStreamRow };

test/runEventQlQuery.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import assert from 'node:assert/strict';
2+
import { afterEach, beforeEach, suite, test } from 'node:test';
3+
import type { EventCandidate } from '../src/EventCandidate.js';
4+
import { EventSourcingDbContainer } from '../src/EventSourcingDbContainer.js';
5+
6+
suite('runEventQlQuery', { timeout: 30_000 }, () => {
7+
let container: EventSourcingDbContainer;
8+
9+
beforeEach(async () => {
10+
container = new EventSourcingDbContainer();
11+
await container.start();
12+
});
13+
14+
afterEach(async () => {
15+
await container.stop();
16+
});
17+
18+
test('reads no rows if the query does not return any rows.', async (): Promise<void> => {
19+
const client = container.getClient();
20+
21+
let didReadRows = false;
22+
for await (const _row of client.runEventQlQuery('FROM e IN events PROJECT INTO e')) {
23+
didReadRows = true;
24+
}
25+
26+
assert.equal(didReadRows, false);
27+
});
28+
29+
test('reads all rows the query returns.', async (): Promise<void> => {
30+
const client = container.getClient();
31+
32+
const firstEvent: EventCandidate = {
33+
source: 'https://www.eventsourcingdb.io',
34+
subject: '/test',
35+
type: 'io.eventsourcingdb.test',
36+
data: {
37+
value: 23,
38+
},
39+
};
40+
41+
const secondEvent: EventCandidate = {
42+
source: 'https://www.eventsourcingdb.io',
43+
subject: '/test',
44+
type: 'io.eventsourcingdb.test',
45+
data: {
46+
value: 42,
47+
},
48+
};
49+
50+
await client.writeEvents([firstEvent, secondEvent]);
51+
52+
const rowsRead: unknown[] = [];
53+
for await (const row of client.runEventQlQuery('FROM e IN events PROJECT INTO e')) {
54+
rowsRead.push(row);
55+
}
56+
57+
assert.equal(rowsRead.length, 2);
58+
59+
// biome-ignore lint/suspicious/noExplicitAny: Here, the cast is okay.
60+
const firstRow = rowsRead[0] as any;
61+
assert.equal(firstRow.id, '0');
62+
assert.equal(firstRow.data.value, 23);
63+
64+
// biome-ignore lint/suspicious/noExplicitAny: Here, the cast is okay.
65+
const secondRow = rowsRead[1] as any;
66+
assert.equal(secondRow.id, '1');
67+
assert.equal(secondRow.data.value, 42);
68+
});
69+
});

0 commit comments

Comments
 (0)