forked from orneryd/NornicDB
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.go
More file actions
2123 lines (1915 loc) · 73.6 KB
/
executor.go
File metadata and controls
2123 lines (1915 loc) · 73.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package cypher provides Neo4j-compatible Cypher query execution for NornicDB.
//
// This package implements a Cypher query parser and executor that supports
// the core Neo4j Cypher query language features. It enables NornicDB to be
// compatible with existing Neo4j applications and tools.
//
// Supported Cypher Features:
// - MATCH: Pattern matching with node and relationship patterns
// - CREATE: Creating nodes and relationships
// - MERGE: Upsert operations with ON CREATE/ON MATCH clauses
// - DELETE/DETACH DELETE: Removing nodes and relationships
// - SET: Updating node and relationship properties
// - REMOVE: Removing properties and labels
// - RETURN: Returning query results
// - WHERE: Filtering with conditions
// - WITH: Passing results between query parts
// - OPTIONAL MATCH: Left outer joins
// - CALL: Procedure calls
// - UNWIND: List expansion
//
// Example Usage:
//
// // Create executor with storage backend
// storage := storage.NewMemoryEngine()
// executor := cypher.NewStorageExecutor(storage)
//
// // Execute Cypher queries
// result, err := executor.Execute(ctx, "CREATE (n:Person {name: 'Alice', age: 30})", nil)
// if err != nil {
// log.Fatal(err)
// }
//
// // Query with parameters
// params := map[string]interface{}{
// "name": "Alice",
// "minAge": 25,
// }
// result, err = executor.Execute(ctx,
// "MATCH (n:Person {name: $name}) WHERE n.age >= $minAge RETURN n", params)
//
// // Complex query with relationships
// result, err = executor.Execute(ctx, `
// MATCH (a:Person)-[r:KNOWS]->(b:Person)
// WHERE a.age > 25
// RETURN a.name, r.since, b.name
// ORDER BY a.age DESC
// LIMIT 10
// `, nil)
//
// // Process results
// for _, row := range result.Rows {
// fmt.Printf("Row: %v\n", row)
// }
//
// Neo4j Compatibility:
//
// The executor aims for high compatibility with Neo4j Cypher:
// - Same syntax and semantics for core operations
// - Parameter substitution with $param syntax
// - Neo4j-style error messages and codes
// - Compatible result format for drivers
// - Support for Neo4j built-in functions
//
// Query Processing Pipeline:
//
// 1. **Parsing**: Query is parsed into an AST (Abstract Syntax Tree)
// 2. **Validation**: Syntax and semantic validation
// 3. **Parameter Substitution**: Replace $param with actual values
// 4. **Execution Planning**: Determine optimal execution strategy
// 5. **Execution**: Execute against storage backend
// 6. **Result Formatting**: Format results for Neo4j compatibility
//
// Performance Considerations:
//
// - Pattern matching is optimized for common cases
// - Indexes are used automatically when available
// - Query planning chooses efficient execution paths
// - Bulk operations are optimized for large datasets
//
// Limitations:
//
// Current limitations compared to full Neo4j:
// - No user-defined procedures (CALL is limited to built-ins)
// - No complex path expressions
// - No graph algorithms (shortest path, etc.)
// - No schema constraints (handled by storage layer)
// - No transactions (single-query atomicity only)
//
// ELI12 (Explain Like I'm 12):
//
// Think of Cypher like asking questions about a social network:
//
// 1. **MATCH**: "Find all people named Alice" - like searching through
// a phone book for everyone with a specific name.
//
// 2. **CREATE**: "Add a new person named Bob" - like writing a new
// entry in the phone book.
//
// 3. **Relationships**: "Find who Alice knows" - like following the
// lines between people on a friendship map.
//
// 4. **WHERE**: "Find people older than 25" - like adding a filter
// to only show certain results.
//
// 5. **RETURN**: "Show me their names and ages" - like deciding which
// information to display from your search.
//
// The Cypher executor is like a smart assistant that understands these
// questions and knows how to find the answers in your data!
package cypher
import (
"context"
"fmt"
"log"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/orneryd/nornicdb/pkg/config"
"github.com/orneryd/nornicdb/pkg/cypher/antlr"
"github.com/orneryd/nornicdb/pkg/search"
"github.com/orneryd/nornicdb/pkg/storage"
"github.com/orneryd/nornicdb/pkg/vectorspace"
)
// Pre-compiled regexes for subquery detection (whitespace-flexible).
// Compiled once at package init so hot query-routing paths never pay
// regexp.Compile costs per query.
var (
	// Matches EXISTS followed by optional whitespace and opening brace
	existsSubqueryRe = regexp.MustCompile(`(?i)\bEXISTS\s*\{`)
	// Matches NOT EXISTS followed by optional whitespace and opening brace
	notExistsSubqueryRe = regexp.MustCompile(`(?i)\bNOT\s+EXISTS\s*\{`)
	// Matches COUNT followed by optional whitespace and opening brace
	countSubqueryRe = regexp.MustCompile(`(?i)\bCOUNT\s*\{`)
	// Matches CALL followed by optional whitespace and opening brace (not CALL procedure())
	callSubqueryRe = regexp.MustCompile(`(?i)\bCALL\s*\{`)
	// Matches COLLECT followed by optional whitespace and opening brace
	collectSubqueryRe = regexp.MustCompile(`(?i)\bCOLLECT\s*\{`)
)
// hasSubqueryPattern checks if the query contains a subquery pattern (keyword + optional whitespace + brace)
func hasSubqueryPattern(query string, pattern *regexp.Regexp) bool {
return pattern.MatchString(query)
}
// StorageExecutor executes Cypher queries against a storage backend.
//
// The StorageExecutor provides the main interface for executing Cypher queries
// in NornicDB. It handles query parsing, validation, parameter substitution,
// and execution against the underlying storage engine.
//
// Key features:
// - Neo4j-compatible Cypher syntax support
// - Parameter substitution with $param syntax
// - Query validation and error reporting
// - Optimized execution planning
// - Thread-safe concurrent execution
//
// Example:
//
// storage := storage.NewMemoryEngine()
// executor := cypher.NewStorageExecutor(storage)
//
// // Simple node creation
// result, _ := executor.Execute(ctx, "CREATE (n:Person {name: 'Alice'})", nil)
//
// // Parameterized query
// params := map[string]interface{}{"name": "Bob", "age": 30}
// result, _ = executor.Execute(ctx,
// "CREATE (n:Person {name: $name, age: $age})", params)
//
// // Complex pattern matching
// result, _ = executor.Execute(ctx, `
// MATCH (a:Person)-[:KNOWS]->(b:Person)
// WHERE a.age > 25
// RETURN a.name, b.name
// `, nil)
//
// Thread Safety:
//
// The executor is thread-safe and can handle concurrent queries.
//
// NodeMutatedCallback is called when a node is created or mutated via Cypher (CREATE, MERGE, SET, REMOVE, or procedures that update nodes).
// This allows external systems (like the embed queue) to be notified so embeddings can be (re)generated.
// It is invoked synchronously from notifyNodeMutated on the query execution
// path, so implementations should be fast or hand off to a queue.
type NodeMutatedCallback func(nodeID string)
// StorageExecutor executes Cypher queries against a storage backend.
// (This attached comment mirrors the detailed doc block above so godoc
// associates documentation with the type.)
type StorageExecutor struct {
	parser    *Parser
	storage   storage.Engine
	txContext *TransactionContext // Active transaction context
	cache     *SmartQueryCache    // Query result cache with label-aware invalidation
	planCache *QueryPlanCache     // Parsed query plan cache
	analyzer  *QueryAnalyzer      // Query analysis with AST caching
	// Node lookup cache for MATCH patterns like (n:Label {prop: value})
	// Key: "Label:{prop:value,...}", Value: *storage.Node
	// This dramatically speeds up repeated MATCH lookups for the same pattern
	nodeLookupCache   map[string]*storage.Node
	nodeLookupCacheMu sync.RWMutex // guards nodeLookupCache for concurrent queries
	// deferFlush when true, writes are not auto-flushed (Bolt layer handles it)
	deferFlush bool
	// embedder for server-side query embedding (optional)
	// If set, vector search can accept string queries which are embedded automatically
	embedder QueryEmbedder
	// searchService optionally provides unified search semantics for Cypher procedures.
	// When set, db.index.vector.queryNodes delegates to search.Service.
	searchService *search.Service
	// onNodeMutated is called when a node is created or mutated (CREATE, MERGE, SET, REMOVE).
	// This allows the embed queue to be notified so embeddings are (re)generated.
	onNodeMutated NodeMutatedCallback
	// defaultEmbeddingDimensions is the configured embedding dimensions for vector indexes
	// Used as default when CREATE VECTOR INDEX doesn't specify dimensions
	defaultEmbeddingDimensions int
	// dbManager is optional - when set, enables system commands (CREATE/DROP/SHOW DATABASE)
	// System commands require DatabaseManager to manage multiple databases
	// This is an interface to avoid import cycles with multidb package
	dbManager DatabaseManagerInterface
	// vectorRegistry maps Cypher vector index definitions to concrete vector spaces.
	vectorRegistry    *vectorspace.IndexRegistry
	vectorIndexSpaces map[string]vectorspace.VectorSpaceKey // vector index name -> space key
}
// DatabaseManagerInterface is a minimal interface to avoid import cycles with multidb package.
// This allows the executor to call database management operations without directly
// depending on the multidb package.
type DatabaseManagerInterface interface {
	CreateDatabase(name string) error
	DropDatabase(name string) error
	ListDatabases() []DatabaseInfoInterface
	Exists(name string) bool
	CreateAlias(alias, databaseName string) error
	DropAlias(alias string) error
	ListAliases(databaseName string) map[string]string
	ResolveDatabase(nameOrAlias string) (string, error)
	// Limits are passed as interface{} (rather than concrete structs) to keep
	// this package decoupled from multidb's limit types.
	SetDatabaseLimits(databaseName string, limits interface{}) error
	GetDatabaseLimits(databaseName string) (interface{}, error)
	// Composite database methods
	CreateCompositeDatabase(name string, constituents []interface{}) error
	DropCompositeDatabase(name string) error
	AddConstituent(compositeName string, constituent interface{}) error
	RemoveConstituent(compositeName string, alias string) error
	GetCompositeConstituents(compositeName string) ([]interface{}, error)
	ListCompositeDatabases() []DatabaseInfoInterface
	IsCompositeDatabase(name string) bool
}
// DatabaseInfoInterface provides database metadata without importing multidb.
type DatabaseInfoInterface interface {
	Name() string       // database name
	Type() string       // database type (e.g. standard vs composite)
	Status() string     // current status string
	IsDefault() bool    // true for the default database
	CreatedAt() time.Time
}
// QueryEmbedder generates embeddings for search queries.
// This is a minimal interface to avoid import cycles with embed package.
type QueryEmbedder interface {
	// Embed converts text into an embedding vector, honoring ctx cancellation.
	Embed(ctx context.Context, text string) ([]float32, error)
}
// NewStorageExecutor creates a new Cypher executor bound to the given storage
// backend.
//
// The executor comes back fully initialized — parser, result/plan/AST caches,
// node lookup cache, and vector index registry — and can run queries
// immediately.
//
// Parameters:
//   - store: Storage engine to execute queries against (required)
//
// Returns:
//   - StorageExecutor ready for query execution
//
// Example:
//
//	storage := storage.NewMemoryEngine()
//	executor := cypher.NewStorageExecutor(storage)
//	result, err := executor.Execute(ctx, "MATCH (n) RETURN count(n)", nil)
func NewStorageExecutor(store storage.Engine) *StorageExecutor {
	exec := &StorageExecutor{
		parser:  NewParser(),
		storage: store,
		// Caches: query results (label-aware invalidation), parsed plans,
		// and query ASTs; sizes match the executor's original tuning.
		cache:     NewSmartQueryCache(1000),
		planCache: NewQueryPlanCache(500),
		analyzer:  NewQueryAnalyzer(1000),
		// Hot-path cache for repeated MATCH (n:Label {prop: value}) lookups.
		nodeLookupCache: make(map[string]*storage.Node, 1000),
		// Lazily injected via SetSearchService() so the DB's cached search
		// service instance can be reused instead of building a fresh one.
		searchService:     nil,
		vectorRegistry:    vectorspace.NewIndexRegistry(),
		vectorIndexSpaces: make(map[string]vectorspace.VectorSpaceKey),
	}
	return exec
}
// SetDatabaseManager sets the database manager for system commands.
// When set, enables CREATE DATABASE, DROP DATABASE, and SHOW DATABASES commands.
// When nil (the default), those system commands are unavailable.
//
// Example:
//
//	executor := cypher.NewStorageExecutor(storage)
//	executor.SetDatabaseManager(dbManager)
//	// Now CREATE DATABASE, DROP DATABASE, SHOW DATABASES work
func (e *StorageExecutor) SetDatabaseManager(dbManager DatabaseManagerInterface) {
	e.dbManager = dbManager
}
// SetEmbedder sets the query embedder for server-side embedding.
// When set, db.index.vector.queryNodes can accept string queries
// which are automatically embedded before search.
// Setting a nil embedder restores the initial state (no server-side embedding).
//
// Example:
//
//	executor := cypher.NewStorageExecutor(storage)
//	executor.SetEmbedder(embedder)
//
//	// Now vector search accepts both:
//	// CALL db.index.vector.queryNodes('idx', 10, [0.1, 0.2, ...])  // Vector
//	// CALL db.index.vector.queryNodes('idx', 10, 'search query')   // String (auto-embedded)
func (e *StorageExecutor) SetEmbedder(embedder QueryEmbedder) {
	e.embedder = embedder
}
// SetSearchService sets the unified search service used by Cypher procedures.
// When set, db.index.vector.queryNodes will delegate to search.Service.
// When nil, removeNodeFromSearch becomes a no-op.
func (e *StorageExecutor) SetSearchService(svc *search.Service) {
	e.searchService = svc
}
// SetVectorRegistry allows wiring a shared index registry (e.g., per database).
// Defaults to an internal registry when not set.
func (e *StorageExecutor) SetVectorRegistry(reg *vectorspace.IndexRegistry) {
	if reg != nil {
		e.vectorRegistry = reg
		return
	}
	// Never leave the registry nil: fall back to a private registry so vector
	// index operations always have a usable target.
	e.vectorRegistry = vectorspace.NewIndexRegistry()
}
// GetVectorRegistry exposes the current registry (for tests and adapters).
// Never nil after NewStorageExecutor (and SetVectorRegistry replaces nil
// input with a fresh registry).
func (e *StorageExecutor) GetVectorRegistry() *vectorspace.IndexRegistry {
	return e.vectorRegistry
}
// GetEmbedder returns the query embedder if set, or nil when none is configured.
// This allows copying the embedder to namespaced executors for GraphQL.
func (e *StorageExecutor) GetEmbedder() QueryEmbedder {
	return e.embedder
}
// SetNodeMutatedCallback sets a callback that is invoked when nodes are created
// or mutated (CREATE, MERGE, SET, REMOVE, or procedures that update nodes).
// This allows the embed queue to be notified so embeddings can be (re)generated.
// A nil callback disables notifications (notifyNodeMutated checks for nil).
//
// Example:
//
//	executor := cypher.NewStorageExecutor(storage)
//	executor.SetNodeMutatedCallback(func(nodeID string) {
//	    embedQueue.Enqueue(nodeID)
//	})
func (e *StorageExecutor) SetNodeMutatedCallback(cb NodeMutatedCallback) {
	e.onNodeMutated = cb
}
// SetDefaultEmbeddingDimensions sets the default dimensions for vector indexes.
// This is used when CREATE VECTOR INDEX doesn't specify dimensions in OPTIONS.
func (e *StorageExecutor) SetDefaultEmbeddingDimensions(dims int) {
	e.defaultEmbeddingDimensions = dims
}
// GetDefaultEmbeddingDimensions returns the configured default embedding dimensions.
// Returns 1024 as fallback if not configured (zero or negative).
func (e *StorageExecutor) GetDefaultEmbeddingDimensions() int {
	// The doc contract promises a 1024 fallback, but the field's zero value
	// is 0 when SetDefaultEmbeddingDimensions was never called — guard here
	// so callers never receive a non-positive dimension count.
	if e.defaultEmbeddingDimensions <= 0 {
		return 1024
	}
	return e.defaultEmbeddingDimensions
}
// notifyNodeMutated calls the onNodeMutated callback if set.
// Call after any node creation or mutation (CREATE, MERGE, SET, REMOVE) so the embed queue can re-process.
func (e *StorageExecutor) notifyNodeMutated(nodeID string) {
	// The nil-check makes the callback strictly optional.
	if e.onNodeMutated != nil {
		e.onNodeMutated(nodeID)
	}
}
// removeNodeFromSearch removes a node from the search service (vector/fulltext indexes).
// Call after successfully deleting a node via Cypher so embeddings are not left orphaned.
// nodeID may be prefixed (e.g. "nornic:xyz") or local ("xyz"); the search service expects local ID.
func (e *StorageExecutor) removeNodeFromSearch(nodeID string) {
	if e.searchService == nil || nodeID == "" {
		return
	}
	// Strip any database prefix: the search service indexes by local ID.
	local := nodeID
	if _, stripped, ok := storage.ParseDatabasePrefix(nodeID); ok {
		local = stripped
	}
	// Best-effort: a failed removal must not fail the delete that triggered it.
	_ = e.searchService.RemoveNode(storage.NodeID(local))
}
// Flush persists all pending writes to storage.
// This implements FlushableExecutor for Bolt-level deferred commits.
// For engines other than *storage.AsyncEngine there is no write-behind
// buffer to drain, so Flush is a no-op returning nil.
func (e *StorageExecutor) Flush() error {
	ae, ok := e.storage.(*storage.AsyncEngine)
	if !ok {
		return nil
	}
	return ae.Flush()
}
// SetDeferFlush enables/disables deferred flush mode.
// When enabled, writes are not auto-flushed - the Bolt layer calls Flush().
func (e *StorageExecutor) SetDeferFlush(enabled bool) {
	e.deferFlush = enabled
}
// queryDeletesNodes returns true if the query deletes nodes.
// Returns false for relationship-only deletes (CREATE rel...DELETE rel pattern).
func queryDeletesNodes(query string) bool {
	switch {
	case strings.Contains(strings.ToUpper(query), "DETACH DELETE"):
		// DETACH DELETE always removes nodes (and their relationships).
		return true
	case strings.Contains(query, "]->(") || strings.Contains(query, ")<-["):
		// A directed relationship pattern alongside CREATE+DELETE marks a
		// relationship-only delete, which leaves the node set untouched.
		return false
	default:
		// Conservatively assume nodes were deleted.
		return true
	}
}
// Execute parses and executes a Cypher query with optional parameters.
//
// This is the main entry point for Cypher query execution. The method handles
// the complete query lifecycle: parsing, validation, parameter substitution,
// execution planning, and result formatting.
//
// Parameters:
// - ctx: Context for cancellation and timeouts
// - cypher: Cypher query string
// - params: Optional parameters for $param substitution
//
// Returns:
// - ExecuteResult with columns and rows
// - Error if query parsing or execution fails
//
// Example:
//
// // Simple query without parameters
// result, err := executor.Execute(ctx, "MATCH (n:Person) RETURN n.name", nil)
// if err != nil {
// log.Fatal(err)
// }
//
// // Parameterized query
// params := map[string]interface{}{
// "name": "Alice",
// "minAge": 25,
// }
// result, err = executor.Execute(ctx, `
// MATCH (n:Person {name: $name})
// WHERE n.age >= $minAge
// RETURN n.name, n.age
// `, params)
//
// // Process results
// fmt.Printf("Columns: %v\n", result.Columns)
// for _, row := range result.Rows {
// fmt.Printf("Row: %v\n", row)
// }
//
// Supported Query Types:
//
// Core Clauses:
// - MATCH: Pattern matching and traversal
// - OPTIONAL MATCH: Left outer joins (returns nulls for no matches)
// - CREATE: Node and relationship creation
// - MERGE: Upsert operations with ON CREATE SET / ON MATCH SET
// - DELETE / DETACH DELETE: Node and relationship deletion
// - SET: Property updates
// - REMOVE: Property and label removal
//
// Projection & Chaining:
// - RETURN: Result projection with expressions, aliases, aggregations
// - WITH: Query chaining and intermediate aggregation
// - UNWIND: List expansion into rows
//
// Filtering & Ordering:
// - WHERE: Filtering conditions (=, <>, <, >, <=, >=, IS NULL, IS NOT NULL, IN, CONTAINS, STARTS WITH, ENDS WITH, AND, OR, NOT)
// - ORDER BY: Result sorting (ASC/DESC)
// - SKIP / LIMIT: Pagination
//
// Aggregation Functions:
// - COUNT, SUM, AVG, MIN, MAX, COLLECT
//
// Procedures & Functions:
// - CALL: Procedure invocation (db.labels, db.propertyKeys, db.index.vector.*, etc.)
// - CALL {}: Subquery execution with UNION support
//
// Advanced:
// - UNION / UNION ALL: Query composition
// - FOREACH: Iterative updates
// - LOAD CSV: Data import
// - EXPLAIN / PROFILE: Query analysis
// - SHOW: Schema introspection
//
// Path Functions:
// - shortestPath / allShortestPaths
//
// Error Handling:
//
// Returns detailed error messages for syntax errors, type mismatches,
// and execution failures with Neo4j-compatible error codes.
func (e *StorageExecutor) Execute(ctx context.Context, cypher string, params map[string]interface{}) (*ExecuteResult, error) {
	// NOTE(review): the parameter "cypher" shadows the package name; no
	// package-qualified identifier is needed inside this function, so this
	// is harmless here, but renaming would be safer long-term.
	// Normalize query: trim BOM (some clients send it) then whitespace
	cypher = trimBOM(cypher)
	cypher = strings.TrimSpace(cypher)
	if cypher == "" {
		return nil, fmt.Errorf("empty query")
	}
	// Handle :USE command (Neo4j browser/shell compatibility)
	// :USE database_name switches database context and should be stripped from query
	// The actual database switching is handled at the API layer by checking context
	if strings.HasPrefix(cypher, ":USE") || strings.HasPrefix(cypher, ":use") {
		// Extract :USE command and remaining query
		lines := strings.Split(cypher, "\n")
		var remainingLines []string
		useCommandFound := false
		var useDatabaseName string
		for _, line := range lines {
			trimmed := strings.TrimSpace(line)
			if !useCommandFound && (strings.HasPrefix(trimmed, ":USE") || strings.HasPrefix(trimmed, ":use")) {
				useCommandFound = true
				// Extract database name from :USE command
				// Format: :USE database_name or :USE database_name (with whitespace)
				parts := strings.Fields(trimmed)
				if len(parts) >= 2 {
					useDatabaseName = parts[1]
					// Store database name in context for server to switch
					ctx = context.WithValue(ctx, ctxKeyUseDatabase, useDatabaseName)
				}
				// Skip this line
				continue
			}
			// Collect all other lines (including empty lines for formatting)
			remainingLines = append(remainingLines, line)
		}
		if useCommandFound {
			// Reconstruct query without :USE command
			cypher = strings.Join(remainingLines, "\n")
			cypher = strings.TrimSpace(cypher)
			if cypher == "" {
				// Only :USE command, no actual query - return success
				return &ExecuteResult{
					Columns: []string{"database"},
					Rows:    [][]interface{}{{"switched"}},
				}, nil
			}
		}
	} else if strings.HasPrefix(cypher, ":") {
		// Starts with : but not :USE - return helpful error
		return nil, fmt.Errorf("unknown command: %s (only :USE is supported)", strings.Split(cypher, "\n")[0])
	}
	// Validate basic syntax
	if err := e.validateSyntax(cypher); err != nil {
		return nil, err
	}
	// IMPORTANT: Do NOT substitute parameters before routing!
	// We need to route the query based on the ORIGINAL query structure,
	// not the substituted one. Otherwise, keywords inside parameter values
	// (like 'MATCH (n) SET n.x = 1' stored as content) will be incorrectly
	// detected as Cypher clauses.
	//
	// Parameter substitution happens AFTER routing, inside each handler.
	// This matches Neo4j's architecture where params are kept separate.
	// Store params in context for handlers to use
	ctx = context.WithValue(ctx, paramsKey, params)
	// Check query limits if storage engine supports it
	// Uses interface{} to avoid importing multidb package (prevents circular dependencies)
	var queryLimitCancel context.CancelFunc
	if namespacedEngine, ok := e.storage.(interface {
		GetQueryLimitChecker() interface {
			CheckQueryRate() error
			CheckQueryLimits(context.Context) (context.Context, context.CancelFunc, error)
		}
	}); ok {
		if qlc := namespacedEngine.GetQueryLimitChecker(); qlc != nil {
			// Check query rate limit
			if err := qlc.CheckQueryRate(); err != nil {
				return nil, err
			}
			// Check write rate limit for write queries
			// We need to check this early, but we don't know if it's a write query yet
			// So we'll check it in the write handlers too
			// Apply query timeout and concurrent query limits
			var err error
			ctx, queryLimitCancel, err = qlc.CheckQueryLimits(ctx)
			if err != nil {
				return nil, err
			}
			// Ensure cancel is called when done
			defer func() {
				if queryLimitCancel != nil {
					queryLimitCancel()
				}
			}()
		}
	}
	// Analyze query - uses cached analysis if available
	// This extracts query metadata (HasMatch, IsReadOnly, Labels, etc.) once
	// and caches it for repeated queries, avoiding redundant string parsing
	info := e.analyzer.Analyze(cypher)
	// For routing, we still need upperQuery for some handlers
	// TODO: Migrate handlers to use QueryInfo directly
	upperQuery := strings.ToUpper(strings.TrimSpace(cypher))
	// Try cache for read-only queries (using cached analysis)
	if info.IsReadOnly && e.cache != nil {
		if cached, found := e.cache.Get(cypher, params); found {
			return cached, nil
		}
	}
	// Check for transaction control statements FIRST
	if result, err := e.parseTransactionStatement(cypher); result != nil || err != nil {
		return result, err
	}
	// Check for EXPLAIN/PROFILE execution modes (using cached analysis)
	if info.HasExplain {
		_, innerQuery := parseExecutionMode(cypher)
		return e.executeExplain(ctx, innerQuery)
	}
	if info.HasProfile {
		_, innerQuery := parseExecutionMode(cypher)
		return e.executeProfile(ctx, innerQuery)
	}
	// If in explicit transaction, execute within it
	if e.txContext != nil && e.txContext.active {
		return e.executeInTransaction(ctx, cypher, upperQuery)
	}
	// System commands (CREATE/DROP DATABASE, SHOW DATABASES, etc.) must not use the async engine
	// or implicit transactions: they operate on dbManager/metadata, not graph storage.
	// Routing them through executeWithoutTransaction directly ensures correct handling and
	// avoids the write path (tryAsyncCreateNodeBatch / executeWithImplicitTransaction).
	if isSystemCommandNoGraph(cypher) {
		result, err := e.executeWithoutTransaction(ctx, cypher, upperQuery)
		if err != nil {
			return nil, err
		}
		return result, nil
	}
	// Auto-commit single query - use async path for performance
	// This uses AsyncEngine's write-behind cache instead of synchronous disk I/O
	// For strict ACID, users should use explicit BEGIN/COMMIT transactions
	result, err := e.executeImplicitAsync(ctx, cypher, upperQuery)
	// Apply result limit if set
	if err == nil && result != nil {
		if namespacedEngine, ok := e.storage.(interface {
			GetQueryLimitChecker() interface {
				GetQueryLimits() interface{}
			}
		}); ok {
			if qlc := namespacedEngine.GetQueryLimitChecker(); qlc != nil {
				if queryLimits := qlc.GetQueryLimits(); queryLimits != nil {
					// Type assert to check if it has MaxResults field
					// We use reflection-like approach: check if it's a struct with MaxResults
					if limits, ok := queryLimits.(interface {
						GetMaxResults() int64
					}); ok {
						if maxResults := limits.GetMaxResults(); maxResults > 0 && int64(len(result.Rows)) > maxResults {
							// Truncate results to limit
							result.Rows = result.Rows[:maxResults]
						}
					}
				}
			}
		}
	}
	// Cache successful read-only queries.
	//
	// NOTE: Aggregation queries (COUNT/SUM/AVG/COLLECT/...) used to be excluded, but in practice they can still
	// be expensive (edge scans, label scans, COLLECT materialization). Caching them is correctness-preserving as
	// long as we invalidate on writes (which we do), so we cache them with a shorter TTL by default.
	if err == nil && info.IsReadOnly && e.cache != nil && isCacheableReadQuery(cypher) {
		// Determine TTL based on query type (using cached analysis)
		ttl := 60 * time.Second // Default: 60s for data queries
		if info.HasAggregation {
			ttl = 1 * time.Second // Conservative TTL for aggregations
		}
		if info.HasCall || info.HasShow {
			ttl = 300 * time.Second // 5 minutes for schema queries
		}
		e.cache.Put(cypher, params, result, ttl)
	}
	// Invalidate caches on write operations (using cached analysis)
	if info.IsWriteQuery {
		// Only invalidate node lookup cache when NODES are deleted
		// Relationship-only deletes (like benchmark CREATE rel DELETE rel) don't affect node cache
		if info.HasDelete && queryDeletesNodes(cypher) {
			e.invalidateNodeLookupCache()
		}
		// Invalidate query result cache using cached labels
		if e.cache != nil {
			if len(info.Labels) > 0 {
				e.cache.InvalidateLabels(info.Labels)
			} else {
				e.cache.Invalidate()
			}
		}
	}
	return result, err
}
// TransactionCapableEngine is an engine that supports ACID transactions.
// Used for type assertion to wrap implicit writes in rollback-capable transactions.
type TransactionCapableEngine interface {
	// BeginTransaction opens a storage-level transaction that the caller
	// must later commit or roll back.
	BeginTransaction() (*storage.BadgerTransaction, error)
}
// implicitTxEngines holds the engines discovered by resolveImplicitTxEngines
// while unwrapping the storage wrapper chain for implicit-transaction writes.
type implicitTxEngines struct {
	txEngine    TransactionCapableEngine // first ACID-capable engine found, if any
	asyncEngine *storage.AsyncEngine     // first async (write-behind) engine found, if any
	namespace   string                   // first namespace reported by a wrapper, if any
}
// resolveImplicitTxEngines walks the storage wrapper chain, recording the
// first transaction-capable engine, the first AsyncEngine, and the first
// namespace it encounters. Outer wrappers win because each field is only
// set once ("first match" semantics).
func (e *StorageExecutor) resolveImplicitTxEngines() implicitTxEngines {
	engine := e.storage
	// visited guards against wrapper cycles (a wrapper returning itself or
	// an ancestor), which would otherwise loop forever.
	visited := make(map[storage.Engine]bool)
	out := implicitTxEngines{}
	for engine != nil && !visited[engine] {
		visited[engine] = true
		if out.namespace == "" {
			if ns, ok := engine.(interface{ Namespace() string }); ok {
				out.namespace = ns.Namespace()
			}
		}
		if out.asyncEngine == nil {
			if ae, ok := engine.(*storage.AsyncEngine); ok {
				out.asyncEngine = ae
			}
		}
		if out.txEngine == nil {
			if tc, ok := engine.(TransactionCapableEngine); ok {
				out.txEngine = tc
			}
		}
		// Unwrap one layer; different wrappers expose different accessor
		// names, so try each in turn and stop when none match.
		switch wrapper := engine.(type) {
		case interface{ GetUnderlying() storage.Engine }:
			engine = wrapper.GetUnderlying()
		case interface{ GetEngine() storage.Engine }:
			engine = wrapper.GetEngine()
		case interface{ GetInnerEngine() storage.Engine }:
			engine = wrapper.GetInnerEngine()
		default:
			engine = nil
		}
	}
	return out
}
// tryAsyncCreateNodeBatch fast-paths a standalone CREATE of plain node
// patterns into a single BulkCreateNodes call, bypassing full query
// execution. Returns (result, err, handled): handled == false means the
// query did not qualify for this fast path and the caller must execute it
// normally; handled == true means result/err is the final outcome.
func (e *StorageExecutor) tryAsyncCreateNodeBatch(ctx context.Context, cypher string) (*ExecuteResult, error, bool) {
	upper := strings.ToUpper(strings.TrimSpace(cypher))
	if !strings.HasPrefix(upper, "CREATE") {
		return nil, nil, false
	}
	// System commands and schema commands must not be handled here — route to executeSchemaCommand instead
	if findMultiWordKeywordIndex(cypher, "CREATE", "DATABASE") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "COMPOSITE DATABASE") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "ALIAS") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "CONSTRAINT") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "INDEX") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "FULLTEXT") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "VECTOR") == 0 ||
		findMultiWordKeywordIndex(cypher, "CREATE", "RANGE") == 0 {
		return nil, nil, false
	}
	// Any other clause keyword means this is not a pure CREATE statement;
	// bail out so the full executor handles it.
	for _, keyword := range []string{
		"MATCH",
		"MERGE",
		"SET",
		"DELETE",
		"DETACH",
		"REMOVE",
		"WITH",
		"CALL",
		"UNWIND",
		"FOREACH",
		"LOAD",
		"OPTIONAL",
	} {
		if containsKeywordOutsideStrings(cypher, keyword) {
			return nil, nil, false
		}
	}
	// Substitute parameters before parsing so (n:Label $props) becomes (n:Label { ... })
	// and the label is not mis-parsed as "Label $props".
	if params := getParamsFromContext(ctx); params != nil {
		cypher = e.substituteParams(cypher, params)
	}
	// Split off an optional trailing RETURN clause; only the CREATE part is
	// scanned for node patterns.
	returnIdx := findKeywordIndex(cypher, "RETURN")
	createPart := cypher
	if returnIdx > 0 {
		createPart = strings.TrimSpace(cypher[:returnIdx])
	}
	createClauses := createKeywordPattern.Split(createPart, -1)
	if len(createClauses) == 0 {
		return nil, nil, false
	}
	var nodePatterns []string
	for _, clause := range createClauses {
		clause = strings.TrimSpace(clause)
		if clause == "" {
			continue
		}
		patterns := e.splitCreatePatterns(clause)
		for _, pat := range patterns {
			pat = strings.TrimSpace(pat)
			if pat == "" {
				continue
			}
			// Relationship syntax (arrows/bracket edges) disqualifies the
			// whole query — this fast path creates nodes only.
			if containsOutsideStrings(pat, "->") ||
				containsOutsideStrings(pat, "<-") ||
				containsOutsideStrings(pat, "]-") ||
				containsOutsideStrings(pat, "-[") {
				return nil, nil, false
			}
			nodePatterns = append(nodePatterns, pat)
		}
	}
	if len(nodePatterns) == 0 {
		return nil, nil, false
	}
	result := &ExecuteResult{
		Columns: []string{},
		Rows:    [][]interface{}{},
		Stats:   &QueryStats{},
	}
	// createdNodes maps pattern variable name -> created node, for resolving
	// the RETURN clause below.
	createdNodes := make(map[string]*storage.Node)
	nodes := make([]*storage.Node, 0, len(nodePatterns))
	for _, nodePatternStr := range nodePatterns {
		nodePattern := e.parseNodePattern(nodePatternStr)
		// Validate labels and property keys before touching storage; from
		// here on any error is final (handled == true).
		for _, label := range nodePattern.labels {
			if !isValidIdentifier(label) {
				return nil, fmt.Errorf("invalid label name: %q (must be alphanumeric starting with letter or underscore)", label), true
			}
			if containsReservedKeyword(label) {
				return nil, fmt.Errorf("invalid label name: %q (contains reserved keyword)", label), true
			}
		}
		for key, val := range nodePattern.properties {
			if !isValidIdentifier(key) {
				return nil, fmt.Errorf("invalid property key: %q (must be alphanumeric starting with letter or underscore)", key), true
			}
			if _, ok := val.(invalidPropertyValue); ok {
				return nil, fmt.Errorf("invalid property value for key %q: malformed syntax", key), true
			}
		}
		node := &storage.Node{
			ID:         storage.NodeID(e.generateID()),
			Labels:     nodePattern.labels,
			Properties: nodePattern.properties,
		}
		nodes = append(nodes, node)
		if nodePattern.variable != "" {
			createdNodes[nodePattern.variable] = node
		}
	}
	store := e.getStorage(ctx)
	if err := store.BulkCreateNodes(nodes); err != nil {
		return nil, err, true
	}
	for _, node := range nodes {
		e.notifyNodeMutated(string(node.ID))
	}
	result.Stats.NodesCreated += len(nodes)
	// Resolve an optional RETURN clause against the nodes just created.
	if returnIdx > 0 {
		returnPart := strings.TrimSpace(cypher[returnIdx+6:]) // 6 == len("RETURN")
		returnItems := e.parseReturnItems(returnPart)
		result.Columns = make([]string, len(returnItems))
		row := make([]interface{}, len(returnItems))
		for i, item := range returnItems {
			if item.alias != "" {
				result.Columns[i] = item.alias
			} else {
				result.Columns[i] = item.expr
			}
			// First pass: prefix match covers both "n" and "n.prop" shapes.
			// NOTE(review): map iteration order is random; if one variable is
			// a prefix of another (e.g. "n" and "n2"), which node matches
			// first is nondeterministic — confirm variable names can't collide.
			for variable, node := range createdNodes {
				if strings.HasPrefix(item.expr, variable) || item.expr == variable {
					row[i] = e.resolveReturnItem(item, variable, node)
					break
				}
			}
			// Fallback when prefix matching found nothing: extract the bare
			// variable name from the expression and look it up directly.
			if row[i] == nil {
				if varName := extractVariableNameFromReturnItem(item.expr); varName != "" {
					if node, ok := createdNodes[varName]; ok {
						row[i] = e.resolveReturnItem(item, varName, node)
					}
				}
			}
		}
		result.Rows = [][]interface{}{row}
	}
	return result, nil, true
}
// executeImplicitAsync executes a single query using implicit transactions for writes.
// Write operations are wrapped in an implicit transaction that can be rolled
// back on error, preventing partial data corruption from failed queries; reads
// run without transaction wrapping. For strict ACID guarantees with
// durability, use explicit BEGIN/COMMIT transactions.
func (e *StorageExecutor) executeImplicitAsync(ctx context.Context, cypher string, upperQuery string) (*ExecuteResult, error) {
	// Classify the query once via the cached analyzer.
	if !e.analyzer.Analyze(cypher).IsWriteQuery {
		// Read-only operations don't need transaction wrapping.
		return e.executeWithoutTransaction(ctx, cypher, upperQuery)
	}
	// When an AsyncEngine is present in the chain, try the CREATE-node batch
	// fast path first; if it handles the query, its outcome is final.
	if e.resolveImplicitTxEngines().asyncEngine != nil {
		if result, err, handled := e.tryAsyncCreateNodeBatch(ctx, cypher); handled {
			return result, err
		}
	}
	// Otherwise wrap the write in an implicit transaction for atomicity so
	// partial writes are rolled back on error.
	return e.executeWithImplicitTransaction(ctx, cypher, upperQuery)
}
// executeWithImplicitTransaction wraps a write query in an implicit transaction.
// If any part of the query fails, all changes are rolled back atomically.
// This prevents data corruption from partially executed queries.
func (e *StorageExecutor) executeWithImplicitTransaction(ctx context.Context, cypher string, upperQuery string) (*ExecuteResult, error) {
// Try to get a transaction-capable engine and async wrapper (if present)
engines := e.resolveImplicitTxEngines()
txEngine := engines.txEngine
asyncEngine := engines.asyncEngine
// If no transaction support, fall back to direct execution (legacy mode)
// This is less safe but maintains backward compatibility
if txEngine == nil {
result, err := e.executeWithoutTransaction(ctx, cypher, upperQuery)
if err != nil {
return nil, err
}
// Flush if needed
if !e.deferFlush {
if asyncEngine != nil {
asyncEngine.Flush()
}
}
return result, nil
}
// IMPORTANT: If using AsyncEngine with pending writes, flush its cache BEFORE