Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0c2a0ab
feat: add streamedListObjects for unlimited object retrieval
daniel-jonathan Oct 28, 2025
6fe722d
feat/streaming-only: Removed unneeded requirement in CHANGELOG.
daniel-jonathan Nov 3, 2025
b010a25
refactor: address code review feedback
daniel-jonathan Nov 3, 2025
3036f36
feat/streaming-only:: fixed merge conflicts from main
daniel-jonathan Nov 5, 2025
0279076
docs: enhance StreamedListObjects example README
daniel-jonathan Nov 10, 2025
71160b3
fix: batch tuple writes to respect 100-tuple limit
daniel-jonathan Nov 13, 2025
d149317
fix: batch tuple writes to respect 100-tuple limit
daniel-jonathan Nov 13, 2025
b2fa56f
chore: merge main into feat/streaming-only
daniel-jonathan Nov 13, 2025
9d593dd
fix: prevent logging sensitive data in error handler
daniel-jonathan Nov 13, 2025
d8ffd07
fix: remove field name from validation error logging
daniel-jonathan Nov 13, 2025
0d47948
refactor: Updated the example README based on review feedback
daniel-jonathan Nov 13, 2025
2d523ed
Merge branch 'main' into feat/streaming-only
daniel-jonathan Nov 19, 2025
5316902
Merge branch 'main' into feat/streaming-only
SoulPancake Nov 30, 2025
3c9274e
Merge branch 'main' into feat/streaming-only
SoulPancake Jan 12, 2026
195ba7c
Merge branch 'main' into feat/streaming-only
SoulPancake Jan 12, 2026
c80b8f5
Merge branch 'main' into feat/streaming-only
SoulPancake Jan 20, 2026
b929706
Merge branch 'main' into feat/streaming-only
SoulPancake Feb 2, 2026
a8cbf8e
Merge branch 'main' into feat/streaming-only
SoulPancake Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
## [Unreleased](https://github.com/openfga/js-sdk/compare/v0.9.0...HEAD)

