-
Notifications
You must be signed in to change notification settings - Fork 18
[ADR] RPC streaming design doc #952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
timtay-microsoft
wants to merge
80
commits into
main
Choose a base branch
from
timtay/streaming
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 22 commits
Commits
Show all changes
80 commits
Select commit
Hold shift + click to select a range
eb975a4
asdf
timtay-microsoft 4d27162
noodling so far
timtay-microsoft 0163c76
sadf
timtay-microsoft 73b12d7
maybe
timtay-microsoft 4080104
doc
timtay-microsoft 4ac4003
more
timtay-microsoft af7c829
asdf
timtay-microsoft 215eb64
asdf
timtay-microsoft 615ba26
thoughts
timtay-microsoft ad17af3
more
timtay-microsoft a2c2f6e
notes
timtay-microsoft 6bf1bcc
caching
timtay-microsoft 180d85e
more thoughts
timtay-microsoft 599a431
Merge branch 'main' into timtay/streaming
timtay-microsoft a1d2e21
no new error code, some gRPC notes
timtay-microsoft e680263
save impl for later
timtay-microsoft 629dc2f
ordering q
timtay-microsoft f2a2c60
wording
timtay-microsoft 710a425
links
timtay-microsoft 0006c02
backwards
timtay-microsoft b457888
first thoughts on cancellation, re-order doc a bit
timtay-microsoft f16ad43
more notes, more re-ordering
timtay-microsoft e7c98d5
cleanup
timtay-microsoft 1c1964b
Only allow cancelling streaming commands
timtay-microsoft edecd39
Merge branch 'main' into timtay/streaming
timtay-microsoft 61a5212
typo
timtay-microsoft 79af301
code changes in another branch
timtay-microsoft c7fb69e
more
timtay-microsoft aa2dc81
more
timtay-microsoft 0784a10
Update ExtendedResponse.cs
timtay-microsoft ff9efc9
Update AkriSystemProperties.cs
timtay-microsoft 0ff4dbe
Update 0025-rpc-streaming.md
timtay-microsoft 4aef0a4
Remove responseId concept. User will do this with their own user prop…
timtay-microsoft 9dbb174
Incorporate a lot of feedback
timtay-microsoft 49b57c7
cleanup
timtay-microsoft ac799bc
canceled error code instead of header
timtay-microsoft 04f1ce9
timeout thoughts
timtay-microsoft 14ca259
Merge branch 'main' into timtay/streaming
timtay-microsoft 353a2fd
asdf
timtay-microsoft 439c5f8
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft e462a3f
reword
timtay-microsoft 4c4cb0b
API fix
timtay-microsoft 5050ada
not needed
timtay-microsoft 9caa59d
non-req
timtay-microsoft 58d8fc1
fix type
timtay-microsoft fead2ad
timeout musings
timtay-microsoft 7c83f06
wording
timtay-microsoft fb9de69
Update 0025-rpc-streaming.md
timtay-microsoft 29e1811
Update 0025-rpc-streaming.md
timtay-microsoft c15790c
Update 0025-rpc-streaming.md
timtay-microsoft 98f8763
Update 0025-rpc-streaming.md
timtay-microsoft bf8c252
Address feedback
timtay-microsoft 8e6a6d8
note
timtay-microsoft 59afded
more
timtay-microsoft 69d7781
More, complete streams at any time
timtay-microsoft 4a5b22d
ExecutorId is mandatory
timtay-microsoft 91f55f3
fix .NET APIs
timtay-microsoft 0c7fd4a
disconnection considerations
timtay-microsoft 66556a5
De-duping?
timtay-microsoft 6b15e19
Revert "De-duping?"
timtay-microsoft bddf3f9
executorId in tests
timtay-microsoft f5dff71
Merge branch 'main' into timtay/streaming
timtay-microsoft c39c41a
fix
timtay-microsoft 6c3f1ec
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft abe8ae8
No more executor Id, just use $partition
timtay-microsoft e6cef75
de-dup + qos 1 clarification
timtay-microsoft f4929a2
fixup
timtay-microsoft 8322019
Optionally delay acknowledgements
timtay-microsoft 1c6c3ad
cancellation user properties so far
timtay-microsoft e8d75e4
more cancellation user properties support
timtay-microsoft 645f306
message level timeout is back
timtay-microsoft 3e0f863
fixup
timtay-microsoft 886906c
fixup
timtay-microsoft 040940d
timeout vs cancellation
timtay-microsoft c7cba20
more
timtay-microsoft 8f72c15
isLast
timtay-microsoft 5c5d4b3
unused
timtay-microsoft 1e59bd9
expiry interval note
timtay-microsoft 266a809
message expiry purpose
timtay-microsoft 10f55dc
broker behavior
timtay-microsoft File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| # ADR 25: RPC Streaming | ||
|
|
||
| ## Context | ||
|
|
||
| Users have expressed a desire to allow more than one response per RPC invocation. This would enable scenarios like: | ||
|
|
||
| - Execute long-running commands while still being responsive | ||
| - | ||
|
|
||
| ## Requirements | ||
| - Allow for an arbitrary number of command responses for a single command invocation | ||
| - The total number of responses does not need to be known before the first response is sent | ||
| - When exposed to the user, each response includes an index of where it was in the stream | ||
| - Allow for multiple separate commands to be streamed simultaneously | ||
| - Even the same command can be executed in parallel to itself? | ||
| - Allow for invoker to cancel streamed responses mid-stream | ||
|
|
||
| ## Non-requirements | ||
| - Different payload shapes per command response | ||
| - "Client Streaming" RPC (multiples requests -> One command response) | ||
| - Bi-directional streaming RPC (multiples requests -> multiple responses) | ||
| - Allow for executor to cancel streamed responses mid-stream | ||
|
|
||
| ## State of the art | ||
|
|
||
| gRPC supports these patterns for RPC: | ||
| - [Unary RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#unary-rpc) (1 request message, 1 response message) | ||
| - [Server streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (1 request message, many response messages) | ||
| - [Client streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc) (many request messages, one response message) | ||
| - [Bi-directional streaming RPC](https://grpc.io/docs/what-is-grpc/core-concepts/#bidirectional-streaming-rpc) (many request messages, many response messages) | ||
|
|
||
| gRPC relies on the HTTP streaming protocol to delineate each message in the stream and to indicate the end of the stream. | ||
|
|
||
| [gRPC also allows for either the client or server to cancel an RPC at any time](https://grpc.io/docs/what-is-grpc/core-concepts/#cancelling-an-rpc) | ||
|
|
||
| ## Decision | ||
|
|
||
| ### API design, .NET | ||
|
|
||
| Our command invoker base class will now include a new method ```InvokeCommandWithStreaming``` to go with the existing ```InvokeCommand``` method. | ||
|
|
||
| This new method will take the same parameters as ```InvokeCommand``` but will return an asynchronously iterable list (or callback depending on language?) of command response objects. | ||
|
|
||
| ```csharp | ||
| public abstract class CommandInvoker<TReq, TResp> | ||
| where TReq : class | ||
| where TResp : class | ||
| { | ||
| // Single response | ||
| public Task<ExtendedResponse<TResp>> InvokeCommandAsync(TReq request, ...) {...} | ||
|
|
||
| // Many responses, responses may be staggered | ||
| public IAsyncEnumerable<StreamingExtendedResponse<TResp>> InvokeStreamingCommandAsync(TReq request, ...) {...} | ||
| } | ||
| ``` | ||
|
|
||
| Additionally, this new method will return an extended version of the ```ExtendedResponse``` wrapper that will include the streaming-specific information about each response: | ||
|
|
||
| ```csharp | ||
| public class StreamingExtendedResponse<TResp> : ExtendedResponse<TResp> | ||
| where TResp : class | ||
| { | ||
| /// <summary> | ||
| /// An optional Id for this response (relative to the other responses in this response stream) | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// Users are allowed to provide Ids for each response, only for specific responses, or for none of the responses. | ||
| /// </remarks> | ||
| public string? StreamingResponseId { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The index of this response relative to the other responses in this response stream. Starts at 0. | ||
| /// </summary> | ||
| public int StreamingResponseIndex { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// If true, this response is the final response in this response stream. | ||
| /// </summary> | ||
| public bool IsLastResponse { get; set; } | ||
| } | ||
| ``` | ||
|
|
||
| On the executor side, we will define a separate callback that executes whenever a streaming command is invoked. Instead of returning the single response, this callback will return the asynchronously iterable list of responses. Importantly, this iterable may still be added to by the user after this callback has finished. | ||
|
|
||
| ```csharp | ||
| public abstract class CommandExecutor<TReq, TResp> : IAsyncDisposable | ||
| where TReq : class | ||
| where TResp : class | ||
| { | ||
| /// <summary> | ||
| /// The callback to execute each time a non-streaming command request is received. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// This callback may be null if this command executor only supports commands that stream responses. | ||
| /// </remarks> | ||
| public Func<ExtendedRequest<TReq>, CancellationToken, Task<ExtendedResponse<TResp>>>? OnCommandReceived { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The callback to execute each time a command request that expects streamed responses is received. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// The callback provides the request itself and requires the user to return one to many responses. This callback may be null | ||
| /// if this command executors doesn't have any streaming commands. | ||
| /// </remarks> | ||
| public Func<ExtendedRequest<TReq>, CancellationToken, Task<IAsyncEnumerable<StreamingExtendedResponse<TResp>>>>? OnStreamingCommandReceived { get; set; } | ||
| } | ||
|
|
||
| ``` | ||
|
|
||
| With this design, commands that use streaming are defined at codegen time. Codegen layer changes will be defined in a separate ADR, though. | ||
timtay-microsoft marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ### MQTT layer protocol | ||
|
|
||
| #### Command invoker side | ||
|
|
||
| - The command invoker's request message will include an MQTT user property with name "__streamResp" and value "true". | ||
| - Executor needs to know if it can stream the response, and this is the flag that tells it that | ||
| - The command invoker will listen for command responses with the correlation data that matches the invoked method's correlation data until it receives a response with the "__isLastResp" flag set to "true" | ||
| - The command invoker will acknowledge all messages it receives that match the correlation data of the command request | ||
|
|
||
| #### Command executor side | ||
|
|
||
| - The command executor receives a command with "__streamResp" flag set to "true" | ||
| - The command is given to the application layer in a way that allows the application to return at least one response | ||
| - All command responses will use the same MQTT message correlation data as the request provided so that the invoker can map responses to the appropriate command invocation. | ||
| - Each streamed response must contain an MQTT user property with name "__streamIndex" and value equal to the index of this response relative to the other responses (0 for the first response, 1 for the second response, etc.) | ||
| - Each streamed response may contain an MQTT user property with name "__streamRespId" and value equal to that response's streaming response Id. This is an optional and user-provided value. | ||
| - The final command response will include an MQTT user property "__isLastResp" with value "true" to signal that it is the final response in the stream. | ||
| - A streaming command is allowed to have a single response. It must include the "__isLastResp" flag in that first/final response | ||
| - Cache is only updated once the stream has completed and it is updated to include all of the responses (in order) for the command so they can be re-played if the streaming command is invoked again by the same client | ||
|
|
||
| - The command executor receives a command **without** "__streamResp" flag set to "true" | ||
| - The command must be responded to without streaming | ||
|
|
||
| ### Cancellation support | ||
|
|
||
| To avoid scenarios where long-running streaming responses are no longer wanted, we will want to support cancelling RPC calls. This feature is moreso applicable for RPC streaming, but the design allows for it to work for non-streaming RPC as well. | ||
|
|
||
| #### Invoker side | ||
|
|
||
| - The command invoker may cancel a normal or streaming RPC call at an arbitrary time by sending an MQTT message with: | ||
timtay-microsoft marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| - The same MQTT topic as the invoked method | ||
| - The same correlation data as the invoked method | ||
| - The user property "__cancelRpc" set to "true". | ||
| - No payload | ||
| - TODO what would API look like? gRPC uses cancellation token | ||
| - The command invoker should still listen on the response topic for a response from the executor which may still contain a successful response (if cancellation was received after the command completed successfully) | ||
|
|
||
| #### Executor side | ||
|
|
||
| Regardless of if an RPC is streaming or not, upon receiving an MQTT message with the "__cancelRpc" flag set to "true", the command executor should: | ||
| - Notify the application layer that that RPC has been canceled if it is still running | ||
| - Send an MQTT message to the appropriate response topic with error code "canceled" to notify the invoker that the RPC has stopped and no further responses will be sent. | ||
|
|
||
| If the executor receives a cancellation request for a command that has already completed, then the cancellation request should be ignored. | ||
|
|
||
| ### Protocol version update | ||
|
|
||
| This RPC streaming feature is not backwards compatible (new invoker can't initiate what it believes is a streaming RPC call on an old executor), so it requires a bump in our RPC protocol version from "1.0" to "2.0". | ||
|
|
||
| TODO: Start defining a doc in our repo that defines what features are present in what protocol version. | ||
|
|
||
| ## Example with code gen | ||
|
|
||
| TODO which existing client works well for long-running commands? Mem mon ("Report usage for 10 seconds at 1 second intervals")? | ||
|
|
||
| ## Alternative designs considered | ||
|
|
||
| - Allow the command executor to decide at run time of each command if it will stream responses independent of the command invoker's request | ||
| - This would force users to always call the ```InvokeCommandWithStreaming``` API on the command invoker side and that returned object isn't as easy to use for single responses | ||
| - Treat streaming RPC as a separate protocol from RPC, give it its own client like ```CommandInvoker``` and ```TelemetrySender``` | ||
| - There is a lot of code re-use between RPC and streaming RPC so this would make implementation very inconvenient | ||
| - This would introduce another protocol to version. Future RPC changes would likely be relevant to RPC streaming anyways, so this feels redundant. | ||
|
|
||
| ## Error cases | ||
|
|
||
| - RPC executor dies before sending the final stream response. | ||
| - Command invoker throws time out exception waiting on the next response | ||
| - RPC executor receives command request with "__streamResp", but that executor doesn't understand streaming requests because it uses an older protocol version | ||
| - Command executor responds with "not supported protocol" error code | ||
| - RPC executor receives command request with "__streamResp", and the executor understands that it is a streaming request (protocol versions align) but that particular command doesn't support streaming | ||
| - RPC executor treats it like a non-streaming command, but adds the "__isLastResp" flag to the one and only response | ||
| - RPC invoker tries to invoke a non-streaming command that the executor requires streaming on | ||
| - Atypical case since codegen will prevent this | ||
| - But, for the sake of non-codegen users, executor returns "invalid header" error pointing to the "__streamResp" header | ||
| - Invoker understands that, if the "invalid header" value is "__streamResp", it attempted a invoke a streaming method | ||
| - timeout per response vs overall? Both? | ||
|
|
||
timtay-microsoft marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ## Open Questions | ||
|
|
||
| - When to ack the streaming request? | ||
| - In normal RPC, request is Ack'd only after the method finishes invocation. Waiting until a streamed RPC finishes could clog up Acks since streaming requests can take a while. | ||
| - Ack after first response is generated? | ||
timtay-microsoft marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 31 additions & 0 deletions
31
dotnet/src/Azure.Iot.Operations.Protocol/RPC/StreamingExtendedResponse.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System; | ||
| using System.Text.Json; | ||
| using System.Text.Json.Nodes; | ||
|
|
||
| namespace Azure.Iot.Operations.Protocol.RPC | ||
| { | ||
| public class StreamingExtendedResponse<TResp> : ExtendedResponse<TResp> | ||
| where TResp : class | ||
| { | ||
| /// <summary> | ||
| /// An optional Id for this response (relative to the other responses in this response stream) | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// Users are allowed to provide Ids for each response, only for specific responses, or for none of the responses. | ||
| /// </remarks> | ||
| public string? StreamingResponseId { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The index of this response relative to the other responses in this response stream. Starts at 0. | ||
| /// </summary> | ||
| public int StreamingResponseIndex { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// If true, this response is the final response in this response stream. | ||
| /// </summary> | ||
| public bool IsLastResponse { get; set; } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.