diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs index 51c8941054ca..942ac147b363 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs @@ -337,5 +337,8 @@ public static void ErrorHandshakeTimedOut(ILogger logger, TimeSpan handshakeTime [LoggerMessage(93, LogLevel.Debug, "HubProtocol '{Protocol} v{Version}' does not support Stateful Reconnect. Disabling the feature.", EventName = "DisablingReconnect")] public static partial void DisablingReconnect(ILogger logger, string protocol, int version); + + [LoggerMessage(94, LogLevel.Error, "Failed to bind argument received in stream '{StreamId}'.", EventName = "StreamBindingFailure")] + public static partial void StreamBindingFailure(ILogger logger, string? streamId, Exception exception); } } diff --git a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs index 1d298f5d63fe..f52e5ba32357 100644 --- a/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs +++ b/src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs @@ -1307,6 +1307,10 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess await SendWithLock(connectionState, CompletionMessage.WithError(bindingFailure.InvocationId, "Client failed to parse argument(s)."), cancellationToken: default).ConfigureAwait(false); } break; + case StreamBindingFailureMessage bindingFailure: + // The server can't receive a response, so we just drop the message and log + Log.StreamBindingFailure(_logger, bindingFailure.Id, bindingFailure.BindingFailure.SourceException); + break; case InvocationMessage invocation: Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, invocation.Arguments); await invocationMessageWriter.WriteAsync(invocation).ConfigureAwait(false); diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs index cd7eff93f282..8d6f08f8e308 100644 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs @@ -409,6 +409,38 @@ public async Task StreamYieldsItemsAsTheyArrive() } } + [Fact] + public async Task StreamBindingErrorLogsError() + { + using (StartVerifiableLog(expectedErrorsFilter: w => w.EventId.Name == "StreamBindingFailure")) + { + var connection = new TestConnection(); + var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory); + try + { + await hubConnection.StartAsync().DefaultTimeout(); + + var channel = await hubConnection.StreamAsChannelAsync("Foo").DefaultTimeout(); + + // Expects string, send int + await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 1 }).DefaultTimeout(); + // Check that connection is still active, i.e. we ignore stream failures and keep things working. + await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).DefaultTimeout(); + await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).DefaultTimeout(); + + var notifications = await channel.ReadAndCollectAllAsync().DefaultTimeout(); + + Assert.Contains(TestSink.Writes, w => w.EventId.Name == "StreamBindingFailure"); + Assert.Equal(["1"], notifications.ToArray()); + } + finally + { + await hubConnection.DisposeAsync().DefaultTimeout(); + await connection.DisposeAsync().DefaultTimeout(); + } + } + } + [Fact] public async Task HandlerRegisteredWithOnIsFiredWhenInvocationReceived() { diff --git a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java index 042ca484806f..828fccb45655 100644 --- a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java +++ b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java @@ -92,13 +92,26 @@ public List parseMessages(ByteBuffer payload, InvocationBinder binde error = reader.nextString(); break; case "result": - case "item": if (invocationId == null || binder.getReturnType(invocationId) == null) { resultToken = JsonParser.parseReader(reader); } else { result = gson.fromJson(reader, binder.getReturnType(invocationId)); } break; + case "item": + if (invocationId == null || binder.getReturnType(invocationId) == null) { + resultToken = JsonParser.parseReader(reader); + } else { + try { + result = gson.fromJson(reader, binder.getReturnType(invocationId)); + } catch (Exception ex) { + argumentBindingException = ex; + // Since we failed to parse the value, tell the reader to skip the failed item + // so it can successfully continue reading + reader.skipValue(); + } + } + break; case "arguments": if (target != null) { boolean startedArray = false; @@ -167,9 +180,17 @@ public List parseMessages(ByteBuffer payload, InvocationBinder binde case STREAM_ITEM: if (resultToken != null) { Type returnType = binder.getReturnType(invocationId); - result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class); + try { + result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class); + } catch (Exception ex) { + argumentBindingException = ex; + } + } + if (argumentBindingException != null) { + hubMessages.add(new StreamBindingFailureMessage(invocationId, argumentBindingException)); + } else { + hubMessages.add(new StreamItem(null, invocationId, result)); } - hubMessages.add(new StreamItem(null, invocationId, result)); break; case STREAM_INVOCATION: case CANCEL_INVOCATION: diff --git a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java index 7ae85b551cde..ab7763e6f54c 100644 --- a/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java @@ -483,6 +483,11 @@ private void ReceiveLoop(ByteBuffer payload) null, "Client failed to parse argument(s).")); } break; + case STREAM_BINDING_FAILURE: + // The server can't receive a response, so we just drop the message and log + StreamBindingFailureMessage streamError = (StreamBindingFailureMessage)message; + logger.error("Failed to bind argument received in stream '{}'.", streamError.getInvocationId(), streamError.getException()); + break; case INVOCATION: InvocationMessage invocationMessage = (InvocationMessage) message; connectionState.dispatchInvocation(invocationMessage); diff --git a/src/SignalR/clients/java/signalr/test/build.gradle b/src/SignalR/clients/java/signalr/test/build.gradle index 27b81b32c947..8c06c5418456 100644 --- a/src/SignalR/clients/java/signalr/test/build.gradle +++ b/src/SignalR/clients/java/signalr/test/build.gradle @@ -10,11 +10,10 @@ dependencies { implementation 'org.junit.jupiter:junit-jupiter-params:5.11.2' testImplementation 'org.junit.jupiter:junit-jupiter:5.11.2' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.11.2' - implementation 'com.google.code.gson:gson:2.8.5' + implementation 'com.google.code.gson:gson:2.8.9' implementation 'ch.qos.logback:logback-classic:1.2.3' implementation project(':core') implementation project(':messagepack') - implementation project(':messagepack') antJUnit 'org.apache.ant:ant-junit:1.10.15' } diff --git a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java index 34a9d631b411..396b58518df3 100644 --- a/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java @@ -3,7 +3,11 @@ package com.microsoft.signalr; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.lang.reflect.Type; import java.nio.ByteBuffer; @@ -17,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.ExtendWith; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -1097,6 +1100,37 @@ public void checkStreamCompletionError() { assertEquals("There was an error", exception.getMessage()); } + @Test + public void checkStreamItemBindingFailure() { + try (TestLogger logger = new TestLogger()) { + MockTransport mockTransport = new MockTransport(); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + + hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait(); + + AtomicBoolean onNextCalled = new AtomicBoolean(); + Observable result = hubConnection.stream(Integer.class, "echo", "message"); + result.subscribe((item) -> onNextCalled.set(true), + (error) -> {}, + () -> {}); + + assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR, + TestUtils.byteBufferToString(mockTransport.getSentMessages()[1])); + assertFalse(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"item\":\"str\"}" + RECORD_SEPARATOR); + + assertFalse(onNextCalled.get()); + + mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":1}" + RECORD_SEPARATOR); + + assertEquals(1, result.timeout(30, TimeUnit.SECONDS).blockingFirst()); + + ILoggingEvent log = logger.assertLog("Failed to bind argument received in stream '1'."); + assertTrue(log.getThrowableProxy().getClassName().contains("gson.JsonSyntaxException")); + } + } + @Test public void checkStreamCompletionErrorWithMessagePack() { MockTransport mockTransport = new MockTransport();