@@ -179,17 +179,19 @@ record -> {
179179 throw Status .CANCELLED
180180 .withDescription ("hit deadline while retransmitting" )
181181 .asRuntimeException ();
182- } else if (notification instanceof Ack ack ) {
182+ } else if (notification instanceof Ack ) {
183+ final Ack ack = (Ack ) notification ;
183184 this .remainingAttempts .set (this .client .config .maxRetries );
184185 var correspondingInflight = inflightQueue .poll ();
185186 if (correspondingInflight == null ) {
186187 throw Status .INTERNAL .withDescription ("inflight queue is empty" ).asRuntimeException ();
187188 } else {
188189 validate (correspondingInflight , ack .output );
189190 correspondingInflight .callback .set (ack .output );
190- this .inflightBytes .release (correspondingInflight .meteredBytes . intValue () );
191+ this .inflightBytes .release (( int ) correspondingInflight .meteredBytes );
191192 }
192- } else if (notification instanceof Error error ) {
193+ } else if (notification instanceof Error ) {
194+ final Error error = (Error ) notification ;
193195 throw new RuntimeException (error .throwable );
194196 } else {
195197 throw Status .INTERNAL
@@ -218,7 +220,8 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException {
218220 // through them and cancel any pending batches via their callback.
219221 while (!notificationQueue .isEmpty ()) {
220222 var entry = notificationQueue .poll ();
221- if (entry instanceof Batch batch ) {
223+ if (entry instanceof Batch ) {
224+ final Batch batch = (Batch ) entry ;
222225 batch .input .callback .setException (fatal .asRuntimeException ());
223226 }
224227 }
@@ -228,7 +231,7 @@ private synchronized Void cleanUp(Status fatal) throws InterruptedException {
228231 }
229232
230233 private void validate (InflightRecord record , AppendOutput output ) {
231- var numRecordsForAcknowledgement = output .endSeqNum () - output .startSeqNum () ;
234+ var numRecordsForAcknowledgement = output .endSeqNum - output .startSeqNum ;
232235 if (numRecordsForAcknowledgement != record .input .records .size ()) {
233236 throw Status .INTERNAL
234237 .withDescription (
@@ -287,7 +290,8 @@ public void onCompleted() {
287290 clientObserver .onError (Status .CANCELLED .asRuntimeException ());
288291 throw Status .CANCELLED .asRuntimeException ();
289292 }
290- } else if (notification instanceof Batch batch ) {
293+ } else if (notification instanceof Batch ) {
294+ final Batch batch = (Batch ) notification ;
291295 logger .debug ("notification=BATCH" );
292296 if (!inflightQueue .offer (batch .input )) {
293297 throw Status .INTERNAL .asRuntimeException ();
@@ -304,7 +308,8 @@ public void onCompleted() {
304308 + TimeUnit .NANOSECONDS .convert (this .client .config .requestTimeout )));
305309 }
306310
307- } else if (notification instanceof Ack ack ) {
311+ } else if (notification instanceof Ack ) {
312+ final Ack ack = (Ack ) notification ;
308313 logger .debug ("notification=ACK" );
309314 this .remainingAttempts .set (this .client .config .maxRetries );
310315 var correspondingInflight = inflightQueue .poll ();
@@ -313,7 +318,7 @@ public void onCompleted() {
313318 } else {
314319 validate (correspondingInflight , ack .output );
315320 correspondingInflight .callback .set (ack .output );
316- this .inflightBytes .release (correspondingInflight .meteredBytes . intValue () );
321+ this .inflightBytes .release (( int ) correspondingInflight .meteredBytes );
317322
318323 // Reset the next deadline.
319324 this .nextDeadlineSystemNanos .set (
@@ -323,18 +328,20 @@ public void onCompleted() {
323328 entry .entryNanos
324329 + TimeUnit .NANOSECONDS .convert (this .client .config .requestTimeout )));
325330 }
326- } else if (notification instanceof Error error ) {
331+ } else if (notification instanceof Error ) {
332+ final Error error = (Error ) notification ;
327333 logger .debug ("notification=ERROR" );
328334 clientObserver .onError (Status .CANCELLED .asRuntimeException ());
329335 throw new RuntimeException (error .throwable );
330336
331- } else if (notification instanceof ClientClose close ) {
337+ } else if (notification instanceof ClientClose ) {
338+ final ClientClose close = (ClientClose ) notification ;
332339 logger .debug ("notification=CLIENT_CLOSE,gracefully={}" , close .gracefully );
333340 clientObserver .onCompleted ();
334341 if (!close .gracefully ) {
335342 return null ;
336343 }
337- } else if (notification instanceof ServerClose close ) {
344+ } else if (notification instanceof ServerClose ) {
338345 logger .debug ("notification=SERVER_CLOSE" );
339346 if (acceptingAppends .get () || !inflightQueue .isEmpty () || !notificationQueue .isEmpty ()) {
340347 throw Status .INTERNAL
@@ -363,25 +370,63 @@ public ListenableFuture<Void> closeGracefully() {
363370 return daemon ;
364371 }
365372
366- sealed interface Notification permits Ack , Batch , ClientClose , Error , ServerClose {}
373+ static class InflightRecord {
374+ final AppendInput input ;
375+ final long entryNanos ;
376+ final SettableFuture <AppendOutput > callback ;
377+ final long meteredBytes ;
378+
379+ InflightRecord (
380+ AppendInput input ,
381+ long entryNanos ,
382+ SettableFuture <AppendOutput > callback ,
383+ long meteredBytes ) {
384+ this .input = input ;
385+ this .entryNanos = entryNanos ;
386+ this .callback = callback ;
387+ this .meteredBytes = meteredBytes ;
388+ }
367389
368- record InflightRecord (
369- AppendInput input ,
370- Long entryNanos ,
371- SettableFuture <AppendOutput > callback ,
372- Long meteredBytes ) {
373390 static InflightRecord construct (AppendInput input , Long meteredBytes ) {
374391 return new InflightRecord (input , System .nanoTime (), SettableFuture .create (), meteredBytes );
375392 }
376393 }
377394
378- record Batch (InflightRecord input ) implements Notification {}
395+ interface Notification {}
396+
397+ class Ack implements Notification {
398+ final AppendOutput output ;
399+
400+ Ack (AppendOutput output ) {
401+ this .output = output ;
402+ }
403+ }
379404
380- record Ack (AppendOutput output ) implements Notification {}
405+ class Batch implements Notification {
406+ final InflightRecord input ;
381407
382- record Error (Throwable throwable ) implements Notification {}
408+ Batch (InflightRecord input ) {
409+ this .input = input ;
410+ }
411+ }
383412
384- record ClientClose (boolean gracefully ) implements Notification {}
413+ class ClientClose implements Notification {
414+ final boolean gracefully ;
385415
386- record ServerClose () implements Notification {}
416+ ClientClose (boolean gracefully ) {
417+ this .gracefully = gracefully ;
418+ }
419+ }
420+
421+ class Error implements Notification {
422+ final Throwable throwable ;
423+
424+ Error (Throwable throwable ) {
425+ this .throwable = throwable ;
426+ }
427+ }
428+
429+ class ServerClose implements Notification {
430+ ServerClose () {}
431+ }
387432}
0 commit comments