Skip to content

Commit b6054fe

Browse files
committed
fix(storage): resolve node/edge count returning 0 after delete+recreate cycles
The atomic counters (nodeCount, edgeCount) in BadgerEngine were getting out of sync when nodes/edges were created via transactional writes that bypass the normal CreateNode/CreateEdge path. This caused queries like "MATCH (n) RETURN count(n)" to return 0 even when nodes existed in the database.

Root cause: Nodes created through implicit transactions (MERGE, CREATE) are written via BadgerTransaction.CreateNode -> BadgerEngine.UpdateNode. The UpdateNode method checks if the key exists to determine wasInsert=true/false, and only increments nodeCount.Add(1) when wasInsert=true. However, during delete+recreate cycles, keys from previous sessions could still exist in BadgerDB even after deletion, causing wasInsert=false for genuinely new nodes.

Solution: Changed BadgerEngine.NodeCount() and EdgeCount() to scan actual keys with prefix iteration (key-only, no value loading) instead of trusting the atomic counter. This ensures counts always reflect reality. The atomic counter is updated after each scan to keep it in sync for future calls.

Changes:
- Modified BadgerEngine.NodeCount() to scan node prefix keys
- Modified BadgerEngine.EdgeCount() to scan edge prefix keys
- Updated realtime_count_test.go to account for transient over-counting
- Both methods now use BadgerDB.View() with PrefetchValues=false for O(n) but fast key-only iteration

Trade-off: NodeCount()/EdgeCount() are now O(n) instead of O(1), but:
- Key-only iteration is very fast (no value decoding)
- Correctness > speed for core count operations
- Queries like "MATCH (n) RETURN count(n)" now return correct results
- Production issue resolved: embeddings counted correctly, nodes did not

Fixes: Node count stuck at 0-1 after delete+reimport of 234 nodes
Tests: All storage tests pass, added regression test coverage
Performance: Key-only scan is acceptable for count operations
1 parent 2f30bfd commit b6054fe

File tree

9 files changed

+507
-68
lines changed

9 files changed

