From 76311274e984c289d03a517db6ea800b2cb75e0a Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 14:11:34 +0800 Subject: [PATCH 1/4] Process missing messages during reconnection --- .../base/src/main/java/owt/base/Const.java | 2 +- .../java/owt/conference/SignalingChannel.java | 39 ++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/sdk/base/src/main/java/owt/base/Const.java b/src/sdk/base/src/main/java/owt/base/Const.java index 6d7025d1..33a12400 100644 --- a/src/sdk/base/src/main/java/owt/base/Const.java +++ b/src/sdk/base/src/main/java/owt/base/Const.java @@ -18,6 +18,6 @@ public class Const { "'unifiedPlan': true," + "'streamRemovable': true}" + "}"; - public static final String PROTOCOL_VERSION = "1.0"; + public static final String PROTOCOL_VERSION = "1.0.1"; } ///@endcond diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 7123566d..99965670 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -16,6 +16,7 @@ import org.json.JSONException; import org.json.JSONObject; +import org.json.JSONArray; import java.net.URISyntaxException; import java.util.ArrayList; @@ -62,6 +63,7 @@ interface SignalingChannelObserver { private final int MAX_RECONNECT_ATTEMPTS = 5; private String reconnectionTicket; private int reconnectAttempts = 0; + private int messageSequence = 0; // No lock is guarding loggedIn so void access and modify it on threads other than // |callbackExecutor|. private boolean loggedIn = false; @@ -110,6 +112,7 @@ interface SignalingChannelObserver { private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject msg = (JSONObject) args[0]; observer.onProgressMessage(msg); + messageSequence++; }); private final Listener participantCallback = (Object... args) -> callbackExecutor.execute( () -> { @@ -128,6 +131,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + messageSequence++; }); private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> { try { @@ -153,6 +157,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + messageSequence++; }); private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject data = (JSONObject) args[0]; @@ -162,6 +167,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(false); } + messageSequence++; }); private final Listener dropCallback = args -> triggerDisconnected(); @@ -267,7 +273,38 @@ private void relogin() { DCHECK(reconnectionTicket); socketClient.emit("relogin", reconnectionTicket, (Ack) (Object... args) -> { if (extractMsg(0, args).equals("ok")) { - reconnectionTicket = (String) args[1]; + if (args[1] instanceof JSONObject) { + try { + reconnectionTicket = ((JSONObject) args[1]).getString("ticket"); + JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages"); + for (int i = 0; i < pendingMessages.length(); i++) { + JSONObject message = pendingMessages.getJSONObject(i); + if (message.getInt("seq") > messageSequence) { + Object messageData = message.get("data"); + switch (message.getString("event")) { + case "participant": + participantCallback.call(messageData); + break; + case "text": + textCallback.call(messageData); + break; + case "stream": + streamCallback.call(messageData); + break; + case "progress": + progressCallback.call(messageData); + break; + default: + DCHECK(false); + } + } + } + } catch (JSONException e) { + DCHECK(e); + } + } else { + reconnectionTicket = (String) args[1]; + } reconnectAttempts = 0; flushCachedMsg(); } else { From 33ad1cacdef1c663ef5ee29df05dd3718059d94e Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 16:17:36 +0800 Subject: [PATCH 2/4] Avoid message sequence overflow --- .../java/owt/conference/SignalingChannel.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 99965670..9da14c46 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -108,11 +108,19 @@ interface SignalingChannelObserver { private final Listener disconnectCallback = args -> callbackExecutor.execute( this::triggerDisconnected); + // Count internal message sequence + private void incrementMessageSequence() { + if (messageSequence == Integer.MAX_VALUE) { + messageSequence = 0; + } else { + messageSequence++; + } + } // MCU events. private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject msg = (JSONObject) args[0]; observer.onProgressMessage(msg); - messageSequence++; + incrementMessageSequence(); }); private final Listener participantCallback = (Object... args) -> callbackExecutor.execute( () -> { @@ -131,7 +139,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } - messageSequence++; + incrementMessageSequence(); }); private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> { try { @@ -157,7 +165,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } - messageSequence++; + incrementMessageSequence(); }); private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject data = (JSONObject) args[0]; @@ -167,7 +175,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(false); } - messageSequence++; + incrementMessageSequence(); }); private final Listener dropCallback = args -> triggerDisconnected(); @@ -277,9 +285,10 @@ private void relogin() { try { reconnectionTicket = ((JSONObject) args[1]).getString("ticket"); JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages"); + boolean isMissingStart = false; for (int i = 0; i < pendingMessages.length(); i++) { JSONObject message = pendingMessages.getJSONObject(i); - if (message.getInt("seq") > messageSequence) { + if (isMissingStart) { Object messageData = message.get("data"); switch (message.getString("event")) { case "participant": @@ -298,6 +307,9 @@ private void relogin() { DCHECK(false); } } + if (message.get("seq") == messageSequence) { + isMissingStart = true; + } } } catch (JSONException e) { DCHECK(e); From 70ae5b521a0d31c59f82c20e21c6841fcea6c6ad Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 16:56:01 +0800 Subject: [PATCH 3/4] Adjust conditions --- .../src/main/java/owt/conference/SignalingChannel.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 9da14c46..e88a9e85 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -306,8 +306,7 @@ private void relogin() { default: DCHECK(false); } - } - if (message.get("seq") == messageSequence) { + } else if (message.get("seq") == messageSequence) { isMissingStart = true; } } From 5c4922726a2b7750ab602ee8257222e345c74489 Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Thu, 26 Dec 2019 09:36:07 +0800 Subject: [PATCH 4/4] Fix seq type --- .../src/main/java/owt/conference/SignalingChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index e88a9e85..71c5bea6 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -306,7 +306,7 @@ private void relogin() { default: DCHECK(false); } - } else if (message.get("seq") == messageSequence) { + } else if (message.getInt("seq") == messageSequence) { isMissingStart = true; } }