- feat: add support for handling Retry-After header (#267)
- feat: streamedListObjects (streaming ListObjects) - Node.js only
- Enables retrieving >1000 objects beyond standard listObjects limit
- Requires OpenFGA server [v1.2.0+](https://github.com/openfga/openfga/releases/tag/v1.2.0)
- Uses axios streaming via API layer with preserved telemetry
- Resilient NDJSON parsing (supports async-iterable and event-based streams)
- Parses chunked data across multiple reads; handles Buffer/string inputs
- Adds example for usage: `example/streamed-list-objects`
- Adds example for local usage: `example/streamed-list-objects-local`

## v0.9.0

Expand Down
83 changes: 83 additions & 0 deletions api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
serializeDataIfNeeded,
toPathString,
createRequestFunction,
createStreamingRequestFunction,
RequestArgs,
CallResult,
PromiseResult
Expand Down Expand Up @@ -383,6 +384,45 @@ export const OpenFgaApiAxiosParamCreator = function (configuration: Configuratio
options: localVarRequestOptions,
};
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects: (storeId: string, body: ListObjectsRequest, options: any = {}): RequestArgs => {
// verify required parameter 'storeId' is not null or undefined
assertParamExists("streamedListObjects", "storeId", storeId);
// verify required parameter 'body' is not null or undefined
assertParamExists("streamedListObjects", "body", body);
const localVarPath = "/stores/{store_id}/streamed-list-objects"
.replace(`{${"store_id"}}`, encodeURIComponent(String(storeId)));
// use dummy base URL string because the URL constructor only accepts absolute URLs.
const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL);
let baseOptions;
if (configuration) {
baseOptions = configuration.baseOptions;
}

const localVarRequestOptions = { method: "POST", ...baseOptions, ...options };
const localVarHeaderParameter = {} as any;
const localVarQueryParameter = {} as any;

localVarHeaderParameter["Content-Type"] = "application/json";

setSearchParams(localVarUrlObj, localVarQueryParameter, options.query);
localVarRequestOptions.headers = { ...localVarHeaderParameter, ...options.headers };
localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions);

return {
url: toPathString(localVarUrlObj),
options: localVarRequestOptions,
};
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -912,6 +952,22 @@ export const OpenFgaApiFp = function(configuration: Configuration, credentials:
...TelemetryAttributes.fromRequestBody(body)
});
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
async streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<(axios?: AxiosInstance) => Promise<any>> {
const localVarAxiosArgs = localVarAxiosParamCreator.streamedListObjects(storeId, body, options);
return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, {
[TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects"
});
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1156,6 +1212,19 @@ export const OpenFgaApiFactory = function (configuration: Configuration, credent
listObjects(storeId: string, body: ListObjectsRequest, options?: any): PromiseResult<ListObjectsResponse> {
return localVarFp.listObjects(storeId, body, options).then((request) => request(axios));
},
/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return localVarFp.streamedListObjects(storeId, body, options).then((request) => request(axios));
},
/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down Expand Up @@ -1370,6 +1439,20 @@ export class OpenFgaApi extends BaseAPI {
return OpenFgaApiFp(this.configuration, this.credentials).listObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* The Streamed ListObjects API is very similar to the ListObjects API, with two differences:
* 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected.
* 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE.
* @summary Stream all objects of the given type that the user has a relation with
* @param {string} storeId
* @param {ListObjectsRequest} body
* @param {*} [options] Override http request option.
* @throws { FgaError }
*/
public streamedListObjects(storeId: string, body: ListObjectsRequest, options?: any): Promise<any> {
return OpenFgaApiFp(this.configuration, this.credentials).streamedListObjects(storeId, body, options).then((request) => request(this.axios));
}

/**
* Returns a paginated list of OpenFGA stores and a continuation token to get additional stores. The continuation token will be empty if there are no more stores.
* @summary List all stores
Expand Down
15 changes: 15 additions & 0 deletions apiModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,21 @@ export interface ListObjectsResponse {
*/
objects: Array<string>;
}

/**
* The response for a StreamedListObjects RPC.
* @export
* @interface StreamedListObjectsResponse
*/
export interface StreamedListObjectsResponse {
/**
*
* @type {string}
* @memberof StreamedListObjectsResponse
*/
object: string;
}

/**
*
* @export
Expand Down
47 changes: 47 additions & 0 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
GetStoreResponse,
ListObjectsRequest,
ListObjectsResponse,
StreamedListObjectsResponse,
ListStoresResponse,
ListUsersRequest,
ListUsersResponse,
Expand Down Expand Up @@ -48,6 +49,7 @@ import {
} from "./utils";
import { isWellFormedUlidString } from "./validation";
import SdkConstants from "./constants";
import { parseNDJSONStream } from "./streaming";

export type UserClientConfigurationParams = UserConfigurationParams & {
storeId?: string;
Expand Down Expand Up @@ -804,6 +806,51 @@ export class OpenFgaClient extends BaseAPI {
}, options);
}

/**
* StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates)
*
* Note: This method is Node.js only. Streams are supported via the axios API layer.
* The response will be streamed as newline-delimited JSON objects.
*
* @param {ClientListObjectsRequest} body
* @param {ClientRequestOptsWithConsistency} [options]
* @param {string} [options.authorizationModelId] - Overrides the authorization model id in the configuration
* @param {object} [options.headers] - Custom headers to send alongside the request
* @param {ConsistencyPreference} [options.consistency] - The consistency preference to use
* @param {object} [options.retryParams] - Override the retry parameters for this request
* @param {number} [options.retryParams.maxRetry] - Override the max number of retries on each API request
* @param {number} [options.retryParams.minWaitInMs] - Override the minimum wait before a retry is initiated
* @returns {AsyncGenerator<StreamedListObjectsResponse>} An async generator that yields objects as they are received
*/
async *streamedListObjects(body: ClientListObjectsRequest, options: ClientRequestOptsWithConsistency = {}): AsyncGenerator<StreamedListObjectsResponse> {
const stream = await this.api.streamedListObjects(this.getStoreId(options)!, {
authorization_model_id: this.getAuthorizationModelId(options),
user: body.user,
relation: body.relation,
type: body.type,
context: body.context,
contextual_tuples: { tuple_keys: body.contextualTuples || [] },
consistency: options.consistency
}, options);

// Unwrap axios CallResult to get the raw Node.js stream when needed
const source = stream?.$response?.data ?? stream;

// Parse the Node.js stream
try {
for await (const item of parseNDJSONStream(source as any)) {
if (item && item.result && item.result.object) {
yield { object: item.result.object } as StreamedListObjectsResponse;
}
}
} finally {
// Ensure underlying HTTP connection closes if consumer stops early
if (source && typeof source.destroy === "function") {
try { source.destroy(); } catch { }
}
}
}

/**
* ListRelations - List all the relations a user has with an object (evaluates)
* @param {object} listRelationsRequest
Expand Down
71 changes: 71 additions & 0 deletions common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,77 @@ export const createRequestFunction = function (axiosArgs: RequestArgs, axiosInst
);
}

return result;
};
};

/**
* creates an axios streaming request function that returns the raw response stream
* for incremental parsing (used by streamedListObjects)
*/
export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record<string, string | number> = {}) {
configuration.isValid();

const retryParams = axiosArgs.options?.retryParams ? axiosArgs.options?.retryParams : configuration.retryParams;
const maxRetry: number = retryParams ? retryParams.maxRetry : 0;
const minWaitInMs: number = retryParams ? retryParams.minWaitInMs : 0;

const start = performance.now();

return async (axios: AxiosInstance = axiosInstance): Promise<any> => {
await setBearerAuthToObject(axiosArgs.options.headers, credentials!);

const url = configuration.getBasePath() + axiosArgs.url;

const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url };
const wrappedResponse = await attemptHttpRequest(axiosRequestArgs, {
maxRetry,
minWaitInMs,
}, axios);
const response = wrappedResponse?.response;

const result: any = response?.data; // raw stream

let attributes: StringIndexable = {};

attributes = TelemetryAttributes.fromRequest({
userAgent: configuration.baseOptions?.headers["User-Agent"],
httpMethod: axiosArgs.options?.method,
url,
resendCount: wrappedResponse?.retries,
start: start,
credentials: credentials,
attributes: methodAttributes,
});

attributes = TelemetryAttributes.fromResponse({
response,
attributes,
});

const serverRequestDuration = attributes[TelemetryAttribute.HttpServerRequestDuration];
if (configuration.telemetry?.metrics?.histogramQueryDuration && typeof serverRequestDuration !== "undefined") {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.queryDuration,
parseInt(attributes[TelemetryAttribute.HttpServerRequestDuration] as string, 10),
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramQueryDuration.attributes
)
);
}