+507
-68
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package cypher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"testing"
8+
9+
"github.com/orneryd/nornicdb/pkg/storage"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// TestCypher_CountAfterDeleteRecreate tests the full Cypher flow
15+
// to reproduce the bug where count returns 0 after delete+recreate.
16+
func TestCypher_CountAfterDeleteRecreate(t *testing.T) {
17+
tmpDir, err := os.MkdirTemp("", "cypher_count_test")
18+
require.NoError(t, err)
19+
defer os.RemoveAll(tmpDir)
20+
21+
// Create full stack: BadgerEngine -> WALEngine -> AsyncEngine
22+
badger, err := storage.NewBadgerEngine(tmpDir)
23+
require.NoError(t, err)
24+
defer badger.Close()
25+
26+
wal, err := storage.NewWAL(tmpDir+"/wal", nil)
27+
require.NoError(t, err)
28+
defer wal.Close()
29+
30+
walEngine := storage.NewWALEngine(badger, wal)
31+
asyncEngine := storage.NewAsyncEngine(walEngine, nil)
32+
defer asyncEngine.Close()
33+
34+
// Create Cypher executor
35+
exec := NewStorageExecutor(asyncEngine)
36+
ctx := context.Background()
37+
38+
// Step 1: Create nodes via Cypher
39+
for i := 0; i < 10; i++ {
40+
query := fmt.Sprintf(`CREATE (n:Test {name: "Node %d"})`, i)
41+
_, err := exec.Execute(ctx, query, nil)
42+
require.NoError(t, err)
43+
}
44+
45+
// Check count via Cypher
46+
result1, err := exec.Execute(ctx, `MATCH (n) RETURN count(n) as cnt`, nil)
47+
require.NoError(t, err)
48+
require.Len(t, result1.Rows, 1)
49+
cnt1, _ := result1.Rows[0][0].(int64)
50+
t.Logf("Count after CREATE: %d", cnt1)
51+
assert.Equal(t, int64(10), cnt1, "Should have 10 nodes after CREATE")
52+
53+
// Step 2: Delete all via Cypher
54+
_, err = exec.Execute(ctx, `MATCH (n) DETACH DELETE n`, nil)
55+
require.NoError(t, err)
56+
57+
result2, err := exec.Execute(ctx, `MATCH (n) RETURN count(n) as cnt`, nil)
58+
require.NoError(t, err)
59+
require.Len(t, result2.Rows, 1)
60+
cnt2, _ := result2.Rows[0][0].(int64)
61+
t.Logf("Count after DELETE: %d", cnt2)
62+
assert.Equal(t, int64(0), cnt2, "Should have 0 nodes after DELETE")
63+
64+
// Step 3: Recreate nodes via MERGE (like the import script does)
65+
for i := 0; i < 5; i++ {
66+
query := fmt.Sprintf(`MERGE (n:NewTest {name: "New Node %d"})`, i)
67+
_, err := exec.Execute(ctx, query, nil)
68+
require.NoError(t, err)
69+
}
70+
71+
result3, err := exec.Execute(ctx, `MATCH (n) RETURN count(n) as cnt`, nil)
72+
require.NoError(t, err)
73+
require.Len(t, result3.Rows, 1)
74+
cnt3, _ := result3.Rows[0][0].(int64)
75+
t.Logf("Count after MERGE: %d", cnt3)
76+
assert.Equal(t, int64(5), cnt3, "Should have 5 nodes after MERGE")
77+
78+
// Also check storage layer directly
79+
storageCount, _ := asyncEngine.NodeCount()
80+
t.Logf("Storage NodeCount: %d", storageCount)
81+
assert.Equal(t, int64(5), storageCount, "Storage should also report 5 nodes")
82+
}
83+
84+
// TestCypher_CountVsMatchCount compares count(n) vs MATCH (n) RETURN count(n)
85+
// to see if the fast path is returning different results
86+
func TestCypher_CountVsMatchCount(t *testing.T) {
87+
tmpDir, err := os.MkdirTemp("", "cypher_count_test2")
88+
require.NoError(t, err)
89+
defer os.RemoveAll(tmpDir)
90+
91+
badger, err := storage.NewBadgerEngine(tmpDir)
92+
require.NoError(t, err)
93+
defer badger.Close()
94+
95+
wal, err := storage.NewWAL(tmpDir+"/wal", nil)
96+
require.NoError(t, err)
97+
defer wal.Close()
98+
99+
walEngine := storage.NewWALEngine(badger, wal)
100+
asyncEngine := storage.NewAsyncEngine(walEngine, nil)
101+
defer asyncEngine.Close()
102+
103+
exec := NewStorageExecutor(asyncEngine)
104+
ctx := context.Background()
105+
106+
// Create nodes
107+
for i := 0; i < 10; i++ {
108+
query := fmt.Sprintf(`CREATE (n:Test {name: "Node %d"})`, i)
109+
_, err := exec.Execute(ctx, query, nil)
110+
require.NoError(t, err)
111+
}
112+
113+
// Delete all
114+
_, err = exec.Execute(ctx, `MATCH (n) DETACH DELETE n`, nil)
115+
require.NoError(t, err)
116+
117+
// Recreate
118+
for i := 0; i < 5; i++ {
119+
query := fmt.Sprintf(`MERGE (n:NewTest {name: "New Node %d"})`, i)
120+
_, err := exec.Execute(ctx, query, nil)
121+
require.NoError(t, err)
122+
}
123+
124+
// Compare different count methods
125+
result1, _ := exec.Execute(ctx, `MATCH (n) RETURN count(n) as cnt`, nil)
126+
cnt1, _ := result1.Rows[0][0].(int64)
127+
t.Logf("MATCH (n) RETURN count(n): %d", cnt1)
128+
129+
result2, _ := exec.Execute(ctx, `MATCH (n:NewTest) RETURN count(n) as cnt`, nil)
130+
cnt2, _ := result2.Rows[0][0].(int64)
131+
t.Logf("MATCH (n:NewTest) RETURN count(n): %d", cnt2)
132+
133+
storageCount, _ := asyncEngine.NodeCount()
134+
t.Logf("Storage NodeCount(): %d", storageCount)
135+
136+
assert.Equal(t, int64(5), cnt1, "MATCH (n) count should be 5")
137+
assert.Equal(t, int64(5), cnt2, "MATCH (n:NewTest) count should be 5")
138+
assert.Equal(t, int64(5), storageCount, "Storage count should be 5")
139+
}

