Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,8 @@ public static void ErrorHandshakeTimedOut(ILogger logger, TimeSpan handshakeTime

[LoggerMessage(93, LogLevel.Debug, "HubProtocol '{Protocol} v{Version}' does not support Stateful Reconnect. Disabling the feature.", EventName = "DisablingReconnect")]
public static partial void DisablingReconnect(ILogger logger, string protocol, int version);

[LoggerMessage(94, LogLevel.Error, "Failed to bind argument received in stream '{StreamId}'.", EventName = "StreamBindingFailure")]
public static partial void StreamBindingFailure(ILogger logger, string? streamId, Exception exception);
}
}
4 changes: 4 additions & 0 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,10 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
await SendWithLock(connectionState, CompletionMessage.WithError(bindingFailure.InvocationId, "Client failed to parse argument(s)."), cancellationToken: default).ConfigureAwait(false);
}
break;
case StreamBindingFailureMessage bindingFailure:
// The server can't receive a response, so we just drop the message and log
Log.StreamBindingFailure(_logger, bindingFailure.Id, bindingFailure.BindingFailure.SourceException);
break;
case InvocationMessage invocation:
Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, invocation.Arguments);
await invocationMessageWriter.WriteAsync(invocation).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,38 @@ public async Task StreamYieldsItemsAsTheyArrive()
}
}

[Fact]
public async Task StreamBindingErrorLogsError()
{
using (StartVerifiableLog(expectedErrorsFilter: w => w.EventId.Name == "StreamBindingFailure"))
{
var connection = new TestConnection();
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
try
{
await hubConnection.StartAsync().DefaultTimeout();

var channel = await hubConnection.StreamAsChannelAsync<string>("Foo").DefaultTimeout();

// Expects string, send int
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 1 }).DefaultTimeout();
// Check that connection is still active, i.e. we ignore stream failures and keep things working.
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).DefaultTimeout();
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).DefaultTimeout();

var notifications = await channel.ReadAndCollectAllAsync().DefaultTimeout();

Assert.Contains(TestSink.Writes, w => w.EventId.Name == "StreamBindingFailure");
Assert.Equal(["1"], notifications.ToArray());
}
finally
{
await hubConnection.DisposeAsync().DefaultTimeout();
await connection.DisposeAsync().DefaultTimeout();
}
}
}

[Fact]
public async Task HandlerRegisteredWithOnIsFiredWhenInvocationReceived()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,26 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
error = reader.nextString();
break;
case "result":
case "item":
if (invocationId == null || binder.getReturnType(invocationId) == null) {
resultToken = JsonParser.parseReader(reader);
} else {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
}
break;
case "item":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't need to set the same guards when we're parsing properties from the result key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that it's a different bug/improvement and I don't want to pollute this PR 😆

if (invocationId == null || binder.getReturnType(invocationId) == null) {
resultToken = JsonParser.parseReader(reader);
} else {
try {
result = gson.fromJson(reader, binder.getReturnType(invocationId));
} catch (Exception ex) {
argumentBindingException = ex;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a way of capturing the value that failed to bind here? If so, that might be helpful to capture in the exception. Ditto with the streamToken below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relying on the underlying json library to provide the error, in the case of the test I added the error would contain: java.lang.NumberFormatException: For input string: "str"

// Since we failed to parse the value, tell the reader to skip the failed item
// so it can successfully continue reading
reader.skipValue();
}
}
break;
case "arguments":
if (target != null) {
boolean startedArray = false;
Expand Down Expand Up @@ -167,9 +180,17 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
case STREAM_ITEM:
if (resultToken != null) {
Type returnType = binder.getReturnType(invocationId);
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
try {
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
} catch (Exception ex) {
argumentBindingException = ex;
}
}
if (argumentBindingException != null) {
hubMessages.add(new StreamBindingFailureMessage(invocationId, argumentBindingException));
} else {
hubMessages.add(new StreamItem(null, invocationId, result));
}
hubMessages.add(new StreamItem(null, invocationId, result));
break;
case STREAM_INVOCATION:
case CANCEL_INVOCATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,11 @@ private void ReceiveLoop(ByteBuffer payload)
null, "Client failed to parse argument(s)."));
}
break;
case STREAM_BINDING_FAILURE:
// The server can't receive a response, so we just drop the message and log
StreamBindingFailureMessage streamError = (StreamBindingFailureMessage)message;
logger.error("Failed to bind argument received in stream '{}'.", streamError.getInvocationId(), streamError.getException());
break;
case INVOCATION:
InvocationMessage invocationMessage = (InvocationMessage) message;
connectionState.dispatchInvocation(invocationMessage);
Expand Down
3 changes: 1 addition & 2 deletions src/SignalR/clients/java/signalr/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter-params:5.11.2'
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.2'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.11.2'
implementation 'com.google.code.gson:gson:2.8.5'
implementation 'com.google.code.gson:gson:2.8.9'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this bump related to the fix or just for good measure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't have been needed, but for some reason when debugging the test in VS Code it complained about a method not being found in gson, command line passed just fine. So I updated the dep and VS Code started working 🤷

implementation 'ch.qos.logback:logback-classic:1.2.3'
implementation project(':core')
implementation project(':messagepack')
implementation project(':messagepack')
antJUnit 'org.apache.ant:ant-junit:1.10.15'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package com.microsoft.signalr;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.reflect.Type;
import java.nio.ByteBuffer;
Expand All @@ -17,7 +21,6 @@
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;

import ch.qos.logback.classic.spi.ILoggingEvent;
Expand Down Expand Up @@ -1097,6 +1100,37 @@ public void checkStreamCompletionError() {
assertEquals("There was an error", exception.getMessage());
}

@Test
public void checkStreamItemBindingFailure() {
try (TestLogger logger = new TestLogger()) {
MockTransport mockTransport = new MockTransport();
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);

hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();

AtomicBoolean onNextCalled = new AtomicBoolean();
Observable<Integer> result = hubConnection.stream(Integer.class, "echo", "message");
result.subscribe((item) -> onNextCalled.set(true),
(error) -> {},
() -> {});

assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR,
TestUtils.byteBufferToString(mockTransport.getSentMessages()[1]));
assertFalse(onNextCalled.get());

mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"item\":\"str\"}" + RECORD_SEPARATOR);

assertFalse(onNextCalled.get());

mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":1}" + RECORD_SEPARATOR);

assertEquals(1, result.timeout(30, TimeUnit.SECONDS).blockingFirst());

ILoggingEvent log = logger.assertLog("Failed to bind argument received in stream '1'.");
assertTrue(log.getThrowableProxy().getClassName().contains("gson.JsonSyntaxException"));
}
}

@Test
public void checkStreamCompletionErrorWithMessagePack() {
MockTransport mockTransport = new MockTransport();
Expand Down
Loading