2 changes: 2 additions & 0 deletions .gitignore
@@ -9,3 +9,5 @@ coverage.out
*.swo

wallets.json

*.key
106 changes: 101 additions & 5 deletions p2p/database/datastore.go
@@ -3,7 +3,9 @@ package database
import (
"context"
"fmt"
"hash/fnv"
"math/big"
"sync"
"time"

"cloud.google.com/go/datastore"
@@ -127,6 +129,82 @@ type DatastoreOptions struct {
	TTL time.Duration
}

const (
	shardCount      = 32
	cleanupInterval = 5 * time.Minute  // Interval between cleanup passes
	entryTTL        = 20 * time.Minute // TTL for each dedup entry
)

var (
	dedupMaps [shardCount]map[string]time.Time
	mutexes   [shardCount]sync.Mutex
)

// init allocates each dedup shard and starts the background cleanup routine.
func init() {
	// Initialize each shard in dedupMaps.
	for i := range dedupMaps {
		dedupMaps[i] = make(map[string]time.Time)
	}
	// Start the cleanup routine to periodically remove stale entries.
	StartCleanupRoutine()
}

// getShard retrieves the shard and mutex for a given key.
func getShard(key string) (map[string]time.Time, *sync.Mutex) {
	hash := fnv.New32a()
	hash.Write([]byte(key))
	shardIndex := hash.Sum32() % shardCount
	return dedupMaps[shardIndex], &mutexes[shardIndex]
}
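
For a quick sanity check outside the package, here is a minimal standalone sketch of the same FNV-1a sharding scheme; the shardIndex helper and the sample key are illustrative assumptions, not code from this PR:

package main

import (
	"fmt"
	"hash/fnv"
)

const shardCount = 32 // matches the shard count above

// shardIndex mirrors getShard's hashing: FNV-1a over the key, modulo shardCount.
func shardIndex(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // fnv hashes never return a write error
	return h.Sum32() % shardCount
}

func main() {
	// A hypothetical dedup key in the sensorID_peerURL_hash format used above.
	key := "sensor-1_enode://abc@10.0.0.1:30303_0xdeadbeef"
	fmt.Printf("key %q maps to shard %d\n", key, shardIndex(key))
}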

// DeduplicateAndWriteEvent writes the event only for the earliest sighting of
// a given sensor/peer/hash combination; later duplicates are skipped.
func (d *Datastore) DeduplicateAndWriteEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
	key := fmt.Sprintf("%s_%s_%s", d.sensorID, peer.URLv4(), hash.Hex())
	shard, mu := getShard(key)

	mu.Lock()
	if existingTime, found := shard[key]; found && !tfs.Before(existingTime) {
		mu.Unlock()
		return // Skip unless strictly earlier than the recorded sighting.
	}
	shard[key] = tfs
	// Release the shard lock before the datastore write so a slow RPC does not
	// block other writers that hash to the same shard.
	mu.Unlock()

	d.writeEvent(peer, eventKind, hash, hashKind, tfs)
}
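
The skip condition deserves a second look: found && !tfs.Before(existingTime) means a duplicate is written only when its timestamp is strictly earlier than the stored one, so an equal timestamp counts as a duplicate. A tiny standalone sketch of just that predicate (shouldWrite is an illustrative name, not in the PR):

package main

import (
	"fmt"
	"time"
)

// shouldWrite mirrors the dedup check: write when no entry exists yet, or
// when tfs is strictly earlier than the recorded first sighting.
func shouldWrite(existing time.Time, found bool, tfs time.Time) bool {
	return !found || tfs.Before(existing)
}

func main() {
	t0 := time.Now()
	fmt.Println(shouldWrite(t0, true, t0))                   // false: equal timestamps are skipped
	fmt.Println(shouldWrite(t0, true, t0.Add(-time.Second))) // true: strictly earlier wins
	fmt.Println(shouldWrite(time.Time{}, false, t0))         // true: first sighting is always written
}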

func cleanupDedupMap() {
	for {
		time.Sleep(cleanupInterval) // Wait for the next cleanup cycle.

		expiryTime := time.Now().Add(-entryTTL)
		for i := 0; i < shardCount; i++ {
			mu := &mutexes[i]

			mu.Lock()
			shard := dedupMaps[i]
			// Delete expired entries in place. Swapping in a fresh map here
			// would race with getShard, which fetches the map reference
			// before acquiring the mutex.
			for key, timestamp := range shard {
				if timestamp.Before(expiryTime) {
					delete(shard, key)
				}
			}
			// Log the size of the shard after cleanup for monitoring.
			log.Debug().Msgf("Shard %d size after cleanup: %d entries", i, len(shard))
			mu.Unlock()
		}
	}
}
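
The in-place sweep relies on a Go guarantee worth naming: deleting a map entry while ranging over the map is well-defined, and deleted entries are simply not revisited. A minimal standalone illustration (the map contents are made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	m := map[string]time.Time{
		"fresh": time.Now(),
		"stale": time.Now().Add(-time.Hour),
	}
	expiry := time.Now().Add(-20 * time.Minute) // entryTTL above
	for k, ts := range m {
		if ts.Before(expiry) {
			delete(m, k) // safe while ranging
		}
	}
	fmt.Println(len(m)) // 1: only "fresh" survives
}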

// StartCleanupRoutine starts the cleanup routine as a goroutine
func StartCleanupRoutine() {
	go cleanupDedupMap()
}
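
One design note: the sleep-based loop above runs for the life of the process and cannot be stopped, which is fine for a long-lived sensor but awkward in tests. If shutdown control were ever needed, a ticker-based variant along these lines would work; startCleanup, stop, and sweep are hypothetical names, not part of this PR:

// startCleanup is a stoppable alternative to StartCleanupRoutine. sweep
// stands in for the per-shard expiry pass in cleanupDedupMap.
func startCleanup(stop <-chan struct{}, sweep func()) {
	go func() {
		ticker := time.NewTicker(cleanupInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				sweep()
			case <-stop:
				return // lets tests and graceful shutdown end the goroutine
			}
		}
	}()
}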

// NewDatastore connects to datastore and creates the client. This should
// only be called once unless trying to write to different databases.
func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
@@ -158,7 +236,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
	if d.ShouldWriteBlockEvents() {
		d.jobs <- struct{}{}
		go func() {
			d.DeduplicateAndWriteEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
			<-d.jobs
		}()
	}
@@ -453,22 +531,40 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H
// on the provided eventKind and hashKind. This is similar to writeEvent but
// batches the request.
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) {
	// Create slices to hold only deduplicated keys and events.
	keys := make([]*datastore.Key, 0, len(hashes))
	events := make([]*DatastoreEvent, 0, len(hashes))

	for _, hash := range hashes {
		// Generate the deduplication key.
		key := fmt.Sprintf("%s_%s_%s", d.sensorID, peer.URLv4(), hash.Hex())

		// Determine the shard for this key and retrieve the associated map and mutex.
		shardMap, shardMutex := getShard(key)

		// Lock the specific shard mutex for thread-safe access.
		shardMutex.Lock()
		if existingTime, found := shardMap[key]; found && !tfs.Before(existingTime) {
			// An earlier (or equal) sighting is already recorded, so skip this hash.
			shardMutex.Unlock()
			continue
		}
		// Either add a new entry or update to an earlier time in this shard.
		shardMap[key] = tfs
		shardMutex.Unlock()

		// Proceed with writing to the datastore.
		keys = append(keys, datastore.IncompleteKey(eventKind, nil))
		events = append(events, &DatastoreEvent{
			SensorId: d.sensorID,
			PeerId:   peer.URLv4(),
			Hash:     datastore.NameKey(hashKind, hash.Hex(), nil),
			Time:     tfs,
			TTL:      tfs.Add(d.ttl),
		})
	}

	// Perform the batch write for deduplicated events only. Skip the RPC when
	// every hash in this batch was a duplicate.
	if len(keys) == 0 {
		return
	}
	if _, err := d.client.PutMulti(ctx, keys, events); err != nil {
		log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
	}