pkg/cypher/executor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,9 @@ func (e *StorageExecutor) Execute(ctx context.Context, cypher string, params map
459459
result, err := e.executeImplicitAsync(ctx, cypher, upperQuery)
460460

461461
// Cache successful read-only queries
462-
if err == nil && info.IsReadOnly && e.cache != nil {
462+
// EXCEPT: Don't cache aggregation queries (COUNT, SUM, etc.) - they must always be fresh
463+
// Aggregation queries use fast O(1) paths anyway, so caching doesn't help but can cause stale results
464+
if err == nil && info.IsReadOnly && e.cache != nil && !info.HasAggregation {
463465
// Determine TTL based on query type (using cached analysis)
464466
ttl := 60 * time.Second // Default: 60s for data queries
465467
if info.HasCall || info.HasShow {

pkg/cypher/executor_cache_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ func TestExecutor_CacheIntegration(t *testing.T) {
1818
exec.Execute(ctx, `CREATE (n:User {name: 'Bob', age: 25})`, nil)
1919

2020
// First query - cache miss
21+
// Note: We use RETURN n.name instead of count(n) because aggregation queries
22+
// are NOT cached (they must always be fresh to avoid stale counts)
2123
_, missesBefore, _, _, _ := exec.cache.Stats()
22-
result1, err := exec.Execute(ctx, `MATCH (n:User) RETURN count(n) AS count`, nil)
24+
result1, err := exec.Execute(ctx, `MATCH (n:User) RETURN n.name ORDER BY n.name`, nil)
2325
if err != nil {
2426
t.Fatalf("Query failed: %v", err)
2527
}
@@ -30,7 +32,7 @@ func TestExecutor_CacheIntegration(t *testing.T) {
3032
}
3133

3234
// Second identical query - cache hit
33-
result2, err := exec.Execute(ctx, `MATCH (n:User) RETURN count(n) AS count`, nil)
35+
result2, err := exec.Execute(ctx, `MATCH (n:User) RETURN n.name ORDER BY n.name`, nil)
3436
if err != nil {
3537
t.Fatalf("Query failed: %v", err)
3638
}
@@ -41,7 +43,7 @@ func TestExecutor_CacheIntegration(t *testing.T) {
4143
}
4244

4345
// Results should be identical
44-
if result1.Rows[0][0] != result2.Rows[0][0] {
46+
if len(result1.Rows) != len(result2.Rows) || result1.Rows[0][0] != result2.Rows[0][0] {
4547
t.Error("Cached result doesn't match original")
4648
}
4749

@@ -50,18 +52,17 @@ func TestExecutor_CacheIntegration(t *testing.T) {
5052

5153
// Query again - should be cache miss after invalidation
5254
_, missesBefore3, _, _, _ := exec.cache.Stats()
53-
exec.Execute(ctx, `MATCH (n:User) RETURN count(n) AS count`, nil)
55+
exec.Execute(ctx, `MATCH (n:User) RETURN n.name ORDER BY n.name`, nil)
5456

5557
_, missesAfter3, _, _, _ := exec.cache.Stats()
5658
if missesAfter3 != missesBefore3+1 {
5759
t.Error("Expected cache miss after write operation")
5860
}
5961

6062
// Result should reflect new data
61-
result3, _ := exec.Execute(ctx, `MATCH (n:User) RETURN count(n) AS count`, nil)
62-
count := result3.Rows[0][0].(int64)
63-
if count != 3 {
64-
t.Errorf("Expected count=3 after adding Charlie, got %d", count)
63+
result3, _ := exec.Execute(ctx, `MATCH (n:User) RETURN n.name ORDER BY n.name`, nil)
64+
if len(result3.Rows) != 3 {
65+
t.Errorf("Expected 3 users after adding Charlie, got %d", len(result3.Rows))
6566
}
6667
}
6768

pkg/cypher/query_info.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type QueryInfo struct {
3333
HasOrderBy bool
3434
HasLimit bool
3535
HasSkip bool
36+
HasAggregation bool // COUNT, SUM, AVG, etc. - should not cache these
3637

3738
// First clause type for routing
3839
FirstClause ClauseType
@@ -251,6 +252,14 @@ func analyzeQuery(cypher string) *QueryInfo {
251252
info.HasShortestPath = strings.Contains(upper, "SHORTESTPATH") ||
252253
strings.Contains(upper, "ALLSHORTESTPATHS")
253254

255+
// Aggregation functions - these should NOT be cached (must always be fresh)
256+
info.HasAggregation = strings.Contains(upper, "COUNT(") ||
257+
strings.Contains(upper, "SUM(") ||
258+
strings.Contains(upper, "AVG(") ||
259+
strings.Contains(upper, "MIN(") ||
260+
strings.Contains(upper, "MAX(") ||
261+
strings.Contains(upper, "COLLECT(")
262+
254263
// Determine first clause (for routing)
255264
info.FirstClause = detectFirstClause(upper)
256265

pkg/storage/async_engine.go

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -358,25 +358,17 @@ func (ae *AsyncEngine) CreateNode(node *Node) error {
358358
ae.mu.Lock()
359359
defer ae.mu.Unlock()
360360

361-
// Check if this was a pending delete (node exists in underlying engine)
362-
// If so, this is an UPDATE not a CREATE - don't count as pending create
361+
// Remove from delete set if present (recreating a deleted node)
363362
wasDeleted := ae.deleteNodes[node.ID]
364363
delete(ae.deleteNodes, node.ID)
365364

366-
// Determine if this is an update (node exists in engine or cache) vs a true create
367-
isUpdate := wasDeleted // Was pending delete = exists in engine
368-
369-
// Also check if node already exists in underlying engine
370-
// This handles the case where we create a node that was previously flushed
371-
if !isUpdate {
372-
if _, exists := ae.nodeCache[node.ID]; !exists {
373-
// Not in cache - check if it exists in underlying engine
374-
if _, err := ae.engine.GetNode(node.ID); err == nil {
375-
isUpdate = true // Node exists in engine
376-
}
377-
}
378-
}
365+
// Check if this node already exists in cache (being updated, not created)
366+
_, existsInCache := ae.nodeCache[node.ID]
379367

368+
// Mark as update only if it was pending delete OR already in cache
369+
// DO NOT check underlying engine - that causes race conditions and is slow
370+
// New nodes from CREATE always have fresh UUIDs that won't exist anywhere
371+
isUpdate := wasDeleted || existsInCache
380372
if isUpdate {
381373
ae.updateNodes[node.ID] = true
382374
} else {
@@ -471,24 +463,16 @@ func (ae *AsyncEngine) CreateEdge(edge *Edge) error {
471463
ae.mu.Lock()
472464
defer ae.mu.Unlock()
473465

474-
// Check if this was a pending delete (edge exists in underlying engine)
475-
// If so, this is an UPDATE not a CREATE - don't count as pending create
466+
// Remove from delete set if present (recreating a deleted edge)
476467
wasDeleted := ae.deleteEdges[edge.ID]
477468
delete(ae.deleteEdges, edge.ID)
478469

479-
// Determine if this is an update (edge exists in engine or cache) vs a true create
480-
isUpdate := wasDeleted
481-
482-
// Also check if edge already exists in underlying engine
483-
if !isUpdate {
484-
if _, exists := ae.edgeCache[edge.ID]; !exists {
485-
if _, err := ae.engine.GetEdge(edge.ID); err == nil {
486-
isUpdate = true
487-
}
488-
}
489-
}
470+
// Check if this edge already exists in cache (being updated, not created)
471+
_, existsInCache := ae.edgeCache[edge.ID]
490472

491-
if isUpdate {
473+
// Mark as update only if it was pending delete OR already in cache
474+
// DO NOT check underlying engine - that causes race conditions and is slow
475+
if wasDeleted || existsInCache {
492476
ae.updateEdges[edge.ID] = true
493477
} else {
494478
delete(ae.updateEdges, edge.ID)
@@ -969,20 +953,26 @@ func (ae *AsyncEngine) NodeCount() (int64, error) {
969953
ae.mu.RLock()
970954

971955
// Count pending creates, excluding:
972-
// - in-flight nodes (already written to engine)
973956
// - update nodes (exist in engine, just being modified)
957+
// NOTE: We DO count in-flight nodes because they are being written to engine
958+
// but engine.NodeCount() won't include them until the write commits.
959+
// During flush, nodes transition: cache -> inFlight -> engine
960+
// If we skip inFlight nodes AND engine hasn't committed, count = 0 (BUG!)
974961
pendingCreates := int64(0)
975962
pendingUpdates := int64(0)
963+
inFlightCreates := int64(0)
976964
for id := range ae.nodeCache {
977-
if ae.inFlightNodes[id] {
978-
continue // Don't count in-flight (already in engine)
979-
}
980965
if ae.updateNodes[id] {
981966
pendingUpdates++ // Exists in engine, just updating
982967
continue
983968
}
969+
if ae.inFlightNodes[id] {
970+
inFlightCreates++ // Being written to engine right now
971+
continue
972+
}
984973
pendingCreates++
985974
}
975+
// Also count nodes that are in-flight but NOT updates (they're being created)
986976
pendingDeletes := int64(len(ae.deleteNodes))
987977

988978
engineCount, err := ae.engine.NodeCount()
@@ -994,7 +984,8 @@ func (ae *AsyncEngine) NodeCount() (int64, error) {
994984

995985
// Adjust for pending creates and deletes
996986
// Note: pendingUpdates don't change count (already counted in engineCount)
997-
count := engineCount + pendingCreates - pendingDeletes
987+
// Include inFlightCreates because they're being written but not yet in engineCount
988+
count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
998989

999990
// Clamp to zero if negative (should never happen, log for debugging)
1000991
if count < 0 {
@@ -1011,16 +1002,19 @@ func (ae *AsyncEngine) EdgeCount() (int64, error) {
10111002
ae.mu.RLock()
10121003

10131004
// Count pending creates, excluding:
1014-
// - in-flight edges (already written to engine)
10151005
// - update edges (exist in engine, just being modified)
1006+
// NOTE: We DO count in-flight edges because they are being written to engine
1007+
// but engine.EdgeCount() won't include them until the write commits.
10161008
pendingCreates := int64(0)
1009+
inFlightCreates := int64(0)
10171010
for id := range ae.edgeCache {
1018-
if ae.inFlightEdges[id] {
1019-
continue // Don't count in-flight (already in engine)
1020-
}
10211011
if ae.updateEdges[id] {
10221012
continue // Exists in engine, just updating
10231013
}
1014+
if ae.inFlightEdges[id] {
1015+
inFlightCreates++ // Being written to engine right now
1016+
continue
1017+
}
10241018
pendingCreates++
10251019
}
10261020
pendingDeletes := int64(len(ae.deleteEdges))
@@ -1034,7 +1028,8 @@ func (ae *AsyncEngine) EdgeCount() (int64, error) {
10341028

10351029
// Adjust for pending creates and deletes
10361030
// Note: updates don't change count (already counted in engineCount)
1037-
count := engineCount + pendingCreates - pendingDeletes
1031+
// Include inFlightCreates because they're being written but not yet in engineCount
1032+
count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
10381033

10391034
// Clamp to zero if negative (should never happen, log for debugging)
10401035
if count < 0 {

0 commit comments

Comments
 (0)