@@ -63,13 +63,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
6363 synchronizer . Map ( syncStore ) ;
6464
6565 ILogger storeLogger = _loggerFactory . CreateLogger ( nameof ( ArqStore ) ) ;
66- bool storeInitialized = false ;
6766 store . PropertyChanged += ( _ , args ) =>
6867 {
69- if ( ! storeInitialized && args . PropertyName == nameof ( ArqStore . Ber ) )
70- {
71- storeInitialized = true ;
72- }
7368 string value = args . PropertyName switch
7469 {
7570 nameof ( ArqStore . Ber ) => store . Ber . ToString ( CultureInfo . InvariantCulture ) ,
@@ -91,6 +86,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
9186
9287 ProtocolBuilder . ProtocolStack syncStack = ProtocolBuilder . Create ( _services )
9388 . Add ( synchronizer )
89+ . Add < BufferLayer > ( )
9490 . Add < SegmentationLayer > ( )
9591 . Add < LoggerLayer > ( )
9692 . Add < ArqLayer > ( )
@@ -111,7 +107,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
111107
112108 ArqLayer arqLayer = ( ArqLayer ) syncStack . Layers . First ( x => x is ArqLayer ) ;
113109 bool connected = false ;
114- bool initialConnect = false ;
115110 arqLayer . EventOccurred += ( _ , e ) =>
116111 {
117112 _logger . LogInformation ( "Arc Event: {Event}" , e . Event ) ;
@@ -125,43 +120,29 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
125120 }
126121 } ;
127122
128- // First make sure the connection is established before calling syncFrom.
129- // Otherwise, many reset commands send before the first ACK replay is received, causing resetting back-and-forth.
130- arqLayer . KeepAlive ( ) ;
123+ synchronizer . SyncFrom ( syncStore , syncLayer ) ;
131124
132- int retryIn = 3 ;
133125 NetMQTimer syncTimer = new ( TimeSpan . FromMilliseconds ( 1000 ) ) ;
134- using ( NetMQPoller poller = [ syncTimer , socket ] )
126+ NetMQTimer keepAliveTimer = new ( TimeSpan . FromMilliseconds ( 5000 ) ) ;
127+ using ( NetMQPoller poller = [ syncTimer , keepAliveTimer , socket ] )
135128 {
136129 socket . ReceiveReady += ( _ , _ ) =>
137130 {
138131 int bytesReceived = syncZmqLayer . ReceiveAll ( ) ;
139132 _logger . LogTrace ( "Sync Received {Bytes} bytes" , bytesReceived ) ;
140133 } ;
141134
142- syncTimer . Elapsed += ( _ , _ ) =>
135+ keepAliveTimer . Elapsed += ( _ , _ ) =>
143136 {
144- if ( connected && ! initialConnect )
145- {
146- // First time after connected, enable synchronisation for the store.
147- synchronizer . SyncFrom ( syncStore , syncLayer ) ;
148- initialConnect = true ;
149- }
137+ arqLayer . KeepAlive ( ) ;
138+ } ;
150139
140+ syncTimer . Elapsed += ( _ , _ ) =>
141+ {
151142 if ( connected )
152143 {
153144 store . Hello += 1 ;
154145 }
155- else
156- {
157- retryIn -- ;
158- if ( retryIn <= 0 )
159- {
160- _logger . LogInformation ( "Not connected yet, sending keep-alive" ) ;
161- arqLayer . KeepAlive ( ) ;
162- retryIn = 3 ;
163- }
164- }
165146 synchronizer . Process ( ) ;
166147 } ;
167148
0 commit comments