@@ -6,6 +6,13 @@ import { expect } from "chai";
66import { FanoutTree , TopicControlPlane , TopicRootControlPlane } from "../src/index.js" ;
77
88describe ( "pubsub (subscribe race regressions)" , function ( ) {
9+ let session :
10+ | TestSession < {
11+ pubsub : TopicControlPlane ;
12+ fanout : FanoutTree ;
13+ } >
14+ | undefined ;
15+
916 const createDisconnectedSession = async (
1017 peerCount : number ,
1118 options ?: {
@@ -51,96 +58,91 @@ describe("pubsub (subscribe race regressions)", function () {
5158 ...( options ?. pubsub || { } ) ,
5259 } ) ,
5360 } ,
54- } ) ;
55- } ;
56-
57- it ( "discovers peers when subscribe and connect happen concurrently" , async ( ) => {
58- const TOPIC = "concurrent-subscribe-connect-regression" ;
59- const session = await createDisconnectedSession ( 2 ) ;
60-
61- try {
62- const a = session . peers [ 0 ] ! . services . pubsub ;
63- const b = session . peers [ 1 ] ! . services . pubsub ;
64-
65- await Promise . all ( [
66- a . subscribe ( TOPIC ) ,
67- b . subscribe ( TOPIC ) ,
68- session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ,
69- ] ) ;
70-
71- await waitForResolved ( ( ) => {
72- const aTopics = a . topics . get ( TOPIC ) ;
73- const bTopics = b . topics . get ( TOPIC ) ;
74- expect ( aTopics ) . to . not . equal ( undefined ) ;
75- expect ( bTopics ) . to . not . equal ( undefined ) ;
76- expect ( aTopics ?. has ( b . publicKeyHash ) ) . to . equal ( true ) ;
77- expect ( bTopics ?. has ( a . publicKeyHash ) ) . to . equal ( true ) ;
7861 } ) ;
79- } finally {
62+ } ;
63+
64+ afterEach ( async ( ) => {
65+ if ( session ) {
8066 await session . stop ( ) ;
67+ session = undefined ;
8168 }
8269 } ) ;
8370
71+ it ( "discovers peers when subscribe and connect happen concurrently" , async ( ) => {
72+ const TOPIC = "concurrent-subscribe-connect-regression" ;
73+ session = await createDisconnectedSession ( 2 ) ;
74+
75+ const a = session . peers [ 0 ] ! . services . pubsub ;
76+ const b = session . peers [ 1 ] ! . services . pubsub ;
77+
78+ await Promise . all ( [
79+ a . subscribe ( TOPIC ) ,
80+ b . subscribe ( TOPIC ) ,
81+ session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ,
82+ ] ) ;
83+
84+ await waitForResolved ( ( ) => {
85+ const aTopics = a . topics . get ( TOPIC ) ;
86+ const bTopics = b . topics . get ( TOPIC ) ;
87+ expect ( aTopics ) . to . not . equal ( undefined ) ;
88+ expect ( bTopics ) . to . not . equal ( undefined ) ;
89+ expect ( aTopics ?. has ( b . publicKeyHash ) ) . to . equal ( true ) ;
90+ expect ( bTopics ?. has ( a . publicKeyHash ) ) . to . equal ( true ) ;
91+ } ) ;
92+ } ) ;
93+
8494 it ( "does not track a topic on a peer that never subscribed" , async ( ) => {
8595 const TOPIC = "non-subscriber-should-not-track-regression" ;
86- const session = await createDisconnectedSession ( 2 ) ;
87-
88- try {
89- const a = session . peers [ 0 ] ! . services . pubsub ;
90- const b = session . peers [ 1 ] ! . services . pubsub ;
91-
92- await session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ;
93- await waitForNeighbour ( a , b ) ;
94-
95- await b . subscribe ( TOPIC ) ;
96- await waitForResolved ( ( ) => {
97- expect ( b . subscriptions . has ( TOPIC ) ) . to . equal ( true ) ;
98- const bSubscribers = b . getSubscribers ( TOPIC ) ;
99- expect (
100- bSubscribers ?. some ( ( subscriber ) => subscriber . hashcode ( ) === b . publicKeyHash ) ,
101- ) . to . equal ( true ) ;
102- } ) ;
96+ session = await createDisconnectedSession ( 2 ) ;
10397
104- expect ( a . topics . has ( TOPIC ) ) . to . equal ( false ) ;
105- expect ( a . topics . get ( TOPIC ) ) . to . equal ( undefined ) ;
106- } finally {
107- await session . stop ( ) ;
108- }
98+ const a = session . peers [ 0 ] ! . services . pubsub ;
99+ const b = session . peers [ 1 ] ! . services . pubsub ;
100+
101+ await session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ;
102+ await waitForNeighbour ( a , b ) ;
103+
104+ await b . subscribe ( TOPIC ) ;
105+ await waitForResolved ( ( ) => {
106+ expect ( b . subscriptions . has ( TOPIC ) ) . to . equal ( true ) ;
107+ const bSubscribers = b . getSubscribers ( TOPIC ) ;
108+ expect (
109+ bSubscribers ?. some ( ( subscriber ) => subscriber . hashcode ( ) === b . publicKeyHash ) ,
110+ ) . to . equal ( true ) ;
111+ } ) ;
112+
113+ expect ( a . topics . has ( TOPIC ) ) . to . equal ( false ) ;
114+ expect ( a . topics . get ( TOPIC ) ) . to . equal ( undefined ) ;
109115 } ) ;
110116
111117 it ( "does not advertise cancelled pending subscriptions to peers" , async ( ) => {
112118 const TOPIC = "subscribe-then-unsubscribe-before-debounce-regression" ;
113119 const debounceDelayMs = 500 ;
114- const session = await createDisconnectedSession ( 2 , {
120+ session = await createDisconnectedSession ( 2 , {
115121 pubsub : {
116122 subscriptionDebounceDelay : debounceDelayMs ,
117123 } ,
118124 } ) ;
119125
120- try {
121- const a = session . peers [ 0 ] ! . services . pubsub ;
122- const b = session . peers [ 1 ] ! . services . pubsub ;
126+ const a = session . peers [ 0 ] ! . services . pubsub ;
127+ const b = session . peers [ 1 ] ! . services . pubsub ;
123128
124- await session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ;
125- await waitForNeighbour ( a , b ) ;
129+ await session . connect ( [ [ session . peers [ 0 ] , session . peers [ 1 ] ] ] ) ;
130+ await waitForNeighbour ( a , b ) ;
126131
127- const pendingSubscribe = a . subscribe ( TOPIC ) ;
128- const removed = await a . unsubscribe ( TOPIC ) ;
129- expect ( removed ) . to . equal ( false ) ;
132+ const pendingSubscribe = a . subscribe ( TOPIC ) ;
133+ const removed = await a . unsubscribe ( TOPIC ) ;
134+ expect ( removed ) . to . equal ( false ) ;
130135
131- await b . subscribe ( TOPIC ) ;
136+ await b . subscribe ( TOPIC ) ;
132137
133- // Wait for A's debounced subscribe cycle to settle before asserting.
134- // This validates that A does not get (stale) advertised at flush time.
135- await pendingSubscribe ;
136- await delay ( debounceDelayMs + 100 ) ;
138+ // Wait for A's debounced subscribe cycle to settle before asserting.
139+ // This validates that A does not get (stale) advertised at flush time.
140+ await pendingSubscribe ;
141+ await delay ( debounceDelayMs + 100 ) ;
137142
138- expect ( a . topics . has ( TOPIC ) ) . to . equal ( false ) ;
139- const bTopics = b . topics . get ( TOPIC ) ;
140- expect ( bTopics ) . to . not . equal ( undefined ) ;
141- expect ( bTopics ! . has ( a . publicKeyHash ) ) . to . equal ( false ) ;
142- } finally {
143- await session . stop ( ) ;
144- }
143+ expect ( a . topics . has ( TOPIC ) ) . to . equal ( false ) ;
144+ const bTopics = b . topics . get ( TOPIC ) ;
145+ expect ( bTopics ) . to . not . equal ( undefined ) ;
146+ expect ( bTopics ! . has ( a . publicKeyHash ) ) . to . equal ( false ) ;
145147 } ) ;
146148} ) ;
0 commit comments