Skip to content

Commit 6c37501

Browse files
committed
fixing counting and sync issues for deletes
1 parent 4cf6fa6 commit 6c37501

File tree

5 files changed

+399
-23
lines changed

5 files changed

+399
-23
lines changed

pkg/cypher/executor.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ import (
120120
"sync/atomic"
121121
"time"
122122

123+
"github.com/google/uuid"
123124
"github.com/orneryd/nornicdb/pkg/storage"
124125
)
125126

@@ -2296,14 +2297,11 @@ func (e *StorageExecutor) resolveReturnItem(item returnItem, variable string, no
22962297
return result
22972298
}
22982299

2299-
// idGen is a fast atomic counter for ID generation
2300-
var idGen int64
2301-
23022300
func (e *StorageExecutor) generateID() string {
2303-
// Use fast atomic counter + process start time for unique IDs
2304-
// Much faster than crypto/rand while still globally unique
2305-
id := atomic.AddInt64(&idGen, 1)
2306-
return fmt.Sprintf("n%d", id)
2301+
// Use UUID for globally unique IDs
2302+
// This prevents ID collisions across server restarts which caused
2303+
// the race condition where CREATE would cancel pending DELETEs
2304+
return uuid.New().String()
23072305
}
23082306

23092307
// Deprecated: Sequential counter replaced with UUID generation

