Skip to content

Commit 23ba0b9

Browse files
committed
scoped service
1 parent 02b5add commit 23ba0b9

File tree

14 files changed

+157
-77
lines changed

14 files changed

+157
-77
lines changed

AGENTS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
- When initialization logic is required (initialization tasks), add a simple hosted service to perform it and register that hosted service inside the relevant extensions.
1313
- Do not expose user-facing knobs for AGE/Postgres connection pool sizing (e.g., `MaxConnections` parameters); rely on EF Core-style connection-string keywords and reasonable defaults inside the connector.
1414
- Graph store registrations (Postgres, Neo4j, Cosmos) must automatically register a default `IGraphStore`; remove `MakeDefault` toggles/options and rely on the first registration when an unkeyed graph store is requested.
15+
- Keep bulk graph store operations on `IGraphStore`; do not split them into a separate `IBulkGraphStore` interface.
16+
- Avoid separate scoped graph store abstractions (e.g., `IScopedGraphStore`); keep scope management on the primary client/graph store or its factory rather than exposing an extra DI service.
17+
- Always update `README.md` (and related docs) to reflect any behavior or API changes you make so documentation stays current with the code.
1518

1619
# Conversations
1720
any resulting updates to agents.md should go under the section "## Rules to follow"

Directory.Build.props

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
<LangVersion>14</LangVersion>
55
<Nullable>enable</Nullable>
66
<ImplicitUsings>enable</ImplicitUsings>
7-
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
7+
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
8+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
89
</PropertyGroup>
910

1011
<!--NuGet-->
@@ -25,8 +26,8 @@
2526
<RepositoryUrl>https://github.com/managedcode/graphrag</RepositoryUrl>
2627
<PackageProjectUrl>https://github.com/managedcode/graphrag</PackageProjectUrl>
2728
<Product>Managed Code GraphRag</Product>
28-
<Version>10.0.2</Version>
29-
<PackageVersion>10.0.2</PackageVersion>
29+
<Version>10.0.3</Version>
30+
<PackageVersion>10.0.3</PackageVersion>
3031

3132
</PropertyGroup>
3233
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">

