4343import java .util .concurrent .locks .Lock ;
4444import java .util .concurrent .locks .ReadWriteLock ;
4545import java .util .concurrent .locks .ReentrantReadWriteLock ;
46+ import java .util .function .BiConsumer ;
4647import java .util .function .Consumer ;
4748import java .util .logging .Level ;
4849import java .util .logging .Logger ;
@@ -70,11 +71,12 @@ public class Connection implements Closeable {
7071 return thread ;
7172 });
7273 private static final AtomicLong NEXT_ID = new AtomicLong (1L );
74+ private static final AtomicLong NEXT_SEQUENCE = new AtomicLong (1L );
7375 private WebSocket socket ;
7476 private final Map <Long , Consumer <Either <Throwable , JsonInput >>> methodCallbacks =
7577 new ConcurrentHashMap <>();
7678 private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock (true );
77- private final Map <Event <?>, List <Consumer < ?>>> eventCallbacks = new HashMap <>();
79+ private final Map <Event <?>, List <BiConsumer < Long , ?>>> eventCallbacks = new HashMap <>();
7880 private HttpClient client ;
7981 private final String url ;
8082 private final AtomicBoolean isClosed ;
@@ -196,7 +198,7 @@ public <X> X sendAndWait(SessionID sessionId, Command<X> command, Duration timeo
196198 }
197199 }
198200
199- public <X > void addListener (Event <X > event , Consumer < X > handler ) {
201+ public <X > void addListener (Event <X > event , BiConsumer < Long , X > handler ) {
200202 Require .nonNull ("Event to listen for" , event );
201203 Require .nonNull ("Handler to call" , handler );
202204
@@ -230,10 +232,11 @@ private class Listener implements WebSocket.Listener {
230232
231233 @ Override
232234 public void onText (CharSequence data ) {
235+ long sequence = NEXT_SEQUENCE .getAndIncrement ();
233236 EXECUTOR .execute (
234237 () -> {
235238 try {
236- handle (data );
239+ handle (sequence , data );
237240 } catch (Throwable t ) {
238241 LOG .log (Level .WARNING , "Unable to process: " + data , t );
239242 throw new DevToolsException (t );
@@ -242,7 +245,7 @@ public void onText(CharSequence data) {
242245 }
243246 }
244247
245- private void handle (CharSequence data ) {
248+ private void handle (long sequence , CharSequence data ) {
246249 // It's kind of gross to decode the data twice, but this lets us get started on something
247250 // that feels nice to users.
248251 // TODO: decode once, and once only
@@ -335,14 +338,14 @@ private void handle(CharSequence data) {
335338 return ;
336339 }
337340
338- for (Consumer < ?> action : event .getValue ()) {
341+ for (BiConsumer < Long , ?> action : event .getValue ()) {
339342 @ SuppressWarnings ("unchecked" )
340- Consumer < Object > obj = (Consumer < Object >) action ;
343+ BiConsumer < Long , Object > obj = (BiConsumer < Long , Object >) action ;
341344 LOG .log (
342345 getDebugLogLevel (),
343346 "Calling callback for {0} using {1} being passed {2}" ,
344347 new Object [] {event .getKey (), obj , params });
345- obj .accept (params );
348+ obj .accept (sequence , params );
346349 }
347350 }
348351 });
0 commit comments