if (configuration.telemetry?.metrics?.histogramRequestDuration) {
configuration.telemetry.recorder.histogram(
TelemetryHistograms.requestDuration,
attributes[TelemetryAttribute.HttpClientRequestDuration],
TelemetryAttributes.prepare(
attributes,
configuration.telemetry.metrics.histogramRequestDuration.attributes
)
);
}

return result;
};
};
21 changes: 21 additions & 0 deletions example/streamed-list-objects-local/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Streamed List Objects (Local)

This example demonstrates using the js-sdk `streamedListObjects` API against a locally running OpenFGA server that you manage yourself.

Prerequisites:
- Node.js 18+
- An OpenFGA server reachable at `FGA_API_URL` (defaults to `http://localhost:8080`)

Run:
1. From repo root, build the SDK once:
- `npm run build`
2. Set the API URL (optional) and run the example:
- `cd example/streamed-list-objects-local`
- `FGA_API_URL=http://localhost:8080 node streamedListObjectsLocal.mjs`

What it does:
- Creates a temporary store
- Writes a schema 1.1 model with an assignable relation
- Inserts 3 tuples
- Streams them via `streamedListObjects`
- Cleans up the store
62 changes: 62 additions & 0 deletions example/streamed-list-objects-local/streamedListObjectsLocal.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { ClientConfiguration, OpenFgaClient, ConsistencyPreference } from "../../dist/index.js";

const apiUrl = process.env.FGA_API_URL || "http://localhost:8080";

async function main() {
const client = new OpenFgaClient(new ClientConfiguration({ apiUrl }));

console.log("Creating temporary store");
const { id: storeId } = await client.createStore({ name: "streamed-list-objects-local" });

const clientWithStore = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId }));

const model = {
schema_version: "1.1",
type_definitions: [
{ type: "user" },
{
type: "document",
relations: { can_read: { this: {} } },
metadata: {
relations: {
can_read: {
directly_related_user_types: [{ type: "user" }]
}
}
}
}
]
};

console.log("Writing authorization model");
const { authorization_model_id: authorizationModelId } = await clientWithStore.writeAuthorizationModel(model);

const fga = new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId, authorizationModelId }));

console.log("Writing tuples");
await fga.write({
writes: [
{ user: "user:anne", relation: "can_read", object: "document:1" },
{ user: "user:anne", relation: "can_read", object: "document:2" },
{ user: "user:anne", relation: "can_read", object: "document:3" }
]
});

console.log("Streaming objects...");
let count = 0;
for await (const _ of fga.streamedListObjects(
{ user: "user:anne", relation: "can_read", type: "document" },
{ consistency: ConsistencyPreference.HigherConsistency }
)) {
count++;
}
console.log(`\u2713 Streamed count: ${count}`);

console.log("Cleaning up...");
await fga.deleteStore();
console.log("Done");
}

main().catch(_err => {
process.exit(1);
});
Loading