README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ GraphRAG ships with a first-class Apache AGE adapter (`ManagedCode.GraphRag.Post
236236
}
237237
}
238238
```
239-
4. **Register through DI.** `services.AddPostgresGraphStore("postgres", configure: ...)` wires up `IAgeConnectionManager`, `IAgeClientFactory`, `IGraphStore`, `IScopedGraphStore`, and `IBulkGraphStore`. Pool sizing follows the standard Npgsql settings (configure `Max Pool Size`, `Timeout`, etc. inside the connection string). The first registration becomes the default unkeyed `IGraphStore`; additional stores remain keyed-only.
239+
4. **Register through DI.** `services.AddPostgresGraphStore("postgres", configure: ...)` wires up `IAgeConnectionManager`, `IAgeClientFactory`, `PostgresGraphStore`, `IGraphStore`, and `PostgresExplainService`. Pool sizing follows the standard Npgsql settings (configure `Max Pool Size`, `Timeout`, etc. inside the connection string). The first registration becomes the default unkeyed `IGraphStore`; additional stores remain keyed-only.
240240

241241
```csharp
242242
var services = new ServiceCollection()
@@ -253,23 +253,27 @@ GraphRAG ships with a first-class Apache AGE adapter (`ManagedCode.GraphRag.Post
253253
var graphStore = provider.GetRequiredService<IGraphStore>();
254254

255255
// Scoped operations reuse a single AGE/Postgres connection for the lifetime of the scope
256-
var scopedStore = provider.GetRequiredService<IScopedGraphStore>();
257-
await using (await scopedStore.CreateScopeAsync())
256+
var ageClientFactory = provider.GetRequiredKeyedService<IAgeClientFactory>("postgres");
257+
await using (await ageClientFactory.CreateScopeAsync())
258258
{
259259
await graphStore.UpsertNodeAsync("node-1", "Example", new Dictionary<string, object?> { ["name"] = "Scoped" });
260260
await graphStore.UpsertNodeAsync("node-2", "Example", new Dictionary<string, object?> { ["name"] = "Connection" });
261261
}
262262

263263
// Bulk helpers batch large workloads while keeping the scoped connection alive
264-
var bulkStore = provider.GetRequiredService<IBulkGraphStore>();
265-
await bulkStore.UpsertNodesAsync(new[]
264+
await graphStore.UpsertNodesAsync(new[]
266265
{
267266
new GraphNodeUpsert("bulk-1", "Example", new Dictionary<string, object?> { ["name"] = "Bulk" }),
268267
new GraphNodeUpsert("bulk-2", "Example", new Dictionary<string, object?> { ["name"] = "Write" })
269268
});
269+
270+
await graphStore.UpsertRelationshipsAsync(new[]
271+
{
272+
new GraphRelationshipUpsert("bulk-1", "bulk-2", "RELATES_TO", new Dictionary<string, object?>())
273+
});
270274
```
271275

272-
The `AgeConnectionManager` automatically retries transient `53300: too many clients` errors (up to three exponential backoff attempts) so scopes can wait for a free slot before failing. When a scope is disposed, the underlying `IAgeClientScope` returns its connection to the pool, keeping concurrency predictable even under heavy fan-out.
276+
The `AgeConnectionManager` automatically retries transient `53300: too many clients` errors (up to three exponential backoff attempts) so scopes can wait for a free slot before failing. When a scope is disposed, the underlying `IAgeClientScope` created by `IAgeClientFactory` returns its connection to the pool, keeping concurrency predictable even under heavy fan-out.
273277

274278
### Neo4j Setup
275279

src/ManagedCode.GraphRag.CosmosDb/CosmosGraphStore.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ public async Task UpsertNodeAsync(string id, string label, IReadOnlyDictionary<s
3232
await container.UpsertItemAsync(document, new PartitionKey(document.Id), cancellationToken: cancellationToken);
3333
}
3434

35+
public async Task UpsertNodesAsync(IReadOnlyCollection<GraphNodeUpsert> nodes, CancellationToken cancellationToken = default)
36+
{
37+
ArgumentNullException.ThrowIfNull(nodes);
38+
39+
foreach (var node in nodes)
40+
{
41+
await UpsertNodeAsync(node.Id, node.Label, node.Properties, cancellationToken).ConfigureAwait(false);
42+
}
43+
}
44+
3545
public async Task UpsertRelationshipAsync(string sourceId, string targetId, string type, IReadOnlyDictionary<string, object?> properties, CancellationToken cancellationToken = default)
3646
{
3747
ArgumentException.ThrowIfNullOrWhiteSpace(sourceId);
@@ -43,6 +53,31 @@ public async Task UpsertRelationshipAsync(string sourceId, string targetId, stri
4353
await container.UpsertItemAsync(document, new PartitionKey(document.SourceId), cancellationToken: cancellationToken);
4454
}
4555

56+
public async Task UpsertRelationshipsAsync(IReadOnlyCollection<GraphRelationshipUpsert> relationships, CancellationToken cancellationToken = default)
57+
{
58+
ArgumentNullException.ThrowIfNull(relationships);
59+
60+
foreach (var relationship in relationships)
61+
{
62+
await UpsertRelationshipAsync(
63+
relationship.SourceId,
64+
relationship.TargetId,
65+
relationship.Type,
66+
relationship.Properties,
67+
cancellationToken).ConfigureAwait(false);
68+
69+
if (relationship.Bidirectional)
70+
{
71+
await UpsertRelationshipAsync(
72+
relationship.TargetId,
73+
relationship.SourceId,
74+
relationship.Type,
75+
relationship.Properties,
76+
cancellationToken).ConfigureAwait(false);
77+
}
78+
}
79+
}
80+
4681
public IAsyncEnumerable<GraphRelationship> GetOutgoingRelationshipsAsync(string sourceId, CancellationToken cancellationToken = default)
4782
{
4883
ArgumentException.ThrowIfNullOrWhiteSpace(sourceId);

src/ManagedCode.GraphRag.Neo4j/Neo4jGraphStore.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ public async Task UpsertNodeAsync(string id, string label, IReadOnlyDictionary<s
3333
await session.ExecuteWriteAsync(tx => tx.RunAsync(cypher, new { id, props = properties }));
3434
}
3535

36+
public async Task UpsertNodesAsync(IReadOnlyCollection<GraphNodeUpsert> nodes, CancellationToken cancellationToken = default)
37+
{
38+
ArgumentNullException.ThrowIfNull(nodes);
39+
40+
foreach (var node in nodes)
41+
{
42+
await UpsertNodeAsync(node.Id, node.Label, node.Properties, cancellationToken).ConfigureAwait(false);
43+
}
44+
}
45+
3646
public async Task UpsertRelationshipAsync(string sourceId, string targetId, string type, IReadOnlyDictionary<string, object?> properties, CancellationToken cancellationToken = default)
3747
{
3848
ArgumentException.ThrowIfNullOrWhiteSpace(sourceId);
@@ -49,6 +59,31 @@ public async Task UpsertRelationshipAsync(string sourceId, string targetId, stri
4959
await session.ExecuteWriteAsync(tx => tx.RunAsync(cypher, new { sourceId, targetId, props = properties }));
5060
}
5161

62+
public async Task UpsertRelationshipsAsync(IReadOnlyCollection<GraphRelationshipUpsert> relationships, CancellationToken cancellationToken = default)
63+
{
64+
ArgumentNullException.ThrowIfNull(relationships);
65+
66+
foreach (var relationship in relationships)
67+
{
68+
await UpsertRelationshipAsync(
69+
relationship.SourceId,
70+
relationship.TargetId,
71+
relationship.Type,
72+
relationship.Properties,
73+
cancellationToken).ConfigureAwait(false);
74+
75+
if (relationship.Bidirectional)
76+
{
77+
await UpsertRelationshipAsync(
78+
relationship.TargetId,
79+
relationship.SourceId,
80+
relationship.Type,
81+
relationship.Properties,
82+
cancellationToken).ConfigureAwait(false);
83+
}
84+
}
85+
}
86+
5287
public IAsyncEnumerable<GraphRelationship> GetOutgoingRelationshipsAsync(string sourceId, CancellationToken cancellationToken = default)
5388
{
5489
ArgumentException.ThrowIfNullOrWhiteSpace(sourceId);

src/ManagedCode.GraphRag.Postgres/ApacheAge/AgeClientFactory.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace GraphRag.Storage.Postgres.ApacheAge;
77
public interface IAgeClientFactory
88
{
99
IAgeClient CreateClient();
10+
IAgeClient CreateUnscopedClient();
1011
ValueTask<IAgeClientScope> CreateScopeAsync(CancellationToken cancellationToken = default);
1112
}
1213

@@ -26,6 +27,8 @@ public IAgeClient CreateClient()
2627
return CreatePhysicalClient();
2728
}
2829

30+
public IAgeClient CreateUnscopedClient() => CreatePhysicalClient();
31+
2932
public ValueTask<IAgeClientScope> CreateScopeAsync(CancellationToken cancellationToken = default)
3033
{
3134
cancellationToken.ThrowIfCancellationRequested();

src/ManagedCode.GraphRag.Postgres/ApacheAge/AgeConnectionManager.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public AgeConnectionManager(string connectionString, ILogger<AgeConnectionManage
4141
throw new ArgumentOutOfRangeException(nameof(connectionString), "Maximum Pool Size must be greater than zero.");
4242
}
4343
connectionBuilder.MinPoolSize = Math.Min(10, connectionBuilder.MaxPoolSize);
44+
connectionBuilder.Timeout = 0;
4445

4546
ConnectionString = connectionBuilder.ConnectionString;
4647
_dataSource = NpgsqlDataSource.Create(connectionBuilder.ConnectionString);
@@ -182,12 +183,23 @@ private async Task<NpgsqlConnection> OpenDataSourceConnectionAsync(CancellationT
182183
LogMessages.ConnectionRetrying(_logger, ConnectionString, attempt, delay, ex.MessageText);
183184
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
184185
}
186+
catch (NpgsqlException ex) when (ShouldRetryOnPoolExhaustion(ex, attempt))
187+
{
188+
var delay = GetRetryDelay(attempt);
189+
LogMessages.ConnectionRetrying(_logger, ConnectionString, attempt, delay, ex.Message);
190+
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
191+
}
185192
}
186193
}
187194

188195
private static bool ShouldRetry(PostgresException ex, int attempt) =>
189196
ex.SqlState == PostgresErrorCodes.TooManyConnections && attempt < ConnectionLimitMaxAttempts;
190197

198+
private static bool ShouldRetryOnPoolExhaustion(NpgsqlException ex, int attempt) =>
199+
attempt < ConnectionLimitMaxAttempts &&
200+
ex.InnerException is TimeoutException &&
201+
ex.Message.Contains("The connection pool has been exhausted", StringComparison.OrdinalIgnoreCase);
202+
191203
private static TimeSpan GetRetryDelay(int attempt)
192204
{
193205
var delayMillis = Math.Min(

0 commit comments

Comments
 (0)