diff --git a/.gitignore b/.gitignore
index e2bde68bc..6ce9162a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@ coverage.out
 *.swo
 
 wallets.json
+
+*.key
diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go
index 143e069e7..c4aa549e6 100644
--- a/p2p/database/datastore.go
+++ b/p2p/database/datastore.go
@@ -3,7 +3,9 @@ package database
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"math/big"
+	"sync"
 	"time"
 
 	"cloud.google.com/go/datastore"
@@ -127,6 +129,113 @@ type DatastoreOptions struct {
 	TTL time.Duration
 }
 
+// Settings for the in-memory event deduplication cache.
+const (
+	shardCount      = 32
+	cleanupInterval = 5 * time.Minute  // Interval between cleanups
+	entryTTL        = 20 * time.Minute // TTL for each entry
+)
+
+var (
+	// dedupMaps holds, per shard, the earliest time each event key was seen.
+	// The maps are created once in init and never replaced, so the map
+	// returned by getShard remains the live shard for as long as the process
+	// runs; contents must only be touched while holding the matching mutex.
+	dedupMaps [shardCount]map[string]time.Time
+	mutexes   [shardCount]sync.Mutex
+
+	// cleanupOnce guarantees that only one cleanup goroutine is ever started.
+	cleanupOnce sync.Once
+)
+
+// init allocates every dedup shard and starts the background cleanup.
+func init() {
+	for i := range dedupMaps {
+		dedupMaps[i] = make(map[string]time.Time)
+	}
+	StartCleanupRoutine()
+}
+
+// getShard returns the shard map and its mutex for the given key. The shard
+// is chosen by the 32-bit FNV-1a hash of the key modulo shardCount. The caller
+// must hold the returned mutex while reading or writing the returned map.
+func getShard(key string) (map[string]time.Time, *sync.Mutex) {
+	h := fnv.New32a()
+	h.Write([]byte(key)) // hash.Hash.Write never returns an error
+	shardIndex := h.Sum32() % shardCount
+	return dedupMaps[shardIndex], &mutexes[shardIndex]
+}
+
+// dedupKey builds the cache key identifying an event by sensor, peer and hash.
+func (d *Datastore) dedupKey(peer *enode.Node, hash common.Hash) string {
+	return fmt.Sprintf("%s_%s_%s", d.sensorID, peer.URLv4(), hash.Hex())
+}
+
+// recordIfEarliest stores tfs for key and returns true when the key is new or
+// tfs is strictly earlier than the time already recorded. It returns false,
+// leaving the cache untouched, when an equal or earlier time is already stored.
+func recordIfEarliest(key string, tfs time.Time) bool {
+	shard, mu := getShard(key)
+
+	mu.Lock()
+	defer mu.Unlock()
+
+	if existingTime, found := shard[key]; found && !tfs.Before(existingTime) {
+		return false
+	}
+	shard[key] = tfs
+	return true
+}
+
+// DeduplicateAndWriteEvent writes the event unless an equal or earlier time
+// has already been recorded for the same sensor, peer and hash. The shard lock
+// is released before the datastore write so that a slow write does not block
+// other events, or the cleanup routine, on the same shard.
+func (d *Datastore) DeduplicateAndWriteEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
+	if !recordIfEarliest(d.dedupKey(peer, hash), tfs) {
+		return // Skip if not the earliest
+	}
+	d.writeEvent(peer, eventKind, hash, hashKind, tfs)
+}
+
+// cleanupDedupMap removes entries older than entryTTL from every shard once
+// per cleanupInterval. It never returns. Entries are deleted in place rather
+// than copied into a fresh map: replacing the map would leave a writer that
+// fetched the shard from getShard just before the swap updating an orphaned
+// map, silently losing its dedup entry.
+func cleanupDedupMap() {
+	ticker := time.NewTicker(cleanupInterval)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		expiryTime := time.Now().Add(-entryTTL)
+		for i := range dedupMaps {
+			mu := &mutexes[i]
+			shard := dedupMaps[i]
+
+			mu.Lock()
+			for key, timestamp := range shard {
+				if timestamp.Before(expiryTime) {
+					delete(shard, key) // Deleting during range is safe in Go
+				}
+			}
+			size := len(shard)
+			mu.Unlock()
+
+			// Log the size of the shard after cleanup for monitoring
+			log.Printf("Shard %d size after cleanup: %d entries", i, size)
+		}
+	}
+}
+
+// StartCleanupRoutine starts the cleanup routine as a goroutine. Calls after
+// the first are no-ops.
+func StartCleanupRoutine() {
+	cleanupOnce.Do(func() {
+		go cleanupDedupMap()
+	})
+}
+
 // NewDatastore connects to datastore and creates the client. This should
 // only be called once unless trying to write to different databases.
 func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
@@ -158,7 +267,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
 	if d.ShouldWriteBlockEvents() {
 		d.jobs <- struct{}{}
 		go func() {
-			d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
+			d.DeduplicateAndWriteEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
 			<-d.jobs
 		}()
 	}
@@ -453,22 +562,32 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H
 // on the provided eventKind and hashKind. This is similar to writeEvent but
 // batches the request.
 func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) {
+	// Create slices to hold only deduplicated keys and events
 	keys := make([]*datastore.Key, 0, len(hashes))
 	events := make([]*DatastoreEvent, 0, len(hashes))
 
 	for _, hash := range hashes {
-		keys = append(keys, datastore.IncompleteKey(eventKind, nil))
+		// Skip hashes for which an equal or earlier time is already recorded
+		if !recordIfEarliest(d.dedupKey(peer, hash), tfs) {
+			continue
+		}
 
-		event := DatastoreEvent{
+		keys = append(keys, datastore.IncompleteKey(eventKind, nil))
+		events = append(events, &DatastoreEvent{
 			SensorId: d.sensorID,
 			PeerId:   peer.URLv4(),
 			Hash:     datastore.NameKey(hashKind, hash.Hex(), nil),
 			Time:     tfs,
 			TTL:      tfs.Add(d.ttl),
-		}
-		events = append(events, &event)
+		})
 	}
 
+	// Nothing survived deduplication, so there is nothing to write
+	if len(keys) == 0 {
+		return
+	}
+
+	// Perform batch write for deduplicated events only
 	if _, err := d.client.PutMulti(ctx, keys, events); err != nil {
 		log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
 	}