pkg/cypher/merge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (e *StorageExecutor) executeMerge(ctx context.Context, cypher string) (*Exe
113113
if err != nil || (len(labels) == 0 && len(matchProps) == 0) {
114114
// If we truly can't parse, create a basic node
115115
node := &storage.Node{
116-
ID: storage.NodeID(fmt.Sprintf("node-%d", e.idCounter())),
116+
ID: storage.NodeID(e.generateID()),
117117
Labels: labels,
118118
Properties: matchProps,
119119
}
@@ -893,7 +893,7 @@ func (e *StorageExecutor) executeMergeRelationshipWithContext(ctx context.Contex
893893
} else {
894894
// Create new relationship
895895
edge = &storage.Edge{
896-
ID: storage.EdgeID(fmt.Sprintf("edge-%d", e.idCounter())),
896+
ID: storage.EdgeID(e.generateID()),
897897
Type: relType,
898898
StartNode: startNode.ID,
899899
EndNode: endNode.ID,

pkg/cypher/set_helpers.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"fmt"
4747
"strconv"
4848
"strings"
49+
"time"
4950

5051
"github.com/orneryd/nornicdb/pkg/storage"
5152
)
@@ -299,14 +300,14 @@ func (e *StorageExecutor) evaluateSetExpression(expr string) interface{} {
299300
// Handle function calls and expressions
300301
lowerExpr := strings.ToLower(expr)
301302

302-
// timestamp() - returns current timestamp
303+
// timestamp() - returns current timestamp in milliseconds
303304
if lowerExpr == "timestamp()" {
304-
return e.idCounter()
305+
return time.Now().UnixMilli()
305306
}
306307

307308
// datetime() - returns ISO date string
308309
if lowerExpr == "datetime()" {
309-
return fmt.Sprintf("%d", e.idCounter())
310+
return time.Now().Format(time.RFC3339)
310311
}
311312

312313
// randomUUID() or randomuuid()

pkg/storage/async_engine.go

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ type AsyncEngine struct {
3838
inFlightNodes map[NodeID]bool
3939
inFlightEdges map[EdgeID]bool
4040

41+
// Updates tracking: nodes/edges in cache that are UPDATES (not creates)
42+
// These exist in underlying engine and shouldn't be counted as pending creates
43+
updateNodes map[NodeID]bool
44+
updateEdges map[EdgeID]bool
45+
4146
// Label index for fast lookups - maps normalized label to node IDs
4247
labelIndex map[string]map[NodeID]bool
4348

@@ -88,6 +93,8 @@ func NewAsyncEngine(engine Engine, config *AsyncEngineConfig) *AsyncEngine {
8893
deleteEdges: make(map[EdgeID]bool),
8994
inFlightNodes: make(map[NodeID]bool),
9095
inFlightEdges: make(map[EdgeID]bool),
96+
updateNodes: make(map[NodeID]bool),
97+
updateEdges: make(map[EdgeID]bool),
9198
labelIndex: make(map[string]map[NodeID]bool),
9299
flushInterval: config.FlushInterval,
93100
stopChan: make(chan struct{}),
@@ -312,13 +319,15 @@ func (ae *AsyncEngine) FlushWithResult() FlushResult {
312319
// Only clear if successfully written AND still the same object in cache
313320
if successfulNodeWrites[id] && ae.nodeCache[id] == nodesToWrite[id] {
314321
delete(ae.nodeCache, id)
322+
delete(ae.updateNodes, id) // Clear update flag
315323
}
316324
// Always clear in-flight marker for this batch (success or fail)
317325
delete(ae.inFlightNodes, id)
318326
}
319327
for id := range edgesToWrite {
320328
if successfulEdgeWrites[id] && ae.edgeCache[id] == edgesToWrite[id] {
321329
delete(ae.edgeCache, id)
330+
delete(ae.updateEdges, id) // Clear update flag
322331
}
323332
// Always clear in-flight marker for this batch (success or fail)
324333
delete(ae.inFlightEdges, id)
@@ -349,8 +358,31 @@ func (ae *AsyncEngine) CreateNode(node *Node) error {
349358
ae.mu.Lock()
350359
defer ae.mu.Unlock()
351360

352-
// Remove from delete set if present
361+
// Check if this was a pending delete (node exists in underlying engine)
362+
// If so, this is an UPDATE not a CREATE - don't count as pending create
363+
wasDeleted := ae.deleteNodes[node.ID]
353364
delete(ae.deleteNodes, node.ID)
365+
366+
// Determine if this is an update (node exists in engine or cache) vs a true create
367+
isUpdate := wasDeleted // Was pending delete = exists in engine
368+
369+
// Also check if node already exists in underlying engine
370+
// This handles the case where we create a node that was previously flushed
371+
if !isUpdate {
372+
if _, exists := ae.nodeCache[node.ID]; !exists {
373+
// Not in cache - check if it exists in underlying engine
374+
if _, err := ae.engine.GetNode(node.ID); err == nil {
375+
isUpdate = true // Node exists in engine
376+
}
377+
}
378+
}
379+
380+
if isUpdate {
381+
ae.updateNodes[node.ID] = true
382+
} else {
383+
delete(ae.updateNodes, node.ID)
384+
}
385+
354386
ae.nodeCache[node.ID] = node
355387

356388
// Update label index
@@ -439,7 +471,29 @@ func (ae *AsyncEngine) CreateEdge(edge *Edge) error {
439471
ae.mu.Lock()
440472
defer ae.mu.Unlock()
441473

474+
// Check if this was a pending delete (edge exists in underlying engine)
475+
// If so, this is an UPDATE not a CREATE - don't count as pending create
476+
wasDeleted := ae.deleteEdges[edge.ID]
442477
delete(ae.deleteEdges, edge.ID)
478+
479+
// Determine if this is an update (edge exists in engine or cache) vs a true create
480+
isUpdate := wasDeleted
481+
482+
// Also check if edge already exists in underlying engine
483+
if !isUpdate {
484+
if _, exists := ae.edgeCache[edge.ID]; !exists {
485+
if _, err := ae.engine.GetEdge(edge.ID); err == nil {
486+
isUpdate = true
487+
}
488+
}
489+
}
490+
491+
if isUpdate {
492+
ae.updateEdges[edge.ID] = true
493+
} else {
494+
delete(ae.updateEdges, edge.ID)
495+
}
496+
443497
ae.edgeCache[edge.ID] = edge
444498
ae.pendingWrites++
445499
return nil
@@ -914,14 +968,20 @@ func (ae *AsyncEngine) NodeCount() (int64, error) {
914968
// This prevents race with flush which clears cache before writing to engine
915969
ae.mu.RLock()
916970

917-
// Count pending creates, excluding in-flight nodes (already written to engine)
918-
// In-flight nodes exist in BOTH nodeCache AND underlying engine temporarily,
919-
// so we must not double-count them
971+
// Count pending creates, excluding:
972+
// - in-flight nodes (already written to engine)
973+
// - update nodes (exist in engine, just being modified)
920974
pendingCreates := int64(0)
975+
pendingUpdates := int64(0)
921976
for id := range ae.nodeCache {
922-
if !ae.inFlightNodes[id] {
923-
pendingCreates++
977+
if ae.inFlightNodes[id] {
978+
continue // Don't count in-flight (already in engine)
924979
}
980+
if ae.updateNodes[id] {
981+
pendingUpdates++ // Exists in engine, just updating
982+
continue
983+
}
984+
pendingCreates++
925985
}
926986
pendingDeletes := int64(len(ae.deleteNodes))
927987

@@ -933,6 +993,7 @@ func (ae *AsyncEngine) NodeCount() (int64, error) {
933993
}
934994

935995
// Adjust for pending creates and deletes
996+
// Note: pendingUpdates don't change count (already counted in engineCount)
936997
count := engineCount + pendingCreates - pendingDeletes
937998

938999
// Clamp to zero if negative (should never happen, log for debugging)
@@ -949,14 +1010,18 @@ func (ae *AsyncEngine) EdgeCount() (int64, error) {
9491010
// This prevents race with flush which clears cache before writing to engine
9501011
ae.mu.RLock()
9511012

952-
// Count pending creates, excluding in-flight edges (already written to engine)
953-
// In-flight edges exist in BOTH edgeCache AND underlying engine temporarily,
954-
// so we must not double-count them
1013+
// Count pending creates, excluding:
1014+
// - in-flight edges (already written to engine)
1015+
// - update edges (exist in engine, just being modified)
9551016
pendingCreates := int64(0)
9561017
for id := range ae.edgeCache {
957-
if !ae.inFlightEdges[id] {
958-
pendingCreates++
1018+
if ae.inFlightEdges[id] {
1019+
continue // Don't count in-flight (already in engine)
1020+
}
1021+
if ae.updateEdges[id] {
1022+
continue // Exists in engine, just updating
9591023
}
1024+
pendingCreates++
9601025
}
9611026
pendingDeletes := int64(len(ae.deleteEdges))
9621027

@@ -968,6 +1033,7 @@ func (ae *AsyncEngine) EdgeCount() (int64, error) {
9681033
}
9691034

9701035
// Adjust for pending creates and deletes
1036+
// Note: updates don't change count (already counted in engineCount)
9711037
count := engineCount + pendingCreates - pendingDeletes
9721038

9731039
// Clamp to zero if negative (should never happen, log for debugging)

0 commit comments

Comments
 (0)