Skip to content
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
eb975a4
asdf
timtay-microsoft Jun 26, 2025
4d27162
noodling so far
timtay-microsoft Jun 30, 2025
0163c76
sadf
timtay-microsoft Jun 30, 2025
73b12d7
maybe
timtay-microsoft Jun 30, 2025
4080104
doc
timtay-microsoft Jun 30, 2025
4ac4003
more
timtay-microsoft Jul 7, 2025
af7c829
asdf
timtay-microsoft Jul 7, 2025
215eb64
asdf
timtay-microsoft Jul 17, 2025
615ba26
thoughts
timtay-microsoft Jul 18, 2025
ad17af3
more
timtay-microsoft Jul 18, 2025
a2c2f6e
notes
timtay-microsoft Jul 22, 2025
6bf1bcc
caching
timtay-microsoft Jul 22, 2025
180d85e
more thoughts
timtay-microsoft Jul 24, 2025
599a431
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 25, 2025
a1d2e21
no new error code, some gRPC notes
timtay-microsoft Jul 25, 2025
e680263
save impl for later
timtay-microsoft Jul 25, 2025
629dc2f
ordering q
timtay-microsoft Jul 25, 2025
f2a2c60
wording
timtay-microsoft Jul 25, 2025
710a425
links
timtay-microsoft Jul 25, 2025
0006c02
backwards
timtay-microsoft Jul 25, 2025
b457888
first thoughts on cancellation, re-order doc a bit
timtay-microsoft Jul 25, 2025
f16ad43
more notes, more re-ordering
timtay-microsoft Jul 25, 2025
e7c98d5
cleanup
timtay-microsoft Jul 28, 2025
1c1964b
Only allow cancelling streaming commands
timtay-microsoft Jul 28, 2025
edecd39
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 28, 2025
61a5212
typo
timtay-microsoft Jul 28, 2025
79af301
code changes in another branch
timtay-microsoft Jul 28, 2025
c7fb69e
more
timtay-microsoft Jul 28, 2025
aa2dc81
more
timtay-microsoft Jul 28, 2025
0784a10
Update ExtendedResponse.cs
timtay-microsoft Jul 28, 2025
ff9efc9
Update AkriSystemProperties.cs
timtay-microsoft Jul 28, 2025
0ff4dbe
Update 0025-rpc-streaming.md
timtay-microsoft Jul 30, 2025
4aef0a4
Remove responseId concept. User will do this with their own user prop…
timtay-microsoft Aug 1, 2025
9dbb174
Incorporate a lot of feedback
timtay-microsoft Aug 1, 2025
49b57c7
cleanup
timtay-microsoft Aug 1, 2025
ac799bc
canceled error code instead of header
timtay-microsoft Aug 1, 2025
04f1ce9
timeout thoughts
timtay-microsoft Aug 1, 2025
14ca259
Merge branch 'main' into timtay/streaming
timtay-microsoft Aug 1, 2025
353a2fd
asdf
timtay-microsoft Aug 4, 2025
439c5f8
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Aug 4, 2025
e462a3f
reword
timtay-microsoft Aug 4, 2025
4c4cb0b
API fix
timtay-microsoft Aug 4, 2025
5050ada
not needed
timtay-microsoft Aug 4, 2025
9caa59d
non-req
timtay-microsoft Aug 4, 2025
58d8fc1
fix type
timtay-microsoft Aug 5, 2025
fead2ad
timeout musings
timtay-microsoft Aug 6, 2025
7c83f06
wording
timtay-microsoft Aug 7, 2025
fb9de69
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
29e1811
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
c15790c
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
98f8763
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
bf8c252
Address feedback
timtay-microsoft Sep 3, 2025
8e6a6d8
note
timtay-microsoft Sep 3, 2025
59afded
more
timtay-microsoft Sep 4, 2025
69d7781
More, complete streams at any time
timtay-microsoft Sep 5, 2025
4a5b22d
ExecutorId is mandatory
timtay-microsoft Sep 5, 2025
91f55f3
fix .NET APIs
timtay-microsoft Sep 6, 2025
0c7fd4a
disconnection considerations
timtay-microsoft Sep 6, 2025
66556a5
De-duping?
timtay-microsoft Sep 6, 2025
6b15e19
Revert "De-duping?"
timtay-microsoft Sep 6, 2025
bddf3f9
executorId in tests
timtay-microsoft Sep 8, 2025
f5dff71
Merge branch 'main' into timtay/streaming
timtay-microsoft Sep 8, 2025
c39c41a
fix
timtay-microsoft Sep 8, 2025
6c3f1ec
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Sep 8, 2025
abe8ae8
No more executor Id, just use $partition
timtay-microsoft Sep 10, 2025
e6cef75
de-dup + qos 1 clarification
timtay-microsoft Sep 10, 2025
f4929a2
fixup
timtay-microsoft Sep 10, 2025
8322019
Optionally delay acknowledgements
timtay-microsoft Sep 10, 2025
1c6c3ad
cancellation user properties so far
timtay-microsoft Sep 10, 2025
e8d75e4
more cancellation user properties support
timtay-microsoft Sep 12, 2025
645f306
message level timeout is back
timtay-microsoft Sep 15, 2025
3e0f863
fixup
timtay-microsoft Sep 15, 2025
886906c
fixup
timtay-microsoft Sep 15, 2025
040940d
timeout vs cancellation
timtay-microsoft Sep 15, 2025
c7cba20
more
timtay-microsoft Sep 15, 2025
8f72c15
isLast
timtay-microsoft Sep 16, 2025
5c5d4b3
unused
timtay-microsoft Sep 16, 2025
1e59bd9
expiry interval note
timtay-microsoft Sep 16, 2025
266a809
message expiry purpose
timtay-microsoft Sep 17, 2025
10f55dc
broker behavior
timtay-microsoft Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions doc/dev/adr/0025-rpc-streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# ADR 25: RPC Streaming

