88using GraphRag . Constants ;
99using GraphRag . Graphs ;
1010using GraphRag . Storage . Postgres . ApacheAge ;
11+ using GraphRag . Storage . Postgres . ApacheAge . Types ;
1112using Microsoft . Extensions . DependencyInjection ;
1213using Microsoft . Extensions . Logging ;
1314using Microsoft . Extensions . Logging . Abstractions ;
1415using Npgsql ;
15- using NpgsqlTypes ;
1616
1717namespace GraphRag . Storage . Postgres ;
1818
@@ -401,6 +401,10 @@ async IAsyncEnumerable<GraphRelationship> FetchAsync(string nodeId, [EnumeratorC
401401 await client . OpenConnectionAsync ( token ) . ConfigureAwait ( false ) ;
402402
403403 await using var command = client . Connection . CreateCommand ( ) ;
404+ var payload = JsonSerializer . Serialize ( new Dictionary < string , object ? >
405+ {
406+ [ CypherParameterNames . NodeId ] = nodeId
407+ } ) ;
404408 command . CommandText = string . Concat (
405409 "SELECT " ,
406410 "\n source_id::text," ,
@@ -410,13 +414,8 @@ async IAsyncEnumerable<GraphRelationship> FetchAsync(string nodeId, [EnumeratorC
410414 "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", $$" ,
411415 "\n MATCH (source { id: $node_id })-[rel]->(target)" ,
412416 "\n RETURN source.id AS source_id, target.id AS target_id, type(rel) AS edge_type, properties(rel) AS edge_props" ,
413- "\n $$, @params::ag_catalog.agtype) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);" ) ;
414- var payload = JsonSerializer . Serialize ( new Dictionary < string , object ? >
415- {
416- [ CypherParameterNames . NodeId ] = nodeId
417- } ) ;
417+ "\n $$, @params) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);" ) ;
418418 command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , payload ) ) ;
419-
420419 await using var reader = await command . ExecuteReaderAsync ( token ) . ConfigureAwait ( false ) ;
421420 while ( await reader . ReadAsync ( token ) . ConfigureAwait ( false ) )
422421 {
@@ -445,6 +444,7 @@ async IAsyncEnumerable<GraphRelationship> FetchRelationships(GraphTraversalOptio
445444
446445 await using var command = client . Connection . CreateCommand ( ) ;
447446 var pagination = BuildPaginationClause ( traversalOptions ) ;
447+ var parametersJson = SerializeParameters ( null ) ;
448448 command . CommandText = string . Concat (
449449 "SELECT " ,
450450 "\n source_id::text," ,
@@ -456,9 +456,8 @@ async IAsyncEnumerable<GraphRelationship> FetchRelationships(GraphTraversalOptio
456456 "\n RETURN source.id AS source_id, target.id AS target_id, type(rel) AS edge_type, properties(rel) AS edge_props" ,
457457 "\n ORDER BY source.id, target.id, type(rel)" ,
458458 pagination ,
459- "\n $$, @params::ag_catalog.agtype) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);" ) ;
460- command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , SerializeParameters ( null ) ) ) ;
461-
459+ "\n $$, @params) AS (source_id agtype, target_id agtype, edge_type agtype, edge_props agtype);" ) ;
460+ command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , parametersJson ) ) ;
462461 await using var reader = await command . ExecuteReaderAsync ( token ) . ConfigureAwait ( false ) ;
463462 while ( await reader . ReadAsync ( token ) . ConfigureAwait ( false ) )
464463 {
@@ -487,6 +486,7 @@ async IAsyncEnumerable<GraphNode> FetchNodes(GraphTraversalOptions? traversalOpt
487486
488487 await using var command = client . Connection . CreateCommand ( ) ;
489488 var pagination = BuildPaginationClause ( traversalOptions ) ;
489+ var parametersJson = SerializeParameters ( null ) ;
490490 command . CommandText = string . Concat (
491491 "SELECT " ,
492492 "\n node_label::text," ,
@@ -497,9 +497,8 @@ async IAsyncEnumerable<GraphNode> FetchNodes(GraphTraversalOptions? traversalOpt
497497 "\n RETURN head(labels(n)) AS node_label, n.id AS node_id, properties(n) AS node_props" ,
498498 "\n ORDER BY n.id" ,
499499 pagination ,
500- "\n $$, @params::ag_catalog.agtype) AS (node_label agtype, node_id agtype, node_props agtype);" ) ;
501- command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , SerializeParameters ( null ) ) ) ;
502-
500+ "\n $$, @params) AS (node_label agtype, node_id agtype, node_props agtype);" ) ;
501+ command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , parametersJson ) ) ;
503502 await using var reader = await command . ExecuteReaderAsync ( token ) . ConfigureAwait ( false ) ;
504503 while ( await reader . ReadAsync ( token ) . ConfigureAwait ( false ) )
505504 {
@@ -525,17 +524,16 @@ protected virtual async Task ExecuteCypherAsync(string query, IReadOnlyDictionar
525524 private async Task ExecuteCypherAsync ( IAgeClient client , string query , IReadOnlyDictionary < string , object ? > parameters , CancellationToken cancellationToken )
526525 {
527526 var queryLiteral = WrapInDollarQuotes ( query ) ;
527+ var payload = SerializeParameters ( parameters ) ;
528528 var commandText = string . Concat (
529529 "SELECT *" ,
530- "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , queryLiteral , "::cstring, @params::ag_catalog.agtype) AS (result agtype);" ) ;
531- var payload = SerializeParameters ( parameters ) ;
530+ "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , queryLiteral , "::cstring, @params) AS (result agtype);" ) ;
532531
533532 for ( var attempt = 0 ; attempt < 2 ; attempt ++ )
534533 {
535534 await using var command = client . Connection . CreateCommand ( ) ;
536535 command . CommandText = commandText ;
537536 command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , payload ) ) ;
538-
539537 try
540538 {
541539 await command . ExecuteNonQueryAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
@@ -856,7 +854,7 @@ protected virtual async Task<IReadOnlyList<string>> ExecuteExplainAsync(string e
856854 var explainLiteral = WrapInDollarQuotes ( explainQuery ) ;
857855 command . CommandText = string . Concat (
858856 "SELECT plan" ,
859- "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , explainLiteral , "::cstring, @params::ag_catalog.agtype ) AS (plan text);" ) ;
857+ "\n FROM ag_catalog.cypher(" , _graphNameLiteral , ", " , explainLiteral , "::cstring, @params) AS (plan text);" ) ;
860858 command . Parameters . Add ( CreateAgTypeParameter ( CypherParameterNames . Parameters , parameterJson ) ) ;
861859
862860 var plan = new List < string > ( ) ;
@@ -892,10 +890,10 @@ private static NpgsqlParameter CreateAgTypeParameter(string name, string jsonPay
892890 {
893891 ArgumentNullException . ThrowIfNull ( jsonPayload ) ;
894892
895- return new NpgsqlParameter ( name , NpgsqlDbType . Unknown )
893+ var agtype = new Agtype ( jsonPayload ) ;
894+ return new NpgsqlParameter < Agtype > ( name , agtype )
896895 {
897- DataTypeName = "ag_catalog.agtype" ,
898- Value = jsonPayload
896+ DataTypeName = "ag_catalog.agtype"
899897 } ;
900898 }
901899
0 commit comments