Skip to content

Commit 9b010b5

Browse files
Merge pull request #641 from pastelnetwork/PSL-939_disableP2PKey
disable p2p key
2 parents 9f2e39e + a55e5e6 commit 9b010b5

File tree

10 files changed

+456
-22
lines changed

10 files changed

+456
-22
lines changed

dupedetection/ddclient/client.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
const (
15-
defaultConnectTimeout = 1000 * time.Second
15+
defaultConnectTimeout = 60 * time.Second
1616
)
1717

1818
type client struct{}
@@ -29,10 +29,7 @@ func (cl *client) Connect(ctx context.Context, address string) (*clientConn, err
2929
//lint:ignore SA1019 we want to ignore this for now
3030
grpc.WithInsecure(),
3131
grpc.WithBlock(),
32-
grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(35000000),
33-
34-
35-
),
32+
grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(35000000)),
3633
)
3734
if err != nil {
3835
return nil, errors.Errorf("fail to dial: %w", err).WithField("address", address)

dupedetection/ddclient/ddclient.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import (
2121
)
2222

2323
const (
24-
logPrefix = "ddClient"
24+
logPrefix = "ddClient"
25+
defaultDDServiceCallTimeout = 26 * time.Minute
2526
)
2627

2728
// DDServerClient contains methods for request services from dd-server service.
@@ -90,9 +91,13 @@ func (ddClient *ddServerClientImpl) callImageRarenessScore(ctx context.Context,
9091
// remove file after use
9192
defer os.Remove(inputPath)
9293

93-
res, err := client.ImageRarenessScore(context.Background(), &req, grpc.MaxCallRecvMsgSize(35000000))
94+
// Limits the dial timeout, prevent got stuck too long
95+
dialCtx, dcancel := context.WithTimeout(ctx, defaultDDServiceCallTimeout)
96+
defer dcancel()
97+
98+
res, err := client.ImageRarenessScore(dialCtx, &req, grpc.MaxCallRecvMsgSize(35000000))
9499
if err != nil {
95-
return nil, errors.Errorf("Error calling image rareness score: %w", err)
100+
return nil, errors.Errorf("Error calling image rareness score, dd-serivce returned error: %w", err)
96101
}
97102

98103
bytes, _ := json.Marshal(res)

p2p/kademlia/dht.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import (
2424
)
2525

2626
var (
27-
defaultNetworkAddr = "0.0.0.0"
28-
defaultNetworkPort = 4445
29-
defaultRefreshTime = time.Second * 3600
30-
defaultPingTime = time.Second * 10
27+
defaultNetworkAddr = "0.0.0.0"
28+
defaultNetworkPort = 4445
29+
defaultRefreshTime = time.Second * 3600
30+
defaultPingTime = time.Second * 10
31+
defaultCleanupInterval = time.Minute * 2
32+
defaultDisabledKeyExpirationInterval = time.Minute * 30
3133
)
3234

3335
const maxIterations = 5
@@ -38,6 +40,7 @@ type DHT struct {
3840
options *Options // the options of DHT
3941
network *Network // the network of DHT
4042
store Store // the storage of DHT
43+
metaStore MetaStore // the meta storage of DHT
4144
done chan struct{} // distributed hash table is done
4245
cache storage.KeyValue // store bad bootstrap addresses
4346
pastelClient pastel.Client
@@ -73,7 +76,7 @@ type Options struct {
7376
}
7477

7578
// NewDHT returns a new DHT node
76-
func NewDHT(ctx context.Context, store Store, pc pastel.Client, secInfo *alts.SecInfo, options *Options) (*DHT, error) {
79+
func NewDHT(ctx context.Context, store Store, metaStore MetaStore, pc pastel.Client, secInfo *alts.SecInfo, options *Options) (*DHT, error) {
7780
// validate the options, if it's invalid, set them to default value
7881
if options.IP == "" {
7982
options.IP = defaultNetworkAddr
@@ -92,6 +95,7 @@ func NewDHT(ctx context.Context, store Store, pc pastel.Client, secInfo *alts.Se
9295
}
9396

9497
s := &DHT{
98+
metaStore: metaStore,
9599
store: store,
96100
options: options,
97101
pastelClient: pc,
@@ -165,6 +169,7 @@ func (s *DHT) Start(ctx context.Context) error {
165169
}
166170

167171
go s.StartReplicationWorker(ctx)
172+
go s.startDisabledKeysCleanupWorker(ctx)
168173

169174
return nil
170175
}
@@ -278,6 +283,12 @@ func (s *DHT) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
278283
}
279284

280285
dbKey := hex.EncodeToString(decoded)
286+
if s.metaStore != nil {
287+
if err := s.metaStore.Retrieve(ctx, dbKey); err == nil {
288+
return nil, fmt.Errorf("key is disabled: %v", key)
289+
}
290+
}
291+
281292
// retrieve the key/value from local storage
282293
value, err := s.store.Retrieve(ctx, decoded)
283294
if err == nil && len(value) > 0 {
@@ -871,3 +882,40 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte,
871882

872883
return fmt.Errorf("store data to alpha nodes failed, only %d nodes stored", finalStoreCount)
873884
}
885+
886+
func (s *DHT) startDisabledKeysCleanupWorker(ctx context.Context) error {
887+
log.P2P().WithContext(ctx).Info("disabled keys cleanup worker started")
888+
889+
for {
890+
select {
891+
case <-time.After(defaultCleanupInterval):
892+
s.cleanupDisabledKeys(ctx)
893+
case <-ctx.Done():
894+
log.P2P().WithContext(ctx).Error("closing disabled keys cleanup worker")
895+
return nil
896+
}
897+
}
898+
}
899+
900+
func (s *DHT) cleanupDisabledKeys(ctx context.Context) error {
901+
if s.metaStore == nil {
902+
return nil
903+
}
904+
905+
from := time.Now().Add(-1 * defaultDisabledKeyExpirationInterval)
906+
disabledKeys, err := s.metaStore.GetDisabledKeys(from)
907+
if err != nil {
908+
return errors.Errorf("get disabled keys: %w", err)
909+
}
910+
911+
for i := 0; i < len(disabledKeys); i++ {
912+
dec, err := hex.DecodeString(disabledKeys[i].Key)
913+
if err != nil {
914+
log.P2P().WithContext(ctx).WithError(err).Error("decode disabled key failed")
915+
continue
916+
}
917+
s.metaStore.Delete(ctx, dec)
918+
}
919+
920+
return nil
921+
}

p2p/kademlia/dht_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (ts *testSuite) newDHTNodeWithMemStore(ctx context.Context, port int, nodes
226226
// }
227227
// transportCredentials := credentials.NewClientCreds(fakePastelClient, secInfo)
228228

229-
dht, err := NewDHT(ctx, ts.memStore, pastelClientMock, secInfo, options)
229+
dht, err := NewDHT(ctx, ts.memStore, nil, pastelClientMock, secInfo, options)
230230
if err != nil {
231231
return nil, errors.Errorf("new dht: %w", err)
232232
}
@@ -270,7 +270,7 @@ func (ts *testSuite) newDHTNodeWithDBStore(ctx context.Context, port int, nodes
270270
pub: TestPub,
271271
}
272272

273-
dht, err := NewDHT(ctx, ts.dbStore, fakePastelClient, secInfo, options)
273+
dht, err := NewDHT(ctx, ts.dbStore, nil, fakePastelClient, secInfo, options)
274274
if err != nil {
275275
return nil, errors.Errorf("new dht: %w", err)
276276
}

p2p/kademlia/domain/domain.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,12 @@ type ToRepKey struct {
2727
ID string `json:"id"`
2828
Attempts int `json:"attempts"`
2929
}
30+
31+
// DisabledKey is a disabled key
32+
type DisabledKey struct {
33+
Key string `json:"key"`
34+
CreatedAt time.Time `json:"createdAt"`
35+
}
36+
37+
// DisabledKeys is the list for disabled keys
38+
type DisabledKeys []DisabledKey

p2p/kademlia/replication.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ func (s *DHT) StartReplicationWorker(ctx context.Context) error {
5555
for {
5656
select {
5757
case <-time.After(defaultReplicationInterval):
58-
s.Replicate(ctx)
58+
//s.Replicate(ctx)
59+
log.P2P().WithContext(ctx).Info("replication worker currently disabled")
5960
case <-ctx.Done():
6061
log.P2P().WithContext(ctx).Error("closing replication worker")
6162
return nil

p2p/kademlia/store.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,21 @@ type Store interface {
7373

7474
IncrementAttempts(keys []string) error
7575
}
76+
77+
// MetaStore is the interface for implementing the storage mechanism for the DHT
78+
type MetaStore interface {
79+
// Delete a key pair from the store
80+
Delete(_ context.Context, key []byte)
81+
82+
// Retrieve the local key from store
83+
Retrieve(_ context.Context, key string) error
84+
85+
// Close the store
86+
Close(ctx context.Context)
87+
88+
// Store a key pair for the local node with the replication
89+
Store(ctx context.Context, key []byte) error
90+
91+
// GetDisabledKeys returns all disabled keys
92+
GetDisabledKeys(from time.Time) (retKeys domain.DisabledKeys, err error)
93+
}

0 commit comments

Comments
 (0)