## Context

Users have expressed a desire to allow more than one response per RPC invocation. This would enable scenarios like:

- Execute long-running commands while still being responsive
- Allow users to report status over time for a long-running command

## Requirements

- Allow for an arbitrary number of command responses for a single command invocation
- The total number of responses does not need to be known before the first response is sent
- When exposed to the user, each response includes an index of where it was in the stream and an optional response Id
- Allow for multiple separate commands to be streamed simultaneously
- Allow for invoker to cancel streamed responses mid-stream

## Non-requirements

- Different payload shapes per command response
- "Client Streaming" RPC (multiples requests -> One command response)
- Bi-directional streaming RPC (multiples requests -> multiple responses)
- Allow for executor to cancel streamed responses mid-stream

## State of the art

gRPC supports these patterns for RPC:
- [Unary RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#unary-rpc) (1 request message, 1 response message)
- [Server streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (1 request message, many response messages)
- [Client streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (many request messages, one response message)
- [Bi-directional streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#bidirectional-streaming-rpc) (many request messages, many response messages)

[gRPC also allows for either the client or server to cancel an RPC at any time](https://grpc.io/docs/what-is-grpc/core-concepts/#cancelling-an-rpc)

## Decision

### API design, .NET

#### Invoker side

Our command invoker base class will now include a new method ```InvokeCommandWithStreaming``` to go with the existing ```InvokeCommand``` method.

This new method will take the same parameters as ```InvokeCommand``` but will return an asynchronously iterable list (or callback depending on language?) of command response objects.

```csharp
public abstract class CommandInvoker<TReq, TResp>
where TReq : class
where TResp : class
{
// Single response
public Task<ExtendedResponse<TResp>> InvokeCommandAsync(TReq request, ...) {...}

// Many responses, responses may be staggered
public IAsyncEnumerable<StreamingExtendedResponse<TResp>> InvokeStreamingCommandAsync(TReq request, ...) {...}
}
```

Additionally, this new method will return an extended version of the ```ExtendedResponse``` wrapper that will include the streaming-specific information about each response:

```csharp
public class StreamingExtendedResponse<TResp> : ExtendedResponse<TResp>
where TResp : class
{
/// <summary>
/// An optional Id for this response (relative to the other responses in this response stream)
/// </summary>
/// <remarks>
/// Users are allowed to provide Ids for each response, only for specific responses, or for none of the responses.
/// </remarks>
public string? StreamingResponseId { get; set; }

/// <summary>
/// The index of this response relative to the other responses in this response stream. Starts at 0.
/// </summary>
public int StreamingResponseIndex { get; set; }

/// <summary>
/// If true, this response is the final response in this response stream.
/// </summary>
public bool IsLastResponse { get; set; }
}
```

#### Executor side

On the executor side, we will define a separate callback that executes whenever a streaming command is invoked. Instead of returning the single response, this callback will return the asynchronously iterable list of responses. Importantly, this iterable may still be added to by the user after this callback has finished.

```csharp
public abstract class CommandExecutor<TReq, TResp> : IAsyncDisposable
where TReq : class
where TResp : class
{
/// <summary>
/// The callback to execute each time a non-streaming command request is received.
/// </summary>
/// <remarks>
/// This callback may be null if this command executor only supports commands that stream responses.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>>? OnCommandReceived { get; set; }

/// <summary>
/// The callback to execute each time a command request that expects streamed responses is received.
/// </summary>
/// <remarks>
/// The callback provides the request itself and requires the user to return one to many responses. This callback may be null
/// if this command executors doesn't have any streaming commands.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<IAsyncEnumerable<StreamingExtendedResponse<TResp>>>>? OnStreamingCommandReceived { get; set; }
}

```

With this design, commands that use streaming are defined at codegen time. Codegen layer changes will be defined in a separate ADR, though.

### MQTT layer protocol

#### Command invoker side

- The command invoker's request message will include an MQTT user property with name "__streamResp" and value "true".
- Executor needs to know if it can stream the response, and this is the flag that affirms it
- The command invoker will listen for command responses with the correlation data that matches the invoked method's correlation data until it receives a response with the "__isLastResp" flag set to "true"
- The command invoker will acknowledge all messages it receives that match the correlation data of the command request

#### Command executor side

- The command executor receives a command with "__streamResp" flag set to "true"
- All command responses will use the same MQTT message correlation data as the request provided so that the invoker can map responses to the appropriate command invocation.
- Each streamed response must contain an MQTT user property with name "__streamIndex" and value equal to the index of this response relative to the other responses (0 for the first response, 1 for the second response, etc.)
- Each streamed response may contain an MQTT user property with name "__streamRespId" and value equal to that response's streaming response Id. This is an optional and user-provided value.
- The final command response will include an MQTT user property "__isLastResp" with value "true" to signal that it is the final response in the stream.
- A streaming command is allowed to have a single response. It must include the "__isLastResp" flag in that first/final response
- Cache is only updated once the stream has completed and it is updated to include all of the responses (in order) for the command so they can be re-played if the streaming command is invoked again by the same client

- The command executor receives a command **without** "__streamResp" flag set to "true"
- The command must be responded to without streaming

### Cancellation support

To avoid scenarios where long-running streaming responses are no longer wanted, we will want to support cancelling streaming RPC calls.

#### Invoker side

- The command invoker may cancel a streaming RPC call at an arbitrary time by sending an MQTT message with:
- The same MQTT topic as the invoked method
- The same correlation data as the invoked method
- The user property "__stopRpc" set to "true".
- No payload
- The command invoker should still listen on the response topic for a response from the executor which may still contain a successful response (if cancellation was received after the command completed successfully) or a response signalling that cancellation succeeded

#### Executor side

Upon receiving an MQTT message with the "__stopRpc" flag set to "true" that correlates to an actively executing streaming command, the command executor should:
- Notify the application layer that that RPC has been canceled if it is still running
- Send an MQTT message to the appropriate response topic with error code "canceled" to notify the invoker that the RPC has stopped and no further responses will be sent.

If the executor receives a cancellation request for a streaming command that has already completed, then the cancellation request should be ignored.

### Protocol version update

This RPC streaming feature is not backwards compatible (new invoker can't initiate what it believes is a streaming RPC call on an old executor), so it requires a bump in our RPC protocol version from "1.0" to "2.0".

## Alternative designs considered

- Allow the command executor to decide at run time of each command if it will stream responses independent of the command invoker's request
- This would force users to always call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses
- Treat streaming RPC as a separate protocol from RPC, give it its own client like ```CommandInvoker``` and ```TelemetrySender```
- There is a lot of code re-use between RPC and streaming RPC so this would make implementation very inconvenient
- This would introduce another protocol to version. Future RPC changes would likely be relevant to RPC streaming anyways, so this feels redundant.

## Error cases

- RPC executor dies before sending the final stream response.
- Command invoker throws time out exception waiting on the next response
- RPC executor receives command request with "__streamResp", but that executor doesn't understand streaming requests because it uses an older protocol version
- Command executor responds with "not supported protocol" error code since the request carried protocol version 2.0
- RPC executor receives command request with "__streamResp", and the executor understands that it is a streaming request (protocol versions align) but that particular command doesn't support streaming
- RPC executor treats it like a non-streaming command, but adds the "__isLastResp" flag to the one and only response
- RPC invoker tries to invoke a non-streaming command that the executor requires streaming on
- Atypical case since codegen will prevent this
- But, for the sake of non-codegen users, executor returns "invalid header" error pointing to the "__streamResp" header
- Invoker understands that, if the "invalid header" value is "__streamResp", it attempted a invoke a streaming method

## Open Questions

- When to ack the streaming request?
- In normal RPC, request is Ack'd only after the method finishes invocation. Waiting until a streamed RPC finishes could clog up Acks since streaming requests can take a while.
- Ack after first response is generated?
6 changes: 6 additions & 0 deletions doc/reference/error-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
Canceled,
}
```

Expand Down Expand Up @@ -263,6 +264,7 @@
EXECUTION_EXCEPTION,
MQTT_ERROR,
UNSUPPORTED_VERSION,
CANCELED,
}
```

Expand Down Expand Up @@ -327,6 +329,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
Canceled,
}
```

Expand Down Expand Up @@ -400,6 +403,7 @@
ExecutionError
MqttError
UnsupportedVersion
Canceled
}
```

Expand All @@ -424,7 +428,7 @@
}
```

The `AkriMqttError` struct must provide an `Error()` method, but the detais are omitted from this document:

Check warning on line 431 in doc/reference/error-model.md

View workflow job for this annotation

GitHub Actions / CI-spelling

Unknown word (detais) Suggestions: (detail, detain, details, detains, deaths)

```go
func (err AkriMqttError) Error() string {
Expand Down Expand Up @@ -455,6 +459,7 @@
EXECUTION_EXCEPTION = 10
MQTT_ERROR = 11
UNSUPPORTED_VERSION = 12
CANCELED = 13
```

The Akri.Mqtt error type is defined as follows:
Expand Down Expand Up @@ -570,6 +575,7 @@
| 400 | Bad Request | false | no | | invalid payload |
| 408 | Request Timeout | false | yes | yes | timeout |
| 415 | Unsupported Media Type | false | yes | yes | invalid header |
| 452 | Request Cancelled | false | no | no | canceled |
| 500 | Internal Server Error | false | no | | unknown error |
| 500 | Internal Server Error | false | yes | | internal logic error |
| 500 | Internal Server Error | true | maybe | | execution error |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
// TODO remove this once akri service is code gen'd to expect srcId instead of invId
internal const string CommandInvokerId = ReservedPrefix + "invId";

/// <summary>
/// Inidicates that an RPC request expects the executor to stream one or many responses.

Check warning on line 83 in dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs

View workflow job for this annotation

GitHub Actions / CI-spelling

Unknown word (Inidicates) Suggestions: (indicates, indicate, initiates, indices, indicts)
/// </summary>
internal const string IsStreamingCommand = ReservedPrefix + "streamResp";

/// <summary>
/// Inidicates that an RPC request should be cancelled if it is still executing

Check warning on line 88 in dotnet/src/Azure.Iot.Operations.Protocol/AkriSystemProperties.cs

View workflow job for this annotation

GitHub Actions / CI-spelling

Unknown word (Inidicates) Suggestions: (indicates, indicate, initiates, indices, indicts)
/// </summary>
internal const string CancelCommand = ReservedPrefix + "stopRpc";

internal static bool IsReservedUserProperty(string name)
{
return name.Equals(Timestamp, StringComparison.Ordinal)
Expand All @@ -91,7 +101,8 @@
|| name.Equals(SupportedMajorProtocolVersions, StringComparison.Ordinal)
|| name.Equals(RequestedProtocolVersion, StringComparison.Ordinal)
|| name.Equals(SourceId, StringComparison.Ordinal)
|| name.Equals(CommandInvokerId, StringComparison.Ordinal);
|| name.Equals(CommandInvokerId, StringComparison.Ordinal)
|| name.Equals(IsStreamingCommand, StringComparison.Ordinal);
}
}
}
17 changes: 16 additions & 1 deletion dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,22 @@
/// </remarks>
public TimeSpan ExecutionTimeout { get; set; }

public required Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>> OnCommandReceived { get; set; }
/// <summary>
/// The callback to execute each time a non-streaming command request is received.
/// </summary>
/// <remarks>
/// This callback may be null if this command executor only supports commands that stream responses.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>>? OnCommandReceived { get; set; }

/// <summary>
/// The callback to execute each time a command request that expects streamed responses is received.
/// </summary>
/// <remarks>
/// The callback provides the request itself and requires the user to return one to many responses. This callback may be null
/// if this command executors doesn't have any streaming commands.
/// </remarks>
public Func<ExtendedRequest<TReq>, CancellationToken, Task<IAsyncEnumerable<StreamingExtendedResponse<TResp>>>>? OnStreamingCommandReceived { get; set; }

public string? ExecutorId { get; init; }

Expand Down Expand Up @@ -229,7 +244,7 @@

try
{
ExtendedResponse<TResp> extended = await Task.Run(() => OnCommandReceived(extendedRequest, commandCts.Token)).WaitAsync(ExecutionTimeout).ConfigureAwait(false);

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Dereference of a possibly null reference.

Check failure on line 247 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (dotnet-server)

Dereference of a possibly null reference.

var serializedPayloadContext = _serializer.ToBytes(extended.Response);

Expand Down
17 changes: 17 additions & 0 deletions dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,23 @@ public async Task<ExtendedResponse<TResp>> InvokeCommandAsync(TReq request, Comm
}
}

/// <summary>
/// Invoke a command and receive a stream of responses.
/// </summary>
/// <param name="request">The payload of command request.</param>
/// <param name="metadata">The metadata of the command request.</param>
/// <param name="additionalTopicTokenMap">
/// The topic token replacement map to use in addition to <see cref="TopicTokenMap"/>. If this map
/// contains any keys that <see cref="TopicTokenMap"/> also has, then values specified in this map will take precedence.
/// </param>
/// <param name="commandTimeout">How long to wait for a command response. Note that each command executor also has a configurable timeout value that may be shorter than this value. <see cref="CommandExecutor{TReq, TResp}.ExecutionTimeout"/></param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The asynchronously stream of responses and their respective metadata.</returns>
public IAsyncEnumerable<StreamingExtendedResponse<TResp>> InvokeStreamingCommandAsync(TReq request, CommandRequestMetadata? metadata = null, Dictionary<string, string>? additionalTopicTokenMap = null, TimeSpan? commandTimeout = default, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

/// <summary>
/// Dispose this object and the underlying mqtt client.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public enum CommandStatusCode : int
/// <summary>Unsupported Media Type. The content type specified in the request is not supported by this implementation.</summary>
UnsupportedMediaType = 415,

/// <summary> The RPC was canceled prior to it finishing. </summary>
Canceled = 452,

/// <summary>Internal Server. Unknown error, internal logic error, or command processor error other than <see cref="InvocationException"/>.</summary>
InternalServerError = 500,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

namespace Azure.Iot.Operations.Protocol.RPC
{
public struct ExtendedResponse<TResp>
public class ExtendedResponse<TResp>
where TResp : class
{
// These two user properties are used to communicate application level errors in an RPC response message. Code is mandatory, but data is optional.
public const string ApplicationErrorCodeUserDataKey = "AppErrCode";
public const string ApplicationErrorPayloadUserDataKey = "AppErrPayload";

public TResp Response { get; set; }

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-dotnet

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (rust-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (go-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

Check failure on line 17 in dotnet/src/Azure.Iot.Operations.Protocol/RPC/ExtendedResponse.cs

View workflow job for this annotation

GitHub Actions / CI-cross-language (dotnet-server)

Non-nullable property 'Response' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.

public CommandResponseMetadata? ResponseMetadata { get; set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace Azure.Iot.Operations.Protocol.RPC
{
public class StreamingExtendedResponse<TResp> : ExtendedResponse<TResp>
where TResp : class
{
/// <summary>
/// An optional Id for this response (relative to the other responses in this response stream)
/// </summary>
/// <remarks>
/// Users are allowed to provide Ids for each response, only for specific responses, or for none of the responses.
/// </remarks>
public string? StreamingResponseId { get; set; }

/// <summary>
/// The index of this response relative to the other responses in this response stream. Starts at 0.
/// </summary>
public int StreamingResponseIndex { get; set; }

/// <summary>
/// If true, this response is the final response in this response stream.
/// </summary>
public bool IsLastResponse { get; set; }
}
}
Loading