Skip to content

Commit 8892212

Browse files
committed
fixing performance issues related to streaming nodes and counting for LIMIT queries as well
1 parent 7c4ceab commit 8892212

File tree

12 files changed

+1455
-110
lines changed

12 files changed

+1455
-110
lines changed

macos/MenuBarApp/NornicDBMenuBar.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,7 +1463,7 @@ struct SettingsView: View {
14631463
<string>\(config.hostAddress)</string>
14641464
<key>NORNICDB_EMBEDDING_ENABLED</key>
14651465
<string>\(config.embeddingsEnabled ? "true" : "false")</string>
1466-
<key>NORNICDB_KMEANS_ENABLED</key>
1466+
<key>NORNICDB_KMEANS_CLUSTERING_ENABLED</key>
14671467
<string>\(config.kmeansEnabled ? "true" : "false")</string>
14681468
<key>NORNICDB_AUTO_TLP_ENABLED</key>
14691469
<string>\(config.autoTLPEnabled ? "true" : "false")</string>
@@ -2278,7 +2278,7 @@ struct FirstRunWizard: View {
22782278
<string>\(config.hostAddress)</string>
22792279
<key>NORNICDB_EMBEDDING_ENABLED</key>
22802280
<string>\(config.embeddingsEnabled ? "true" : "false")</string>
2281-
<key>NORNICDB_KMEANS_ENABLED</key>
2281+
<key>NORNICDB_KMEANS_CLUSTERING_ENABLED</key>
22822282
<string>\(config.kmeansEnabled ? "true" : "false")</string>
22832283
<key>NORNICDB_AUTO_TLP_ENABLED</key>
22842284
<string>\(config.autoTLPEnabled ? "true" : "false")</string>

pkg/config/config.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,18 @@ func legacyLoadFromEnv() *Config {
846846
config.Features.TopologyABTestEnabled = getEnvBool("NORNICDB_TOPOLOGY_AB_TEST_ENABLED", false)
847847
config.Features.TopologyABTestPercentage = getEnvInt("NORNICDB_TOPOLOGY_AB_TEST_PERCENTAGE", 50)
848848

849+
// K-Means clustering (maps to Kalman filtering)
850+
// Env: NORNICDB_KMEANS_CLUSTERING_ENABLED (default: false)
851+
if v := os.Getenv("NORNICDB_KMEANS_CLUSTERING_ENABLED"); v != "" {
852+
config.Features.KalmanEnabled = v == "true" || v == "1"
853+
}
854+
855+
// Auto-TLP (Temporal Link Prediction)
856+
// Env: NORNICDB_AUTO_TLP_ENABLED (default: false)
857+
if v := os.Getenv("NORNICDB_AUTO_TLP_ENABLED"); v != "" {
858+
config.Features.TopologyAutoIntegrationEnabled = v == "true" || v == "1"
859+
}
860+
849861
// Heimdall - the cognitive guardian feature flags
850862
// Opt-in cognitive database features - only override if env var is explicitly set
851863
if v := os.Getenv("NORNICDB_HEIMDALL_ENABLED"); v != "" {
@@ -1620,6 +1632,14 @@ func applyEnvVars(config *Config) {
16201632
if v := getEnvInt("NORNICDB_TOPOLOGY_AB_TEST_PERCENTAGE", 0); v > 0 {
16211633
config.Features.TopologyABTestPercentage = v
16221634
}
1635+
// K-Means clustering (maps to Kalman filtering) - used by macOS menu bar app
1636+
if getEnv("NORNICDB_KMEANS_CLUSTERING_ENABLED", "") == "true" {
1637+
config.Features.KalmanEnabled = true
1638+
}
1639+
// Auto-TLP (Temporal Link Prediction) - used by macOS menu bar app
1640+
if getEnv("NORNICDB_AUTO_TLP_ENABLED", "") == "true" {
1641+
config.Features.TopologyAutoIntegrationEnabled = true
1642+
}
16231643
if getEnv("NORNICDB_HEIMDALL_ENABLED", "") == "true" {
16241644
config.Features.HeimdallEnabled = true
16251645
}

pkg/cypher/match.go

Lines changed: 148 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,10 @@ func (e *StorageExecutor) executeMatch(ctx context.Context, cypher string) (*Exe
246246
}
247247

248248
// Extract pattern between MATCH and WHERE/RETURN
249+
whereIdx := findKeywordNotInBrackets(upper, " WHERE ")
249250
// Use findKeywordNotInBrackets to avoid matching WHERE inside list comprehensions like [x WHERE ...]
250251
matchPart := cypher[5:] // Skip "MATCH"
251-
whereIdx := findKeywordNotInBrackets(upper, " WHERE ")
252+
// Note: whereIdx already defined above for fast-path count optimization
252253
if whereIdx > 0 {
253254
matchPart = cypher[5:whereIdx]
254255
} else if returnIdx > 0 {
@@ -269,22 +270,78 @@ func (e *StorageExecutor) executeMatch(ctx context.Context, cypher string) (*Exe
269270
// Parse node pattern
270271
nodePattern := e.parseNodePattern(matchPart)
271272

272-
// Get matching nodes
273-
var nodes []*storage.Node
274-
var err error
273+
// FAST PATH: For simple node count queries like "MATCH (n) RETURN count(n)" or "MATCH (n:Label) RETURN count(n)"
274+
// Use O(1) NodeCount() instead of loading all nodes into memory.
275+
// This optimization ONLY applies to simple node patterns (not relationships - those are handled above)
276+
if hasAggregation && whereIdx == -1 && len(returnItems) == 1 {
277+
upperExpr := strings.ToUpper(strings.TrimSpace(returnItems[0].expr))
278+
// Check for COUNT(*) or COUNT(variable) - not COUNT(n.property)
279+
if strings.HasPrefix(upperExpr, "COUNT(") && strings.HasSuffix(upperExpr, ")") {
280+
inner := strings.TrimSpace(upperExpr[6 : len(upperExpr)-1])
281+
// COUNT(*) or COUNT(n) where n is any variable - just count all nodes
282+
if inner == "*" || !strings.Contains(inner, ".") {
283+
var count int64
284+
var err error
285+
if len(nodePattern.labels) > 0 {
286+
// Count nodes with specific label
287+
nodes, err := e.storage.GetNodesByLabel(nodePattern.labels[0])
288+
if err != nil {
289+
return nil, fmt.Errorf("storage error: %w", err)
290+
}
291+
count = int64(len(nodes))
292+
} else {
293+
// Count all nodes - use O(1) NodeCount()
294+
count, err = e.storage.NodeCount()
295+
if err != nil {
296+
return nil, fmt.Errorf("storage error: %w", err)
297+
}
298+
}
275299

276-
if len(nodePattern.labels) > 0 {
277-
nodes, err = e.storage.GetNodesByLabel(nodePattern.labels[0])
278-
} else {
279-
nodes, err = e.storage.AllNodes()
300+
// Return result directly
301+
result.Rows = [][]interface{}{{count}}
302+
return result, nil
303+
}
304+
}
280305
}
281-
if err != nil {
282-
return nil, fmt.Errorf("storage error: %w", err)
306+
307+
// Parse SKIP and LIMIT early for streaming optimization
308+
// Note: We can only use early termination when there's NO WHERE clause
309+
// because WHERE filtering happens after loading nodes
310+
skipIdx := findKeywordIndex(cypher, "SKIP")
311+
skip := 0
312+
if skipIdx > 0 {
313+
skipPart := strings.TrimSpace(cypher[skipIdx+4:])
314+
if fields := strings.Fields(skipPart); len(fields) > 0 {
315+
if s, err := strconv.Atoi(fields[0]); err == nil {
316+
skip = s
317+
}
318+
}
283319
}
284320

285-
// Apply property filter from MATCH pattern (e.g., {name: 'Alice'})
286-
if len(nodePattern.properties) > 0 {
287-
nodes = e.filterNodesByProperties(nodes, nodePattern.properties)
321+
limitIdx := findKeywordIndex(cypher, "LIMIT")
322+
limit := -1
323+
if limitIdx > 0 {
324+
limitPart := strings.TrimSpace(cypher[limitIdx+5:])
325+
if fields := strings.Fields(limitPart); len(fields) > 0 {
326+
if l, err := strconv.Atoi(fields[0]); err == nil {
327+
limit = l
328+
}
329+
}
330+
}
331+
332+
// Calculate streaming limit: need to load enough nodes for SKIP + LIMIT
333+
// Only use streaming optimization when there's NO WHERE clause, NO ORDER BY, and NO aggregation
334+
// (filtering and sorting invalidate early termination since they need all nodes)
335+
hasOrderBy := findKeywordIndex(cypher, "ORDER") > 0
336+
streamingLimit := -1
337+
if whereIdx == -1 && !hasOrderBy && !hasAggregation && limit > 0 {
338+
streamingLimit = skip + limit
339+
}
340+
341+
// Get matching nodes using streaming optimization when possible
342+
nodes, err := e.collectNodesWithStreaming(ctx, nodePattern.labels, nodePattern.properties, streamingLimit)
343+
if err != nil {
344+
return nil, fmt.Errorf("storage error: %w", err)
288345
}
289346

290347
// Apply WHERE filter if present
@@ -382,29 +439,7 @@ func (e *StorageExecutor) executeMatch(ctx context.Context, cypher string) (*Exe
382439
nodes = e.orderNodes(nodes, nodePattern.variable, orderExpr)
383440
}
384441

385-
// Parse SKIP (whitespace-tolerant)
386-
skipIdx := findKeywordIndex(cypher, "SKIP")
387-
skip := 0
388-
if skipIdx > 0 {
389-
skipPart := strings.TrimSpace(cypher[skipIdx+4:])
390-
if fields := strings.Fields(skipPart); len(fields) > 0 {
391-
if s, err := strconv.Atoi(fields[0]); err == nil {
392-
skip = s
393-
}
394-
}
395-
}
396-
397-
// Parse LIMIT (whitespace-tolerant)
398-
limitIdx := findKeywordIndex(cypher, "LIMIT")
399-
limit := -1
400-
if limitIdx > 0 {
401-
limitPart := strings.TrimSpace(cypher[limitIdx+5:])
402-
if fields := strings.Fields(limitPart); len(fields) > 0 {
403-
if l, err := strconv.Atoi(fields[0]); err == nil {
404-
limit = l
405-
}
406-
}
407-
}
442+
// Note: skipIdx, skip, limitIdx and limit are already parsed earlier for streaming optimization
408443

409444
// Build result rows with SKIP and LIMIT
410445
seen := make(map[string]bool) // For DISTINCT
@@ -3706,4 +3741,81 @@ func (e *StorageExecutor) resolveBindingItem(item returnItem, b binding) interfa
37063741
return nil
37073742
}
37083743

3744+
// collectNodesWithStreaming efficiently collects nodes from storage using streaming when possible.
3745+
// This avoids loading all nodes into memory, which is critical for performance with large datasets.
3746+
//
3747+
// Parameters:
3748+
// - ctx: Context for cancellation
3749+
// - labels: Optional label filter (only nodes with this label)
3750+
// - properties: Optional property filters
3751+
// - limit: Maximum number of nodes to collect (-1 for unlimited)
3752+
//
3753+
// Returns collected nodes or error.
3754+
func (e *StorageExecutor) collectNodesWithStreaming(
3755+
ctx context.Context,
3756+
labels []string,
3757+
properties map[string]interface{},
3758+
limit int,
3759+
) ([]*storage.Node, error) {
3760+
// Determine if we can use streaming optimization
3761+
canStream := len(properties) == 0 // Can't filter properties inline yet
3762+
3763+
var nodes []*storage.Node
3764+
var err error
3765+
3766+
if canStream && limit > 0 {
3767+
// Use streaming with early termination for LIMIT queries
3768+
nodes = make([]*storage.Node, 0, limit)
3769+
if streamer, ok := e.storage.(storage.StreamingEngine); ok {
3770+
err = streamer.StreamNodes(ctx, func(node *storage.Node) error {
3771+
// Check label filter
3772+
if len(labels) > 0 {
3773+
hasLabel := false
3774+
for _, nodeLabel := range node.Labels {
3775+
if nodeLabel == labels[0] {
3776+
hasLabel = true
3777+
break
3778+
}
3779+
}
3780+
if !hasLabel {
3781+
return nil // Skip this node
3782+
}
3783+
}
3784+
3785+
nodes = append(nodes, node)
3786+
if len(nodes) >= limit {
3787+
return storage.ErrIterationStopped // Early termination
3788+
}
3789+
return nil
3790+
})
3791+
// ErrIterationStopped is expected
3792+
if err == storage.ErrIterationStopped {
3793+
err = nil
3794+
}
3795+
if err != nil {
3796+
return nil, err
3797+
}
3798+
return nodes, nil
3799+
}
3800+
// Fall through to standard path if streaming not supported
3801+
}
3802+
3803+
// Standard path: load all nodes then filter
3804+
if len(labels) > 0 {
3805+
nodes, err = e.storage.GetNodesByLabel(labels[0])
3806+
} else {
3807+
nodes, err = e.storage.AllNodes()
3808+
}
3809+
if err != nil {
3810+
return nil, err
3811+
}
3812+
3813+
// Apply property filters
3814+
if len(properties) > 0 {
3815+
nodes = e.filterNodesByProperties(nodes, properties)
3816+
}
3817+
3818+
return nodes, nil
3819+
}
3820+
37093821
// executeCreate handles CREATE queries.

0 commit comments

Comments
 (0)