diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 35158cbb..c78ca567 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -16,6 +16,7 @@ import org.json.JSONException; import org.json.JSONObject; +import org.json.JSONArray; import java.net.URISyntaxException; import java.util.ArrayList; @@ -63,6 +64,7 @@ interface SignalingChannelObserver { private final int MAX_RECONNECT_ATTEMPTS = 5; private String reconnectionTicket; private int reconnectAttempts = 0; + private int messageSequence = 0; // No lock is guarding loggedIn so void access and modify it on threads other than // |callbackExecutor|. private boolean loggedIn = false; @@ -107,10 +109,19 @@ interface SignalingChannelObserver { private final Listener disconnectCallback = args -> callbackExecutor.execute( this::triggerDisconnected); + // Count internal message sequence + private void incrementMessageSequence() { + if (messageSequence == Integer.MAX_VALUE) { + messageSequence = 0; + } else { + messageSequence++; + } + } // MCU events. private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject msg = (JSONObject) args[0]; observer.onProgressMessage(msg); + incrementMessageSequence(); }); private final Listener participantCallback = (Object... args) -> callbackExecutor.execute( () -> { @@ -129,6 +140,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + incrementMessageSequence(); }); private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> { try { @@ -154,6 +166,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + incrementMessageSequence(); }); private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject data = (JSONObject) args[0]; @@ -163,6 +176,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(false); } + incrementMessageSequence(); }); private final Listener dropCallback = args -> triggerDisconnected(); @@ -269,7 +283,41 @@ private void relogin() { DCHECK(reconnectionTicket); socketClient.emit("relogin", reconnectionTicket, (Ack) (Object... args) -> { if (extractMsg(0, args).equals("ok")) { - reconnectionTicket = (String) args[1]; + if (args[1] instanceof JSONObject) { + try { + reconnectionTicket = ((JSONObject) args[1]).getString("ticket"); + JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages"); + boolean isMissingStart = false; + for (int i = 0; i < pendingMessages.length(); i++) { + JSONObject message = pendingMessages.getJSONObject(i); + if (isMissingStart) { + Object messageData = message.get("data"); + switch (message.getString("event")) { + case "participant": + participantCallback.call(messageData); + break; + case "text": + textCallback.call(messageData); + break; + case "stream": + streamCallback.call(messageData); + break; + case "progress": + progressCallback.call(messageData); + break; + default: + DCHECK(false); + } + } else if (message.getInt("seq") == messageSequence) { + isMissingStart = true; + } + } + } catch (JSONException e) { + DCHECK(e); + } + } else { + reconnectionTicket = (String) args[1]; + } reconnectAttempts = 0; flushCachedMsg(); onRefreshReconnectionTicket();