diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index c692bd1c..c26e86a8 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -68,6 +68,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -157,6 +159,8 @@ src/OpenFga.Sdk/Model/RelationshipCondition.cs src/OpenFga.Sdk/Model/SourceInfo.cs src/OpenFga.Sdk/Model/Status.cs src/OpenFga.Sdk/Model/Store.cs +src/OpenFga.Sdk/Model/StreamResultOfStreamedListObjectsResponse.cs +src/OpenFga.Sdk/Model/StreamedListObjectsResponse.cs src/OpenFga.Sdk/Model/Tuple.cs src/OpenFga.Sdk/Model/TupleChange.cs src/OpenFga.Sdk/Model/TupleKey.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 4190127f..f6fc9aed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased](https://github.com/openfga/dotnet-sdk/compare/v0.9.0...HEAD) +- feat: add support for [StreamedListObjects](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects). See [documentation](#streamed-list-objects) + ## v0.9.0 ### [0.9.0](https://github.com/openfga/dotnet-sdk/compare/v0.8.0...v0.9.0) (2025-12-01) diff --git a/README.md b/README.md index 28eb95df..948bbaa1 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ This is an autogenerated SDK for OpenFGA. It provides a wrapper around the [Open - [Client Batch Check](#client-batch-check) - [Expand](#expand) - [List Objects](#list-objects) + - [Streamed List Objects](#streamed-list-objects) - [List Relations](#list-relations) - [List Users](#list-users) - [Assertions](#assertions) @@ -190,10 +191,6 @@ namespace Example { Credentials = new Credentials() { Method = CredentialsMethod.ClientCredentials, Config = new CredentialsConfig() { - // API Token Issuer can contain: - // - a scheme, defaults to https - // - a path, defaults to /oauth/token - // - a port ApiTokenIssuer = Environment.GetEnvironmentVariable("FGA_API_TOKEN_ISSUER"), ApiAudience = Environment.GetEnvironmentVariable("FGA_API_AUDIENCE"), ClientId = Environment.GetEnvironmentVariable("FGA_CLIENT_ID"), @@ -844,6 +841,38 @@ var response = await fgaClient.ListObjects(body, options); // response.Objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] ``` +##### Streamed List Objects + +List objects of a particular type that the user has access to, using the streaming API. + +The Streamed ListObjects API is very similar to the ListObjects API, with two key differences: +1. **Streaming Results**: Instead of collecting all objects before returning a response, it streams them to the client as they are collected. +2. **No Pagination Limit**: Returns all results without the 1000-object limit of the standard ListObjects API. + +This is particularly useful when querying **computed relations** that may return large result sets. + +[API Documentation](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) + +```csharp +var options = new ClientListObjectsOptions { + AuthorizationModelId = "01GXSA8YR785C4FYS3C0RTG7B1", + Consistency = ConsistencyPreference.HIGHERCONSISTENCY +}; + +var objects = new List(); +await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + }, + options)) { + objects.Add(response.Object); +} + +// objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] +``` + ##### List Relations List the relations a user has on an object. @@ -1003,6 +1032,7 @@ namespace Example { | [**ReadAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**ReadAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**ReadChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**StreamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**Write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**WriteAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**WriteAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1069,6 +1099,8 @@ namespace Example { - [Model.SourceInfo](docs/SourceInfo.md) - [Model.Status](docs/Status.md) - [Model.Store](docs/Store.md) + - [Model.StreamResultOfStreamedListObjectsResponse](docs/StreamResultOfStreamedListObjectsResponse.md) + - [Model.StreamedListObjectsResponse](docs/StreamedListObjectsResponse.md) - [Model.Tuple](docs/Tuple.md) - [Model.TupleChange](docs/TupleChange.md) - [Model.TupleKey](docs/TupleKey.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 5cb8b3fc..ba96938e 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -18,6 +18,7 @@ Method | HTTP request | Description [**ReadAuthorizationModel**](OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model [**ReadAuthorizationModels**](OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store [**ReadChanges**](OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes +[**StreamedListObjects**](OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with [**Write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store [**WriteAssertions**](OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID [**WriteAuthorizationModel**](OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model @@ -1160,6 +1161,87 @@ Name | Type | Description | Notes [[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) + +# **StreamedListObjects** +> StreamResultOfStreamedListObjectsResponse StreamedListObjects (ListObjectsRequest body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example +```csharp +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.Http; +using OpenFga.Sdk.Api; +using OpenFga.Sdk.Client; +using OpenFga.Sdk.Configuration; +using OpenFga.Sdk.Model; + +namespace Example +{ + public class StreamedListObjectsExample + { + public static void Main() + { + var configuration = new Configuration() { + ApiScheme = Environment.GetEnvironmentVariable("OPENFGA_API_SCHEME"), // optional, defaults to "https" + ApiHost = Environment.GetEnvironmentVariable("OPENFGA_API_HOST"), // required, define without the scheme (e.g. api.fga.example instead of https://api.fga.example) + StoreId = Environment.GetEnvironmentVariable("OPENFGA_STORE_ID"), // not needed when calling `CreateStore` or `ListStores` + }; + HttpClient httpClient = new HttpClient(); + var openFgaApi = new OpenFgaApi(config, httpClient); + var body = new ListObjectsRequest(); // ListObjectsRequest | + + try + { + // Stream all objects of the given type that the user has a relation with + StreamResultOfStreamedListObjectsResponse response = await openFgaApi.StreamedListObjects(body); + Debug.WriteLine(response); + } + catch (ApiException e) + { + Debug.Print("Exception when calling OpenFgaApi.StreamedListObjects: " + e.Message ); + Debug.Print("Status Code: "+ e.ErrorCode); + Debug.Print(e.StackTrace); + } + } + } +} +``` + +### Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + + **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | + +### Return type + +[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md) + +### HTTP request headers + + - **Content-Type**: application/json + - **Accept**: application/json + + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +[[Back to top]](#) [[Back to API list]](../README.md#api-endpoints) [[Back to Model list]](../README.md#models) [[Back to README]](../README.md) + # **Write** > Object Write (WriteRequest body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..6c2f6805 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,11 @@ +# OpenFga.Sdk.Model.StreamResultOfStreamedListObjectsResponse + +## Properties + +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**Result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] +**Error** | [**Status**](Status.md) | | [optional] + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..327b798f --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,11 @@ +# OpenFga.Sdk.Model.StreamedListObjectsResponse +The response for a StreamedListObjects RPC. + +## Properties + +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**Object** | **string** | | + +[[Back to Model list]](../README.md#models) [[Back to API list]](../README.md#api-endpoints) [[Back to README]](../README.md) + diff --git a/example/StreamedListObjectsExample/README.md b/example/StreamedListObjectsExample/README.md new file mode 100644 index 00000000..602262f2 --- /dev/null +++ b/example/StreamedListObjectsExample/README.md @@ -0,0 +1,119 @@ +# Streamed List Objects Example + +Demonstrates using `StreamedListObjects` to retrieve objects via the streaming API in the .NET SDK. + +## What is StreamedListObjects? + +The Streamed ListObjects API is very similar to the ListObjects API, with two key differences: + +1. **Streaming Results**: Instead of collecting all objects before returning a response, it streams them to the client as they are collected. +2. **No Pagination Limit**: Returns all results without the 1000-object limit of the standard ListObjects API. + +This makes it ideal for scenarios where you need to retrieve large numbers of objects, especially when querying computed relations. + +## Prerequisites + +- .NET 6.0 or higher +- OpenFGA server running on `http://localhost:8080` (or set `FGA_API_URL`) + +## Running + +```bash +# From the SDK root directory, build the SDK first +dotnet build src/OpenFga.Sdk/OpenFga.Sdk.csproj + +# Then run the example +cd example/StreamedListObjectsExample +dotnet run +``` + +## What it does + +- Creates a temporary store +- Writes an authorization model with **computed relations** +- Adds 2000 tuples (1000 owners + 1000 viewers) +- Queries the **computed `can_read` relation** via `StreamedListObjects` +- Shows all 2000 results (demonstrating computed relations) +- Shows progress (first 3 objects and every 500th) +- Cleans up the store + +## Authorization Model + +The example demonstrates OpenFGA's **computed relations**: + +``` +type user + +type document + relations + define owner: [user] + define viewer: [user] + define can_read: owner or viewer +``` + +**Why this matters:** +- We write tuples to `owner` and `viewer` (base permissions) +- We query `can_read` (computed from owner OR viewer) + +**Example flow:** +1. Write: `user:anne owner document:1-1000` +2. Write: `user:anne viewer document:1001-2000` +3. Query: `StreamedListObjects(user:anne, relation:can_read, type:document)` +4. Result: All 2000 documents (because `can_read = owner OR viewer`) + +## Key Features Demonstrated + +### IAsyncEnumerable Pattern + +The `StreamedListObjects` method returns `IAsyncEnumerable`, which is the idiomatic .NET way to handle streaming data: + +```csharp +await foreach (var response in fgaClient.StreamedListObjects(request)) { + Console.WriteLine($"Received: {response.Object}"); +} +``` + +### Early Break and Cleanup + +The streaming implementation properly handles early termination: + +```csharp +await foreach (var response in fgaClient.StreamedListObjects(request)) { + Console.WriteLine(response.Object); + if (someCondition) { + break; // Stream is automatically cleaned up + } +} +``` + +### Cancellation Support + +Full support for `CancellationToken`: + +```csharp +using var cts = new CancellationTokenSource(); +cts.CancelAfter(TimeSpan.FromSeconds(5)); + +try { + await foreach (var response in fgaClient.StreamedListObjects(request, options, cts.Token)) { + Console.WriteLine(response.Object); + } +} +catch (OperationCanceledException) { + Console.WriteLine("Operation cancelled"); +} +``` + +## Benefits Over ListObjects + +- **No Pagination**: Retrieve all objects in a single streaming request +- **Lower Memory**: Objects are processed as they arrive, not held in memory +- **Early Termination**: Can stop streaming at any point without wasting resources +- **Better for Large Results**: Ideal when expecting hundreds or thousands of objects + +## Performance Considerations + +- Streaming starts immediately - no need to wait for all results +- HTTP connection remains open during streaming +- Properly handles cleanup if consumer stops early +- Supports all the same options as `ListObjects` (consistency, contextual tuples, etc.) diff --git a/example/StreamedListObjectsExample/StreamedListObjectsExample.cs b/example/StreamedListObjectsExample/StreamedListObjectsExample.cs new file mode 100644 index 00000000..80cb77bb --- /dev/null +++ b/example/StreamedListObjectsExample/StreamedListObjectsExample.cs @@ -0,0 +1,167 @@ +using OpenFga.Sdk.Client; +using OpenFga.Sdk.Client.Model; +using OpenFga.Sdk.Configuration; +using OpenFga.Sdk.Exceptions; +using OpenFga.Sdk.Model; + +namespace StreamedListObjectsExample; + +public class StreamedListObjectsExample { + public static async Task Main() { + try { + var apiUrl = Environment.GetEnvironmentVariable("FGA_API_URL") ?? "http://localhost:8080"; + + var client = new OpenFgaClient(new ClientConfiguration { ApiUrl = apiUrl }); + + Console.WriteLine("Creating temporary store"); + var store = await client.CreateStore(new ClientCreateStoreRequest { Name = "streamed-list-objects" }); + + var clientWithStore = new OpenFgaClient(new ClientConfiguration { + ApiUrl = apiUrl, + StoreId = store.Id + }); + + Console.WriteLine("Writing authorization model"); + var authModel = await clientWithStore.WriteAuthorizationModel(new ClientWriteAuthorizationModelRequest { + SchemaVersion = "1.1", + TypeDefinitions = new List { + new() { + Type = "user", + Relations = new Dictionary() + }, + new() { + Type = "document", + Relations = new Dictionary { + { + "owner", new Userset { + This = new object() + } + }, + { + "viewer", new Userset { + This = new object() + } + }, + { + "can_read", new Userset { + Union = new Usersets { + Child = new List { + new() { + ComputedUserset = new ObjectRelation { + Relation = "owner" + } + }, + new() { + ComputedUserset = new ObjectRelation { + Relation = "viewer" + } + } + } + } + } + } + }, + Metadata = new Metadata { + Relations = new Dictionary { + { + "owner", new RelationMetadata { + DirectlyRelatedUserTypes = new List { + new() { Type = "user" } + } + } + }, + { + "viewer", new RelationMetadata { + DirectlyRelatedUserTypes = new List { + new() { Type = "user" } + } + } + }, + { + "can_read", new RelationMetadata { + DirectlyRelatedUserTypes = new List() + } + } + } + } + } + } + }); + + var fga = new OpenFgaClient(new ClientConfiguration { + ApiUrl = apiUrl, + StoreId = store.Id, + AuthorizationModelId = authModel.AuthorizationModelId + }); + + Console.WriteLine("Writing tuples (1000 as owner, 1000 as viewer)"); + + // Write in batches of 100 (OpenFGA limit) + const int batchSize = 100; + int totalWritten = 0; + + // Write 1000 documents where anne is the owner + for (int batch = 0; batch < 10; batch++) { + var tuples = new List(); + for (int i = 1; i <= batchSize; i++) { + tuples.Add(new ClientTupleKey { + User = "user:anne", + Relation = "owner", + Object = $"document:{batch * batchSize + i}" + }); + } + await fga.WriteTuples(tuples); + totalWritten += tuples.Count; + } + + // Write 1000 documents where anne is a viewer + for (int batch = 0; batch < 10; batch++) { + var tuples = new List(); + for (int i = 1; i <= batchSize; i++) { + tuples.Add(new ClientTupleKey { + User = "user:anne", + Relation = "viewer", + Object = $"document:{1000 + batch * batchSize + i}" + }); + } + await fga.WriteTuples(tuples); + totalWritten += tuples.Count; + } + + Console.WriteLine($"Wrote {totalWritten} tuples"); + + Console.WriteLine("Streaming objects via computed 'can_read' relation..."); + var count = 0; + await foreach (var response in fga.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", // Computed: owner OR viewer + Type = "document" + }, + new ClientListObjectsOptions { + Consistency = ConsistencyPreference.HIGHERCONSISTENCY + })) { + count++; + if (count <= 3 || count % 500 == 0) { + Console.WriteLine($"- {response.Object}"); + } + } + Console.WriteLine($"✓ Streamed {count} objects"); + + Console.WriteLine("Cleaning up..."); + await fga.DeleteStore(); + Console.WriteLine("Done"); + } + catch (Exception ex) { + // Avoid logging sensitive data; only display generic info + if (ex is FgaValidationError) { + Console.Error.WriteLine("Validation error in configuration. Please check your configuration for errors."); + } else if (ex.Message?.Contains("Connection refused") == true || ex.InnerException?.Message?.Contains("Connection refused") == true) { + Console.Error.WriteLine("Is OpenFGA server running? Check FGA_API_URL environment variable or default http://localhost:8080"); + } else { + Console.Error.WriteLine($"An error occurred. [{ex.GetType().Name}]"); + } + Environment.Exit(1); + } + } +} diff --git a/example/StreamedListObjectsExample/StreamedListObjectsExample.csproj b/example/StreamedListObjectsExample/StreamedListObjectsExample.csproj new file mode 100644 index 00000000..8255d3e2 --- /dev/null +++ b/example/StreamedListObjectsExample/StreamedListObjectsExample.csproj @@ -0,0 +1,22 @@ + + + + Exe + net9.0 + enable + enable + Linux + + + + + + + + + + + + + + diff --git a/src/OpenFga.Sdk.Test/ApiClient/RetryHandlerTests.cs b/src/OpenFga.Sdk.Test/ApiClient/RetryHandlerTests.cs index cf0cbc3b..eee7a602 100644 --- a/src/OpenFga.Sdk.Test/ApiClient/RetryHandlerTests.cs +++ b/src/OpenFga.Sdk.Test/ApiClient/RetryHandlerTests.cs @@ -130,7 +130,7 @@ public async Task IsTransientError_TaskCanceledException_UserCancelled_ReturnsFa var attemptCount = 0; // Create TaskCanceledException with a cancelled token in a framework-compatible way - var cts = new CancellationTokenSource(); + using var cts = new CancellationTokenSource(); cts.Cancel(); TaskCanceledException exception; diff --git a/src/OpenFga.Sdk.Test/ApiClient/StreamingTests.cs b/src/OpenFga.Sdk.Test/ApiClient/StreamingTests.cs new file mode 100644 index 00000000..8753bf01 --- /dev/null +++ b/src/OpenFga.Sdk.Test/ApiClient/StreamingTests.cs @@ -0,0 +1,731 @@ +using Moq; +using Moq.Protected; +using OpenFga.Sdk.ApiClient; +using OpenFga.Sdk.Constants; +using OpenFga.Sdk.Exceptions; +using OpenFga.Sdk.Model; +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace OpenFga.Sdk.Test.ApiClient; + +/// +/// Tests for NDJSON streaming functionality in BaseClient +/// +public class StreamingTests { + /// + /// Custom HttpContent that emits data in controlled chunks to test partial NDJSON handling + /// + private class ChunkedStreamContent : HttpContent { + private readonly string[] _chunks; + private readonly int _delayMs; + + public ChunkedStreamContent(string[] chunks, int delayMs = 0) { + _chunks = chunks; + _delayMs = delayMs; + Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/x-ndjson"); + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) { + foreach (var chunk in _chunks) { + if (_delayMs > 0) { + await Task.Delay(_delayMs); + } + var bytes = Encoding.UTF8.GetBytes(chunk); + await stream.WriteAsync(bytes, 0, bytes.Length); + await stream.FlushAsync(); + } + } + + protected override bool TryComputeLength(out long length) { + length = 0; + return false; // Unknown length, force streaming + } + } + + private Mock CreateMockHttpHandler(HttpStatusCode statusCode, string content, + string contentType = "application/x-ndjson") { + var mockHandler = new Mock(); + mockHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny() + ) + .ReturnsAsync(() => new HttpResponseMessage { + StatusCode = statusCode, + Content = new StringContent(content, Encoding.UTF8, contentType) + }); + return mockHandler; + } + + private Mock CreateMockHttpHandlerWithChunks(HttpStatusCode statusCode, string[] chunks, + int delayMs = 0) { + var mockHandler = new Mock(); + mockHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny() + ) + .ReturnsAsync(() => new HttpResponseMessage { + StatusCode = statusCode, + Content = new ChunkedStreamContent(chunks, delayMs) + }); + return mockHandler; + } + + [Fact] + public async Task SendStreamingRequestAsync_SingleLineNDJSON_ParsesCorrectly() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Single(results); + Assert.Equal("document:1", results[0].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_MultipleLineNDJSON_ParsesAllLines() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"result\":{\"object\":\"document:2\"}}\n" + + "{\"result\":{\"object\":\"document:3\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(3, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + Assert.Equal("document:3", results[2].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_EmptyLines_SkipsEmptyLines() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n\n" + + "{\"result\":{\"object\":\"document:2\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + } + + [Fact] + public async Task SendStreamingRequestAsync_LastLineWithoutNewline_ParsesCorrectly() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"result\":{\"object\":\"document:2\"}}"; // No trailing newline + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_CancellationToken_CancelsStream() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"result\":{\"object\":\"document:2\"}}\n" + + "{\"result\":{\"object\":\"document:3\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + using var cts = new CancellationTokenSource(); + + var results = new List(); + await Assert.ThrowsAsync(async () => { + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test", cts.Token)) { + results.Add(item); + if (results.Count == 1) { + cts.Cancel(); // Cancel after the first item + } + } + }); + + // Cancellation happens after the first item, but timing may allow more items before cancellation takes effect + Assert.True(results.Count >= 1, "At least one item should be processed before cancellation"); + } + + [Fact] + public async Task SendStreamingRequestAsync_HttpError_ThrowsException() { + var mockHandler = CreateMockHttpHandler(HttpStatusCode.InternalServerError, + "{\"code\":\"internal_error\",\"message\":\"Server error\"}", + "application/json"); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + await Assert.ThrowsAsync(async () => { + await foreach (var _ in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + // Should not get here + } + }); + } + + [Fact] + public async Task SendStreamingRequestAsync_EarlyBreak_DisposesResourcesProperly() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + "{\"result\":{\"object\":\"document:2\"}}\n" + + "{\"result\":{\"object\":\"document:3\"}}\n" + + "{\"result\":{\"object\":\"document:4\"}}\n" + + "{\"result\":{\"object\":\"document:5\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + if (results.Count == 2) { + break; // Early termination + } + } + + Assert.Equal(2, results.Count); + // If we get here without exceptions, resources were disposed of properly + } + + [Fact] + public async Task SendStreamingRequestAsync_EmptyResponse_ReturnsNoResults() { + var ndjson = ""; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Empty(results); + } + + [Fact] + public async Task SendStreamingRequestAsync_WhitespaceOnlyLines_SkipsWhitespace() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + " \n" + // Whitespace only + "{\"result\":{\"object\":\"document:2\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + } + + [Fact] + public async Task SendStreamingRequestAsync_InvalidJsonLine_SkipsInvalidLine() { + var ndjson = "{\"result\":{\"object\":\"document:1\"}}\n" + + "invalid json here\n" + + "{\"result\":{\"object\":\"document:2\"}}\n"; + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + // Should not throw, just skip invalid lines + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + // Assert - invalid line should be skipped + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + // ============================================================ + // Partial NDJSON Handling Tests + // ============================================================ + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataSplitsJsonMidObject_ParsesCorrectly() { + // This is the critical test case: data arrives in chunks that split JSON objects mid-line + // Simulates real-world streaming where network packets don't align with JSON boundaries + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n{\"res", // Ends mid-JSON object + "ult\":{\"object\":\"document:2\"}}\n" // Completes the JSON object + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataMultipleSplits_ParsesAllCorrectly() { + // Test multiple partial chunks across many small reads + var chunks = new[] { + "{\"result\":{\"ob", // First chunk: partial first object + "ject\":\"document:1\"}}\n{\"result", // Second chunk: completes first, starts second + "\":{\"object\":\"document:", // Third chunk: middle of second object + "2\"}}\n{\"result\":{\"object\":\"do", // Fourth chunk: completes second, starts third + "cument:3\"}}\n" // Fifth chunk: completes third + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(3, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + Assert.Equal("document:3", results[2].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataWithEmptyLines_SkipsEmptyLines() { + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n\n{\"r", // Has empty line, splits second object + "esult\":{\"object\":\"document:2\"}}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataLastChunkNoNewline_ParsesFinalObject() { + // Test that final buffered content is parsed even without trailing newline + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n{\"result\":{\"ob", + "ject\":\"document:2\"}}" // No trailing newline + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataInvalidPartialJson_SkipsInvalidAndContinues() { + // Test that invalid JSON in the final buffer is skipped gracefully + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n{\"result\":{\"ob", + "ject\":\"document:2\"}}\n{\"invalid\":" // Incomplete/invalid JSON at end + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + // Should parse the two valid objects and skip the invalid trailing fragment + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataInvalidLineInMiddle_SkipsAndContinues() { + // Test that invalid JSON in the middle is skipped + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\ninvalid ", + "json line here\n{\"result\":{\"object\":\"document:2\"}}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_VerySmallChunks_ParsesCorrectly() { + // Test with very small chunks (even single character chunks) + var chunks = new[] { + "{", "\"result\":{\"object\":\"document:1\"}}\n", + "{\"result\":{\"object\":", + "\"", + "document:2\"}", + "}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataWithLargeObjects_ParsesCorrectly() { + // Test with larger JSON objects to ensure buffer handles them correctly + var largeValue = new string('x', 10000); // 10KB of data + var chunks = new[] { + $"{{\"result\":{{\"object\":\"document:1\"}}}}\n{{\"result\":{{\"object\":\"", + $"{largeValue}", + "\"}}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal(largeValue, results[1].Object); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataCancellation_CancelsCorrectly() { + // Test cancellation with chunked streaming + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n{\"res", + "ult\":{\"object\":\"document:2\"}}\n", + "{\"result\":{\"object\":\"document:3\"}}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks, delayMs: 50); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + using var cts = new CancellationTokenSource(); + + var results = new List(); + await Assert.ThrowsAsync(async () => { + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test", cts.Token)) { + results.Add(item); + if (results.Count == 1) { + cts.Cancel(); + } + } + }); + + Assert.True(results.Count >= 1); + } + + [Fact] + public async Task SendStreamingRequestAsync_ChunkedDataResultPropertyMissing_SkipsObject() { + // Test that objects without "result" property are skipped + var chunks = new[] { + "{\"result\":{\"object\":\"document:1\"}}\n{\"no", + "_result\":true}\n{\"result\":{\"object\":\"document:2\"}}\n" + }; + + var mockHandler = CreateMockHttpHandlerWithChunks(HttpStatusCode.OK, chunks); + var httpClient = new HttpClient(mockHandler.Object); + var config = new Sdk.Configuration.Configuration { ApiUrl = FgaConstants.TestApiUrl }; + var baseClient = new BaseClient(config, httpClient); + + var requestBuilder = new RequestBuilder { + Method = HttpMethod.Post, + BasePath = FgaConstants.TestApiUrl, + PathTemplate = "/test", + PathParameters = new Dictionary(), + QueryParameters = new Dictionary(), + Body = new { } + }; + + var results = new List(); + await foreach (var item in baseClient.SendStreamingRequestAsync( + requestBuilder, null, "Test")) { + results.Add(item); + } + + // Should only get the two objects with "result" property + Assert.Equal(2, results.Count); + Assert.Equal("document:1", results[0].Object); + Assert.Equal("document:2", results[1].Object); + } +} \ No newline at end of file diff --git a/src/OpenFga.Sdk.Test/Client/StreamedListObjectsTests.cs b/src/OpenFga.Sdk.Test/Client/StreamedListObjectsTests.cs new file mode 100644 index 00000000..e9fff1d9 --- /dev/null +++ b/src/OpenFga.Sdk.Test/Client/StreamedListObjectsTests.cs @@ -0,0 +1,466 @@ +using Moq; +using Moq.Protected; +using OpenFga.Sdk.Client; +using OpenFga.Sdk.Client.Model; +using OpenFga.Sdk.Constants; +using OpenFga.Sdk.Exceptions; +using OpenFga.Sdk.Model; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace OpenFga.Sdk.Test.Client; + +/// +/// Integration tests for StreamedListObjects functionality +/// +public class StreamedListObjectsTests { + private const string StoreId = "01HVMMBYVFD2W7C21S9TW5XPWT"; + private const string AuthorizationModelId = "01HVMMBZ2EMDA86PXWBQJSVQFK"; + private static readonly string ApiUrl = FgaConstants.TestApiUrl; + + private Mock CreateMockHttpHandler( + HttpStatusCode statusCode, + string ndjsonContent, + Action? requestValidator = null) { + var mockHandler = new Mock(); + mockHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny() + ) + .ReturnsAsync((HttpRequestMessage req, CancellationToken ct) => { + requestValidator?.Invoke(req); + return new HttpResponseMessage { + StatusCode = statusCode, + Content = new StringContent(ndjsonContent, Encoding.UTF8, "application/x-ndjson") + }; + }); + return mockHandler; + } + + private string CreateNDJSONResponse(params string[] objects) { + return string.Join("\n", + objects.Select(obj => $"{{\"result\":{{\"object\":\"{obj}\"}}}}")) + "\n"; + } + + [Fact] + public async Task StreamedListObjects_BasicRequest_StreamsObjectsIncrementally() { + var objects = new[] { "document:1", "document:2", "document:3" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson, + req => { + Assert.Equal(HttpMethod.Post, req.Method); + Assert.Contains($"/stores/{StoreId}/streamed-list-objects", req.RequestUri!.ToString()); + }); + + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results.Add(response.Object); + } + + Assert.Equal(3, results.Count); + Assert.Equal(objects, results.ToArray()); + } + + [Fact] + public async Task StreamedListObjects_WithAuthorizationModelId_IncludesModelIdInRequest() { + var objects = new[] { "document:1" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId, + AuthorizationModelId = AuthorizationModelId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results.Add(response.Object); + } + + // Assert - Verify request completed successfully with authorization model ID configured + Assert.Single(results); + Assert.Equal("document:1", results[0]); + } + + [Fact] + public async Task StreamedListObjects_WithConsistency_IncludesConsistencyInRequest() { + var objects = new[] { "document:1" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + }, + new ClientListObjectsOptions { + Consistency = ConsistencyPreference.HIGHERCONSISTENCY + })) { + results.Add(response.Object); + } + + // Assert - Verify request completed successfully with consistency preference + Assert.Single(results); + Assert.Equal("document:1", results[0]); + } + + [Fact] + public async Task StreamedListObjects_WithContextualTuples_IncludesContextualTuplesInRequest() { + var objects = new[] { "document:1" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document", + ContextualTuples = new List { + new() { + User = "user:anne", + Relation = "writer", + Object = "document:temp" + } + } + })) { + results.Add(response.Object); + } + + // Assert - Verify request completed successfully with contextual tuples + Assert.Single(results); + Assert.Equal("document:1", results[0]); + } + + [Fact] + public async Task StreamedListObjects_ServerError_ThrowsException() { + var mockHandler = CreateMockHttpHandler( + HttpStatusCode.InternalServerError, + "{\"code\":\"internal_error\",\"message\":\"Server error\"}"); + + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + await Assert.ThrowsAsync(async () => { + await foreach (var _ in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + // Should not reach here + } + }); + } + + [Fact] + public async Task StreamedListObjects_EarlyBreak_DisposesResourcesCleanly() { + var objects = new[] { "document:1", "document:2", "document:3", "document:4", "document:5" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results.Add(response.Object); + if (results.Count == 2) { + break; // Early termination - should not throw or leak resources + } + } + + Assert.Equal(2, results.Count); + Assert.Equal(new[] { "document:1", "document:2" }, results.ToArray()); + } + + [Fact] + public async Task StreamedListObjects_WithCancellationToken_SupportsCancellation() { + var objects = new[] { "document:1", "document:2", "document:3" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + using var cts = new CancellationTokenSource(); + + var results = new List(); + await Assert.ThrowsAsync(async () => { + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + }, + cancellationToken: cts.Token)) { + results.Add(response.Object); + if (results.Count == 1) { + cts.Cancel(); + } + } + }); + + // Cancellation happens after the first item, but timing may allow more items before cancellation takes effect + Assert.True(results.Count >= 1, "At least one item should be processed before cancellation"); + } + + [Fact] + public async Task StreamedListObjects_EmptyResult_ReturnsNoObjects() { + var ndjson = ""; // Empty response + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results.Add(response.Object); + } + + Assert.Empty(results); + } + + [Fact] + public async Task StreamedListObjects_MissingStoreId_ThrowsValidationError() { + using var httpClient = new HttpClient(); + var config = new ClientConfiguration { + ApiUrl = ApiUrl + // No StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + await Assert.ThrowsAsync(async () => { + await foreach (var _ in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + // Should not reach here + } + }); + } + + [Fact] + public async Task StreamedListObjects_LargeNumberOfObjects_StreamsEfficiently() { + // Arrange - simulate a large response + var objects = Enumerable.Range(1, 100).Select(i => $"document:{i}").ToArray(); + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results.Add(response.Object); + } + + Assert.Equal(100, results.Count); + Assert.Equal(objects, results.ToArray()); + } + + [Fact] + public async Task StreamedListObjects_MultipleIterations_WorksCorrectly() { + var objects = new[] { "document:1", "document:2" }; + var ndjson = CreateNDJSONResponse(objects); + + // Create a new mock handler for each call + var mockHandler1 = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient1 = new HttpClient(mockHandler1.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient1 = new OpenFgaClient(config, httpClient1); + + var mockHandler2 = CreateMockHttpHandler(HttpStatusCode.OK, ndjson); + using var httpClient2 = new HttpClient(mockHandler2.Object); + using var fgaClient2 = new OpenFgaClient(config, httpClient2); + + // Act - Call twice + var results1 = new List(); + await foreach (var response in fgaClient1.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results1.Add(response.Object); + } + + var results2 = new List(); + await foreach (var response in fgaClient2.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + results2.Add(response.Object); + } + + Assert.Equal(objects, results1.ToArray()); + Assert.Equal(objects, results2.ToArray()); + } + + [Fact] + public async Task StreamedListObjects_WithCustomHeaders_IncludesHeadersInRequest() { + var objects = new[] { "document:1", "document:2" }; + var ndjson = CreateNDJSONResponse(objects); + + var mockHandler = CreateMockHttpHandler(HttpStatusCode.OK, ndjson, + req => { + // Verify custom headers are present + Assert.True(req.Headers.Contains("X-Custom-Header")); + Assert.Equal("custom-value", req.Headers.GetValues("X-Custom-Header").First()); + Assert.True(req.Headers.Contains("X-Request-Id")); + Assert.Equal("req-123", req.Headers.GetValues("X-Request-Id").First()); + }); + + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + var results = new List(); + await foreach (var response in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + }, + new ClientListObjectsOptions { + Headers = new Dictionary { + { "X-Custom-Header", "custom-value" }, + { "X-Request-Id", "req-123" } + } + })) { + results.Add(response.Object); + } + + Assert.Equal(2, results.Count); + Assert.Equal(objects, results.ToArray()); + } + + [Fact] + public async Task StreamedListObjects_RateLimitError_ThrowsException() { + var mockHandler = new Mock(); + mockHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny() + ) + .ReturnsAsync(() => new HttpResponseMessage { + StatusCode = (HttpStatusCode)429, // TooManyRequests (not available in net48) + Content = new StringContent( + "{\"code\":\"rate_limit_exceeded\",\"message\":\"Too many requests\"}", + Encoding.UTF8, + "application/json"), + Headers = { + { "Retry-After", "1" } + } + }); + + using var httpClient = new HttpClient(mockHandler.Object); + var config = new ClientConfiguration { + ApiUrl = ApiUrl, + StoreId = StoreId + }; + using var fgaClient = new OpenFgaClient(config, httpClient); + + await Assert.ThrowsAsync(async () => { + await foreach (var _ in fgaClient.StreamedListObjects( + new ClientListObjectsRequest { + User = "user:anne", + Relation = "can_read", + Type = "document" + })) { + // Should not reach here + } + }); + } +} \ No newline at end of file diff --git a/src/OpenFga.Sdk/Api/OpenFgaApi.cs b/src/OpenFga.Sdk/Api/OpenFgaApi.cs index 74d4f840..91e9d64a 100644 --- a/src/OpenFga.Sdk/Api/OpenFgaApi.cs +++ b/src/OpenFga.Sdk/Api/OpenFgaApi.cs @@ -520,6 +520,41 @@ public async Task ReadAuthorizationModel(string "ReadChanges", options, cancellationToken); } + /// + /// Stream all objects of the given type that the user has a relation with The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + /// + /// Thrown when fails to make API call + /// + /// + /// Request options. + /// Cancellation Token to cancel the request. + /// IAsyncEnumerable of StreamedListObjectsResponse + public async IAsyncEnumerable StreamedListObjects(string storeId, ListObjectsRequest body, IRequestOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { + var pathParams = new Dictionary { }; + if (string.IsNullOrWhiteSpace(storeId)) { + throw new FgaRequiredParamError("StreamedListObjects", "StoreId"); + } + + if (storeId != null) { + pathParams.Add("store_id", storeId.ToString()); + } + var queryParams = new Dictionary(); + + var requestBuilder = new RequestBuilder { + Method = new HttpMethod("POST"), + BasePath = _configuration.BasePath, + PathTemplate = "/stores/{store_id}/streamed-list-objects", + PathParameters = pathParams, + Body = body, + QueryParameters = queryParams, + }; + + await foreach (var response in _apiClient.SendStreamingRequestAsync(requestBuilder, + "StreamedListObjects", options, cancellationToken)) { + yield return response; + } + } + /// /// Add or delete tuples from the store The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` /// diff --git a/src/OpenFga.Sdk/ApiClient/ApiClient.cs b/src/OpenFga.Sdk/ApiClient/ApiClient.cs index da2fc1a0..779cc13b 100644 --- a/src/OpenFga.Sdk/ApiClient/ApiClient.cs +++ b/src/OpenFga.Sdk/ApiClient/ApiClient.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Net.Http; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -137,6 +138,34 @@ await _baseClient.SendRequestAsync(requestBuilder, additionalHeade response.retryCount); } + /// + /// Handles streaming requests that return IAsyncEnumerable. + /// Note: Streaming responses cannot be retried once the stream has started. + /// + /// The request builder + /// The API name for error reporting and telemetry + /// Request options + /// Cancellation token + /// Request type + /// Response type for each streamed object + /// An async enumerable of response objects + /// + public async IAsyncEnumerable SendStreamingRequestAsync( + RequestBuilder requestBuilder, + string apiName, + IRequestOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { + + var authToken = await GetAuthenticationTokenAsync(apiName); + var additionalHeaders = BuildHeaders(_configuration, authToken, options); + var streamIter = _baseClient.SendStreamingRequestAsync( + requestBuilder, additionalHeaders, apiName, cancellationToken); + + await foreach (var item in streamIter) { + yield return item; + } + } + private async Task> Retry(Func>> retryable) { var requestCount = 0; var attemptCount = 0; // 0 = initial request, 1+ = retry attempts diff --git a/src/OpenFga.Sdk/ApiClient/BaseClient.cs b/src/OpenFga.Sdk/ApiClient/BaseClient.cs index ad90aca0..599a5c62 100644 --- a/src/OpenFga.Sdk/ApiClient/BaseClient.cs +++ b/src/OpenFga.Sdk/ApiClient/BaseClient.cs @@ -1,9 +1,13 @@ using OpenFga.Sdk.Exceptions; using System; using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; +using System.Runtime.CompilerServices; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -127,6 +131,163 @@ public async Task> SendRequestAsync(HttpRequestMessage req } } + /// + /// Handles calling the API for streaming responses (e.g., NDJSON) + /// + /// The HTTP request message + /// Additional headers to include + /// The API name for error reporting + /// Cancellation token + /// The type of each streamed response object + /// An async enumerable of parsed response objects + /// + public async IAsyncEnumerable SendStreamingRequestAsync( + HttpRequestMessage request, + IDictionary? additionalHeaders = null, + string? apiName = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { + + if (additionalHeaders != null) { + foreach (var header in additionalHeaders.Where(header => header.Value != null)) { + request.Headers.Add(header.Key, header.Value); + } + } + + // Use ResponseHeadersRead to start streaming before full response is received + using var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken) + .ConfigureAwait(false); + + try { + response.EnsureSuccessStatusCode(); + } + catch (HttpRequestException) { + throw await ApiException.CreateSpecificExceptionAsync(response, request, apiName).ConfigureAwait(false); + } + + if (response.Content == null) { + yield break; + } + + // Register cancellation token to dispose response and unblock stalled reads + using var disposeResponseRegistration = cancellationToken.Register(static state => ((HttpResponseMessage)state!).Dispose(), response); + + // Stream and parse NDJSON response +#if NET6_0_OR_GREATER + await using var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); +#else + using var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); +#endif + using var reader = new StreamReader(stream, Encoding.UTF8); + + // Replace the line-by-line reader with a buffered incremental reader to support partial NDJSON lines. + var sb = new StringBuilder(8 * 1024); // start with a reasonable buffer + var charBuffer = new char[4096]; + + while (true) { + int read; + try { + read = await reader.ReadAsync(charBuffer, 0, charBuffer.Length).ConfigureAwait(false); + } + catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) { + throw new OperationCanceledException("Streaming request was cancelled.", cancellationToken); + } + catch (IOException ex) when (cancellationToken.IsCancellationRequested) { + throw new OperationCanceledException("Streaming request was cancelled.", ex, cancellationToken); + } + + if (read == 0) { + // End of stream: flush any remaining partial record without trailing newline + if (sb.Length > 0) { + var line = sb.ToString(); + sb.Clear(); + + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrWhiteSpace(line)) { + T? parsedResult = default; + try { + using var jsonDoc = JsonDocument.Parse(line); + var root = jsonDoc.RootElement; + if (root.TryGetProperty("result", out var resultElement)) { + parsedResult = JsonSerializer.Deserialize(resultElement.GetRawText()); + } + } + catch (JsonException) { + // Skip invalid trailing fragment + } + if (parsedResult != null) { + yield return parsedResult; + } + } + } + break; + } + + sb.Append(charBuffer, 0, read); + + // Process all complete lines currently in the buffer + int start = 0; + while (true) { + var span = sb.ToString(); // materialize for IndexOf; small overhead acceptable per chunk + int newlineIdx = span.IndexOf('\n', start); + if (newlineIdx == -1) { + // No complete line yet. Keep the current tail in StringBuilder. + // Remove processed head (if any) to avoid repeated scanning. + if (start > 0) { + sb.Clear(); + sb.Append(span.Substring(start)); + } + break; + } + + int lineLen = newlineIdx - start; + var line = lineLen > 0 ? span.Substring(start, lineLen) : string.Empty; + start = newlineIdx + 1; + + cancellationToken.ThrowIfCancellationRequested(); + + if (string.IsNullOrWhiteSpace(line)) { + continue; + } + + T? parsedResult = default; + try { + using var jsonDoc = JsonDocument.Parse(line); + var root = jsonDoc.RootElement; + if (root.TryGetProperty("result", out var resultElement)) { + parsedResult = JsonSerializer.Deserialize(resultElement.GetRawText()); + } + } + catch (JsonException) { + // Skip malformed line + } + + if (parsedResult != null) { + yield return parsedResult; + } + } + } + } + + /// + /// Handles calling the API for streaming responses (e.g., NDJSON) from a RequestBuilder + /// + /// The request builder + /// Additional headers to include + /// The API name for error reporting + /// Cancellation token + /// The request type + /// The response type for each streamed object + /// An async enumerable of parsed response objects + public IAsyncEnumerable SendStreamingRequestAsync( + RequestBuilder requestBuilder, + IDictionary? additionalHeaders = null, + string? apiName = null, + CancellationToken cancellationToken = default) { + + var request = requestBuilder.BuildRequest(); + return SendStreamingRequestAsync(request, additionalHeaders, apiName, cancellationToken); + } + /// /// Disposes of any owned disposable resources such as the underlying if owned. /// diff --git a/src/OpenFga.Sdk/Client/Client.cs b/src/OpenFga.Sdk/Client/Client.cs index 2f6fe59f..6cec8df8 100644 --- a/src/OpenFga.Sdk/Client/Client.cs +++ b/src/OpenFga.Sdk/Client/Client.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -595,6 +596,38 @@ public async Task ListObjects(IClientListObjectsRequest bod Consistency = options?.Consistency, }, options, cancellationToken); + /** + * StreamedListObjects - Stream all objects of a particular type that the user has a certain relation to (evaluates) + * + * The Streamed ListObjects API is very similar to the ListObjects API, with two key differences: + * 1. Streaming Results: Instead of collecting all objects before returning a response, it streams them to the client as they are collected. + * 2. No Pagination Limit: Returns all results without the 1000-object limit of the standard ListObjects API. + * + * This is particularly useful when querying computed relations that may return large result sets. + * + * Returns an async enumerable that yields StreamedListObjectsResponse objects as they are received from the server. + */ + public async IAsyncEnumerable StreamedListObjects( + IClientListObjectsRequest body, + IClientListObjectsOptions? options = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { + + await foreach (var response in api.StreamedListObjects(GetStoreId(options), new ListObjectsRequest { + User = body.User, + Relation = body.Relation, + Type = body.Type, + ContextualTuples = + new ContextualTupleKeys { + TupleKeys = body.ContextualTuples?.ConvertAll(tupleKey => tupleKey.ToTupleKey()) ?? + new List() + }, + Context = body.Context, + AuthorizationModelId = GetAuthorizationModelId(options), + Consistency = options?.Consistency, + }, options, cancellationToken)) { + yield return response; + } + } /// public async Task ListRelations(IClientListRelationsRequest body, diff --git a/src/OpenFga.Sdk/Model/StreamResultOfStreamedListObjectsResponse.cs b/src/OpenFga.Sdk/Model/StreamResultOfStreamedListObjectsResponse.cs new file mode 100644 index 00000000..0b8fbef7 --- /dev/null +++ b/src/OpenFga.Sdk/Model/StreamResultOfStreamedListObjectsResponse.cs @@ -0,0 +1,152 @@ +// +// OpenFGA/.NET SDK for OpenFGA +// +// API version: 1.x +// Website: https://openfga.dev +// Documentation: https://openfga.dev/docs +// Support: https://openfga.dev/community +// License: [Apache-2.0](https://github.com/openfga/dotnet-sdk/blob/main/LICENSE) +// +// NOTE: This file was auto generated. DO NOT EDIT. +// + + +using OpenFga.Sdk.Constants; +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; +using System.Linq; +using System.Runtime.Serialization; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace OpenFga.Sdk.Model { + /// + /// StreamResultOfStreamedListObjectsResponse + /// + [DataContract(Name = "Stream_result_of_StreamedListObjectsResponse")] + public partial class StreamResultOfStreamedListObjectsResponse : IEquatable, IValidatableObject { + /// + /// Initializes a new instance of the class. + /// + [JsonConstructor] + public StreamResultOfStreamedListObjectsResponse() { + this.AdditionalProperties = new Dictionary(); + } + + /// + /// Initializes a new instance of the class. + /// + /// result. + /// error. + public StreamResultOfStreamedListObjectsResponse(StreamedListObjectsResponse result = default, Status error = default) { + this.Result = result; + this.Error = error; + this.AdditionalProperties = new Dictionary(); + } + + /// + /// Gets or Sets Result + /// + [DataMember(Name = "result", EmitDefaultValue = false)] + [JsonPropertyName("result")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public StreamedListObjectsResponse? Result { get; set; } + + /// + /// Gets or Sets Error + /// + [DataMember(Name = "error", EmitDefaultValue = false)] + [JsonPropertyName("error")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public Status? Error { get; set; } + + /// + /// Gets or Sets additional properties + /// + [JsonExtensionData] + public IDictionary AdditionalProperties { get; set; } + + + /// + /// Returns the JSON string presentation of the object + /// + /// JSON string presentation of the object + public virtual string ToJson() { + return JsonSerializer.Serialize(this); + } + + /// + /// Builds a StreamResultOfStreamedListObjectsResponse from the JSON string presentation of the object + /// + /// StreamResultOfStreamedListObjectsResponse + public static StreamResultOfStreamedListObjectsResponse FromJson(string jsonString) { + return JsonSerializer.Deserialize(jsonString) ?? throw new InvalidOperationException(); + } + + /// + /// Returns true if objects are equal + /// + /// Object to be compared + /// Boolean + public override bool Equals(object input) { + if (input == null || input.GetType() != this.GetType()) return false; + return this.Equals((StreamResultOfStreamedListObjectsResponse)input); + } + + /// + /// Returns true if StreamResultOfStreamedListObjectsResponse instances are equal + /// + /// Instance of StreamResultOfStreamedListObjectsResponse to be compared + /// Boolean + public bool Equals(StreamResultOfStreamedListObjectsResponse input) { + if (input == null) { + return false; + } + return + ( + this.Result == input.Result || + (this.Result != null && + this.Result.Equals(input.Result)) + ) && + ( + this.Error == input.Error || + (this.Error != null && + this.Error.Equals(input.Error)) + ) + && (this.AdditionalProperties.Count == input.AdditionalProperties.Count && this.AdditionalProperties.All(kv => input.AdditionalProperties.TryGetValue(kv.Key, out var inputValue) && Equals(kv.Value, inputValue))); + } + + /// + /// Gets the hash code + /// + /// Hash code + public override int GetHashCode() { + unchecked // Overflow is fine, just wrap + { + int hashCode = FgaConstants.HashCodeBasePrimeNumber; + if (this.Result != null) { + hashCode = (hashCode * FgaConstants.HashCodeMultiplierPrimeNumber) + this.Result.GetHashCode(); + } + if (this.Error != null) { + hashCode = (hashCode * FgaConstants.HashCodeMultiplierPrimeNumber) + this.Error.GetHashCode(); + } + if (this.AdditionalProperties != null) { + hashCode = (hashCode * FgaConstants.HashCodeMultiplierPrimeNumber) + this.AdditionalProperties.GetHashCode(); + } + return hashCode; + } + } + + /// + /// To validate all properties of the instance + /// + /// Validation context + /// Validation Result + public IEnumerable Validate(ValidationContext validationContext) { + yield break; + } + + } + +} \ No newline at end of file diff --git a/src/OpenFga.Sdk/Model/StreamedListObjectsResponse.cs b/src/OpenFga.Sdk/Model/StreamedListObjectsResponse.cs new file mode 100644 index 00000000..8681f9db --- /dev/null +++ b/src/OpenFga.Sdk/Model/StreamedListObjectsResponse.cs @@ -0,0 +1,138 @@ +// +// OpenFGA/.NET SDK for OpenFGA +// +// API version: 1.x +// Website: https://openfga.dev +// Documentation: https://openfga.dev/docs +// Support: https://openfga.dev/community +// License: [Apache-2.0](https://github.com/openfga/dotnet-sdk/blob/main/LICENSE) +// +// NOTE: This file was auto generated. DO NOT EDIT. +// + + +using OpenFga.Sdk.Constants; +using System; +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; +using System.Linq; +using System.Runtime.Serialization; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace OpenFga.Sdk.Model { + /// + /// The response for a StreamedListObjects RPC. + /// + [DataContract(Name = "StreamedListObjectsResponse")] + public partial class StreamedListObjectsResponse : IEquatable, IValidatableObject { + /// + /// Initializes a new instance of the class. + /// + [JsonConstructor] + public StreamedListObjectsResponse() { + this.AdditionalProperties = new Dictionary(); + } + + /// + /// Initializes a new instance of the class. + /// + /// varObject (required). + public StreamedListObjectsResponse(string varObject = default) { + // to ensure "varObject" is required (not null) + if (varObject == null) { + throw new ArgumentNullException("varObject is a required property for StreamedListObjectsResponse and cannot be null"); + } + this.Object = varObject; + this.AdditionalProperties = new Dictionary(); + } + + /// + /// Gets or Sets Object + /// + [DataMember(Name = "object", IsRequired = true, EmitDefaultValue = false)] + [JsonPropertyName("object")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public string Object { get; set; } + + /// + /// Gets or Sets additional properties + /// + [JsonExtensionData] + public IDictionary AdditionalProperties { get; set; } + + + /// + /// Returns the JSON string presentation of the object + /// + /// JSON string presentation of the object + public virtual string ToJson() { + return JsonSerializer.Serialize(this); + } + + /// + /// Builds a StreamedListObjectsResponse from the JSON string presentation of the object + /// + /// StreamedListObjectsResponse + public static StreamedListObjectsResponse FromJson(string jsonString) { + return JsonSerializer.Deserialize(jsonString) ?? throw new InvalidOperationException(); + } + + /// + /// Returns true if objects are equal + /// + /// Object to be compared + /// Boolean + public override bool Equals(object input) { + if (input == null || input.GetType() != this.GetType()) return false; + return this.Equals((StreamedListObjectsResponse)input); + } + + /// + /// Returns true if StreamedListObjectsResponse instances are equal + /// + /// Instance of StreamedListObjectsResponse to be compared + /// Boolean + public bool Equals(StreamedListObjectsResponse input) { + if (input == null) { + return false; + } + return + ( + this.Object == input.Object || + (this.Object != null && + this.Object.Equals(input.Object)) + ) + && (this.AdditionalProperties.Count == input.AdditionalProperties.Count && this.AdditionalProperties.All(kv => input.AdditionalProperties.TryGetValue(kv.Key, out var inputValue) && Equals(kv.Value, inputValue))); + } + + /// + /// Gets the hash code + /// + /// Hash code + public override int GetHashCode() { + unchecked // Overflow is fine, just wrap + { + int hashCode = FgaConstants.HashCodeBasePrimeNumber; + if (this.Object != null) { + hashCode = (hashCode * FgaConstants.HashCodeMultiplierPrimeNumber) + this.Object.GetHashCode(); + } + if (this.AdditionalProperties != null) { + hashCode = (hashCode * FgaConstants.HashCodeMultiplierPrimeNumber) + this.AdditionalProperties.GetHashCode(); + } + return hashCode; + } + } + + /// + /// To validate all properties of the instance + /// + /// Validation context + /// Validation Result + public IEnumerable Validate(ValidationContext validationContext) { + yield break; + } + + } + +} \ No newline at end of file diff --git a/src/OpenFga.Sdk/Telemetry/Attributes.cs b/src/OpenFga.Sdk/Telemetry/Attributes.cs index 9968b864..ca13bdb1 100644 --- a/src/OpenFga.Sdk/Telemetry/Attributes.cs +++ b/src/OpenFga.Sdk/Telemetry/Attributes.cs @@ -207,7 +207,7 @@ private static TagList AddRequestModelIdAttributes( attributes.Add(new KeyValuePair(TelemetryAttribute.RequestModelId, modelId)); } - if (apiName is "Check" or "ListObjects" or "Write" or "Expand" or "ListUsers") { + if (apiName is "Check" or "ListObjects" or "StreamedListObjects" or "Write" or "Expand" or "ListUsers") { AddRequestBodyAttributes(requestBuilder, apiName, attributes); } @@ -227,7 +227,7 @@ private static TagList AddRequestBodyAttributes( authModelId.GetString())); } - if (apiName is "Check" or "ListObjects" && root.TryGetProperty("user", out var fgaUser) && + if (apiName is "Check" or "ListObjects" or "StreamedListObjects" && root.TryGetProperty("user", out var fgaUser) && !string.IsNullOrEmpty(fgaUser.GetString())) { attributes.Add(new KeyValuePair(TelemetryAttribute.FgaRequestUser, fgaUser.GetString()));