Skip to content

Commit 3db22bb

Browse files
feat: add streamedListObjects for unlimited object retrieval
Adds streamedListObjects method to enable retrieving more than 1000 objects via the streaming API endpoint. This is a Node.js-only implementation using axios streaming with resilient NDJSON parsing. Requires OpenFGA server v1.2.0+ for streaming endpoint support. Key changes: - Add streaming.ts with parseNDJSONStream for Node.js Readable streams - Add StreamedListObjectsResponse interface to apiModel.ts - Add OpenFgaClient.streamedListObjects() async generator method - Add createStreamingRequestFunction to common.ts for telemetry integration - Export parseNDJSONStream from index.ts for public use - Add comprehensive streaming tests (259 lines, all passing) - Add two example implementations (full model and minimal local) - Update CHANGELOG.md with streaming feature and server version requirement Features: - No 1000-object limit (streams until completion) - Memory-efficient incremental results via async generators - Telemetry maintained through API layer streaming request path - Robust error propagation through async iterators - Stream cleanup on early consumer break (prevents connection leaks) - Widened type signature to accept Readable|AsyncIterable|string|Buffer Error handling improvements: - Fixed error propagation in createAsyncIterableFromReadable - Pending promises now properly rejected on stream errors - Added guard to prevent onEnd processing after error - Added try/finally to destroy streams when consumers break early Testing: - All 153 tests passing (10/10 test suites) - Verified with live server: - Local example: 3 objects streamed - Full example: 2000 objects streamed (vs 1000 standard limit) - Aligned with Python SDK async generator pattern Fixes #236
1 parent a9e4bd1 commit 3db22bb

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

streaming.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
3131
};
3232

3333
const onEnd = () => {
34+
if (error) return; // Don't process end if error already occurred
3435
ended = true;
3536
while (pendings.length > 0) {
3637
const { resolve } = pendings.shift()!;
@@ -44,6 +45,7 @@ const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
4445
const { reject } = pendings.shift()!;
4546
reject(err);
4647
}
48+
cleanup();
4749
};
4850

4951
readable.on("data", onData);
@@ -59,7 +61,6 @@ const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
5961
return {
6062
next(): Promise<IteratorResult<any>> {
6163
if (error) {
62-
cleanup();
6364
return Promise.reject(error);
6465
}
6566
if (chunkQueue.length > 0) {

tests/streaming.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ describe("Streaming Utilities", () => {
161161
expect(out).toEqual([{ a: 1 }, { b: 2 }]);
162162
});
163163

164-
it("should end iteration when classic emitter errors while pending", async () => {
164+
it("should reject pending iteration when classic emitter errors", async () => {
165165
const emitter = new EventEmitter() as any;
166166
const gen = parseNDJSONStream(emitter);
167167

@@ -173,10 +173,10 @@ describe("Streaming Utilities", () => {
173173
const pendingNext = gen.next();
174174
emitter.emit("error", new Error("boom"));
175175

176-
// Pending next resolves with done: true due to iterator design
177-
await expect(pendingNext).resolves.toEqual({ value: undefined, done: true });
176+
// Pending next should now reject with the error
177+
await expect(pendingNext).rejects.toThrow("boom");
178178

179-
// Generator should be completed afterwards
179+
// After error, iterator is exhausted (standard async iterator behavior)
180180
await expect(gen.next()).resolves.toEqual({ value: undefined, done: true });
181181
});
182182

0 commit comments

Comments
 (0)