1515namespace KurrentDB . Components . Query ;
1616
1717public static partial class QueryService {
18- public static string AmendQuery ( string query ) {
18+ private static string AmendQuery ( DuckDBConnectionPool pool , string query ) {
1919 var matches = ExtractionRegex ( ) . Matches ( query ) ;
2020 List < string > ctes = [ AllCte ] ;
2121 foreach ( Match match in matches ) {
2222 if ( ! match . Success ) continue ;
2323 var tokens = match . Value . Split ( ':' ) ;
2424 var cteName = ReplaceSpecialCharsWithUnderscore ( $ "{ tokens [ 0 ] } _{ tokens [ 1 ] } ") ;
25- var where = "where " + tokens [ 0 ] switch {
25+ var where = $ "where { tokens [ 0 ] switch {
2626 "stream" => $ "stream = '{ tokens [ 1 ] } '",
2727 "category" => $ "category = '{ tokens [ 1 ] } '",
2828 _ => throw new ( "Invalid token" )
29- } ;
29+ } } " ;
3030 var cte = string . Format ( CteTemplate , cteName , where ) ;
3131 ctes . Add ( cte ) ;
3232 query = query . Replace ( match . Value , cteName ) ;
3333 }
3434
35+ ValidateQuery ( pool , query ) ;
3536 return $ "with\r \n { string . Join ( ",\r \n " , ctes ) } \r \n { query } ";
3637 }
3738
38- internal static List < Dictionary < string , object > > ExecuteAdHocUserQuery ( this DuckDBConnectionPool pool , string sql ) {
39- var result = pool . QueryFirstOrDefault < Sql2Json . Args , Sql2Json . Result , Sql2Json > ( new ( sql . ReplaceLineEndings ( "" ) ) ) ;
40- if ( ! result . HasValue ) {
39+ private static void ValidateQuery ( DuckDBConnectionPool pool , string query ) {
40+ // var result = pool.QueryFirstOrDefault<Sql2Json.Args, Sql2Json.Result, Sql2Json>(new(query));
41+ using var _ = pool . Rent ( out var connection ) ;
42+ var result = connection . QueryFirstOrDefault < string > ( "select json_serialize_sql($query::varchar)" , new { query } ) ;
43+ if ( result == null ) {
4144 throw new ( "Error parsing query" ) ;
4245 }
43-
44- var conversionResponse = JsonSerializer . Deserialize < SqlJsonResponse > ( result . Value . Json ) ;
46+ var conversionResponse = JsonSerializer . Deserialize < SqlJsonResponse > ( result ) ;
4547 if ( conversionResponse . Error ) {
4648 var error = conversionResponse . ErrorMessage . StartsWith ( "Only SELECT" )
4749 ? "Only SELECT statements are allowed"
4850 : $ "Error parsing query: { conversionResponse . ErrorMessage } ";
4951 throw new ( error ) ;
5052 }
53+ }
5154
55+ internal static List < Dictionary < string , object > > ExecuteAdHocUserQuery ( this DuckDBConnectionPool pool , string sql ) {
56+ var query = AmendQuery ( pool , sql ) ;
5257 using var scope = pool . Rent ( out var connection ) ;
53- var query = AmendQuery ( sql ) ;
5458 var items = ( IEnumerable < IDictionary < string , object > > ) connection . Query ( query ) ;
5559 return items . Select ( x => x . ToDictionary ( y => y . Key , y => y . Value ) ) . ToList ( ) ;
5660 }
5761
58- private static string ReplaceSpecialCharsWithUnderscore ( string input ) {
59- return string . IsNullOrEmpty ( input ) ? string . Empty : SpecialCharsRegex ( ) . Replace ( input , "_" ) ;
60- }
62+ private static string ReplaceSpecialCharsWithUnderscore ( string input )
63+ => string . IsNullOrEmpty ( input ) ? string . Empty : SpecialCharsRegex ( ) . Replace ( input , "_" ) ;
6164
6265 [ GeneratedRegex ( @"\b(?:stream|category):([A-Za-z0-9_-]+)\b" , RegexOptions . IgnoreCase | RegexOptions . CultureInvariant ) ]
6366 private static partial Regex ExtractionRegex ( ) ;
@@ -69,22 +72,23 @@ private static string ReplaceSpecialCharsWithUnderscore(string input) {
6972
7073 private const string CteTemplate = """
7174 {0} AS (
72- select log_position, stream, event_number, epoch_ms(created) as created_at, event->>'data' as data, event->>'metadata' as metadata
75+ select log_position, stream, event_number, event_type, epoch_ms(created) as created_at, event->>'data' as data, event->>'metadata' as metadata
7376 from (
7477 select *, kdb_get(log_position)::JSON as event
7578 from (
76- select stream, event_number, log_position, created from idx_all {1}
79+ select stream, event_number, event_type, log_position, created from idx_all {1}
7780 union all
78- select stream, event_number, log_position, created from inflight() {1}
81+ select stream, event_number, event_type, log_position, created from inflight() {1}
7982 )
8083 )
8184 )
8285 """ ;
8386
8487 private record SqlJsonResponse {
8588 [ JsonPropertyName ( "error" ) ] public bool Error { get ; init ; }
86-
8789 [ JsonPropertyName ( "error_message" ) ] public string ErrorMessage { get ; init ; } = "" ;
90+ [ JsonPropertyName ( "error_subtype" ) ] public string ErrorSubtype { get ; init ; } = "" ;
91+ [ JsonPropertyName ( "position" ) ] public string Position { get ; init ; }
8892 }
8993
9094 public struct Sql2Json : IQuery < Sql2Json . Args , Sql2Json . Result > {
0 commit comments