@@ -234,3 +234,170 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
234
234
t .Fatal (result .Error )
235
235
}
236
236
}
237
+
238
+ //TestSameVersionID just checks that if the version is not changed,
239
+ //then streamer peers see each other
240
+ func TestSameVersionID (t * testing.T ) {
241
+ //test version ID
242
+ v := uint (1 )
243
+ sim := simulation .New (map [string ]simulation.ServiceFunc {
244
+ "streamer" : func (ctx * adapters.ServiceContext , bucket * sync.Map ) (s node.Service , cleanup func (), err error ) {
245
+ var store storage.ChunkStore
246
+ var datadir string
247
+
248
+ node := ctx .Config .Node ()
249
+ addr := network .NewAddr (node )
250
+
251
+ store , datadir , err = createTestLocalStorageForID (node .ID (), addr )
252
+ if err != nil {
253
+ return nil , nil , err
254
+ }
255
+ bucket .Store (bucketKeyStore , store )
256
+ cleanup = func () {
257
+ store .Close ()
258
+ os .RemoveAll (datadir )
259
+ }
260
+ localStore := store .(* storage.LocalStore )
261
+ netStore , err := storage .NewNetStore (localStore , nil )
262
+ if err != nil {
263
+ return nil , nil , err
264
+ }
265
+ bucket .Store (bucketKeyDB , netStore )
266
+ kad := network .NewKademlia (addr .Over (), network .NewKadParams ())
267
+ delivery := NewDelivery (kad , netStore )
268
+ netStore .NewNetFetcherFunc = network .NewFetcherFactory (delivery .RequestFromPeers , true ).New
269
+
270
+ bucket .Store (bucketKeyDelivery , delivery )
271
+
272
+ r := NewRegistry (addr .ID (), delivery , netStore , state .NewInmemoryStore (), & RegistryOptions {
273
+ Retrieval : RetrievalDisabled ,
274
+ Syncing : SyncingAutoSubscribe ,
275
+ }, nil )
276
+ //assign to each node the same version ID
277
+ r .spec .Version = v
278
+
279
+ bucket .Store (bucketKeyRegistry , r )
280
+
281
+ return r , cleanup , nil
282
+
283
+ },
284
+ })
285
+ defer sim .Close ()
286
+
287
+ //connect just two nodes
288
+ log .Info ("Adding nodes to simulation" )
289
+ _ , err := sim .AddNodesAndConnectChain (2 )
290
+ if err != nil {
291
+ t .Fatal (err )
292
+ }
293
+
294
+ log .Info ("Starting simulation" )
295
+ ctx := context .Background ()
296
+ //make sure they have time to connect
297
+ time .Sleep (200 * time .Millisecond )
298
+ result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) error {
299
+ //get the pivot node's filestore
300
+ nodes := sim .UpNodeIDs ()
301
+
302
+ item , ok := sim .NodeItem (nodes [0 ], bucketKeyRegistry )
303
+ if ! ok {
304
+ return fmt .Errorf ("No filestore" )
305
+ }
306
+ registry := item .(* Registry )
307
+
308
+ //the peers should connect, thus getting the peer should not return nil
309
+ if registry .getPeer (nodes [1 ]) == nil {
310
+ t .Fatal ("Expected the peer to not be nil, but it is" )
311
+ }
312
+ return nil
313
+ })
314
+ if result .Error != nil {
315
+ t .Fatal (result .Error )
316
+ }
317
+ log .Info ("Simulation ended" )
318
+ }
319
+
320
+ //TestDifferentVersionID proves that if the streamer protocol version doesn't match,
321
+ //then the peers are not connected at streamer level
322
+ func TestDifferentVersionID (t * testing.T ) {
323
+ //create a variable to hold the version ID
324
+ v := uint (0 )
325
+ sim := simulation .New (map [string ]simulation.ServiceFunc {
326
+ "streamer" : func (ctx * adapters.ServiceContext , bucket * sync.Map ) (s node.Service , cleanup func (), err error ) {
327
+ var store storage.ChunkStore
328
+ var datadir string
329
+
330
+ node := ctx .Config .Node ()
331
+ addr := network .NewAddr (node )
332
+
333
+ store , datadir , err = createTestLocalStorageForID (node .ID (), addr )
334
+ if err != nil {
335
+ return nil , nil , err
336
+ }
337
+ bucket .Store (bucketKeyStore , store )
338
+ cleanup = func () {
339
+ store .Close ()
340
+ os .RemoveAll (datadir )
341
+ }
342
+ localStore := store .(* storage.LocalStore )
343
+ netStore , err := storage .NewNetStore (localStore , nil )
344
+ if err != nil {
345
+ return nil , nil , err
346
+ }
347
+ bucket .Store (bucketKeyDB , netStore )
348
+ kad := network .NewKademlia (addr .Over (), network .NewKadParams ())
349
+ delivery := NewDelivery (kad , netStore )
350
+ netStore .NewNetFetcherFunc = network .NewFetcherFactory (delivery .RequestFromPeers , true ).New
351
+
352
+ bucket .Store (bucketKeyDelivery , delivery )
353
+
354
+ r := NewRegistry (addr .ID (), delivery , netStore , state .NewInmemoryStore (), & RegistryOptions {
355
+ Retrieval : RetrievalDisabled ,
356
+ Syncing : SyncingAutoSubscribe ,
357
+ }, nil )
358
+
359
+ //increase the version ID for each node
360
+ v ++
361
+ r .spec .Version = v
362
+
363
+ bucket .Store (bucketKeyRegistry , r )
364
+
365
+ return r , cleanup , nil
366
+
367
+ },
368
+ })
369
+ defer sim .Close ()
370
+
371
+ //connect the nodes
372
+ log .Info ("Adding nodes to simulation" )
373
+ _ , err := sim .AddNodesAndConnectChain (2 )
374
+ if err != nil {
375
+ t .Fatal (err )
376
+ }
377
+
378
+ log .Info ("Starting simulation" )
379
+ ctx := context .Background ()
380
+ //make sure they have time to connect
381
+ time .Sleep (200 * time .Millisecond )
382
+ result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) error {
383
+ //get the pivot node's filestore
384
+ nodes := sim .UpNodeIDs ()
385
+
386
+ item , ok := sim .NodeItem (nodes [0 ], bucketKeyRegistry )
387
+ if ! ok {
388
+ return fmt .Errorf ("No filestore" )
389
+ }
390
+ registry := item .(* Registry )
391
+
392
+ //getting the other peer should fail due to the different version numbers
393
+ if registry .getPeer (nodes [1 ]) != nil {
394
+ t .Fatal ("Expected the peer to be nil, but it is not" )
395
+ }
396
+ return nil
397
+ })
398
+ if result .Error != nil {
399
+ t .Fatal (result .Error )
400
+ }
401
+ log .Info ("Simulation ended" )
402
+
403
+ }
0 commit comments