Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
eb975a4
asdf
timtay-microsoft Jun 26, 2025
4d27162
noodling so far
timtay-microsoft Jun 30, 2025
0163c76
sadf
timtay-microsoft Jun 30, 2025
73b12d7
maybe
timtay-microsoft Jun 30, 2025
4080104
doc
timtay-microsoft Jun 30, 2025
4ac4003
more
timtay-microsoft Jul 7, 2025
af7c829
asdf
timtay-microsoft Jul 7, 2025
215eb64
asdf
timtay-microsoft Jul 17, 2025
615ba26
thoughts
timtay-microsoft Jul 18, 2025
ad17af3
more
timtay-microsoft Jul 18, 2025
a2c2f6e
notes
timtay-microsoft Jul 22, 2025
6bf1bcc
caching
timtay-microsoft Jul 22, 2025
180d85e
more thoughts
timtay-microsoft Jul 24, 2025
599a431
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 25, 2025
a1d2e21
no new error code, some gRPC notes
timtay-microsoft Jul 25, 2025
e680263
save impl for later
timtay-microsoft Jul 25, 2025
629dc2f
ordering q
timtay-microsoft Jul 25, 2025
f2a2c60
wording
timtay-microsoft Jul 25, 2025
710a425
links
timtay-microsoft Jul 25, 2025
0006c02
backwards
timtay-microsoft Jul 25, 2025
b457888
first thoughts on cancellation, re-order doc a bit
timtay-microsoft Jul 25, 2025
f16ad43
more notes, more re-ordering
timtay-microsoft Jul 25, 2025
e7c98d5
cleanup
timtay-microsoft Jul 28, 2025
1c1964b
Only allow cancelling streaming commands
timtay-microsoft Jul 28, 2025
edecd39
Merge branch 'main' into timtay/streaming
timtay-microsoft Jul 28, 2025
61a5212
typo
timtay-microsoft Jul 28, 2025
79af301
code changes in another branch
timtay-microsoft Jul 28, 2025
c7fb69e
more
timtay-microsoft Jul 28, 2025
aa2dc81
more
timtay-microsoft Jul 28, 2025
0784a10
Update ExtendedResponse.cs
timtay-microsoft Jul 28, 2025
ff9efc9
Update AkriSystemProperties.cs
timtay-microsoft Jul 28, 2025
0ff4dbe
Update 0025-rpc-streaming.md
timtay-microsoft Jul 30, 2025
4aef0a4
Remove responseId concept. User will do this with their own user prop…
timtay-microsoft Aug 1, 2025
9dbb174
Incorporate a lot of feedback
timtay-microsoft Aug 1, 2025
49b57c7
cleanup
timtay-microsoft Aug 1, 2025
ac799bc
canceled error code instead of header
timtay-microsoft Aug 1, 2025
04f1ce9
timeout thoughts
timtay-microsoft Aug 1, 2025
14ca259
Merge branch 'main' into timtay/streaming
timtay-microsoft Aug 1, 2025
353a2fd
asdf
timtay-microsoft Aug 4, 2025
439c5f8
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Aug 4, 2025
e462a3f
reword
timtay-microsoft Aug 4, 2025
4c4cb0b
API fix
timtay-microsoft Aug 4, 2025
5050ada
not needed
timtay-microsoft Aug 4, 2025
9caa59d
non-req
timtay-microsoft Aug 4, 2025
58d8fc1
fix type
timtay-microsoft Aug 5, 2025
fead2ad
timeout musings
timtay-microsoft Aug 6, 2025
7c83f06
wording
timtay-microsoft Aug 7, 2025
fb9de69
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
29e1811
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
c15790c
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
98f8763
Update 0025-rpc-streaming.md
timtay-microsoft Aug 7, 2025
bf8c252
Address feedback
timtay-microsoft Sep 3, 2025
8e6a6d8
note
timtay-microsoft Sep 3, 2025
59afded
more
timtay-microsoft Sep 4, 2025
69d7781
More, complete streams at any time
timtay-microsoft Sep 5, 2025
4a5b22d
ExecutorId is mandatory
timtay-microsoft Sep 5, 2025
91f55f3
fix .NET APIs
timtay-microsoft Sep 6, 2025
0c7fd4a
disconnection considerations
timtay-microsoft Sep 6, 2025
66556a5
De-duping?
timtay-microsoft Sep 6, 2025
6b15e19
Revert "De-duping?"
timtay-microsoft Sep 6, 2025
bddf3f9
executorId in tests
timtay-microsoft Sep 8, 2025
f5dff71
Merge branch 'main' into timtay/streaming
timtay-microsoft Sep 8, 2025
c39c41a
fix
timtay-microsoft Sep 8, 2025
6c3f1ec
Merge branch 'timtay/streaming' of https://github.com/Azure/iot-opera…
timtay-microsoft Sep 8, 2025
abe8ae8
No more executor Id, just use $partition
timtay-microsoft Sep 10, 2025
e6cef75
de-dup + qos 1 clarification
timtay-microsoft Sep 10, 2025
f4929a2
fixup
timtay-microsoft Sep 10, 2025
8322019
Optionally delay acknowledgements
timtay-microsoft Sep 10, 2025
1c6c3ad
cancellation user properties so far
timtay-microsoft Sep 10, 2025
e8d75e4
more cancellation user properties support
timtay-microsoft Sep 12, 2025
645f306
message level timeout is back
timtay-microsoft Sep 15, 2025
3e0f863
fixup
timtay-microsoft Sep 15, 2025
886906c
fixup
timtay-microsoft Sep 15, 2025
040940d
timeout vs cancellation
timtay-microsoft Sep 15, 2025
c7cba20
more
timtay-microsoft Sep 15, 2025
8f72c15
isLast
timtay-microsoft Sep 16, 2025
5c5d4b3
unused
timtay-microsoft Sep 16, 2025
1e59bd9
expiry interval note
timtay-microsoft Sep 16, 2025
266a809
message expiry purpose
timtay-microsoft Sep 17, 2025
10f55dc
broker behavior
timtay-microsoft Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 400 additions & 0 deletions doc/dev/adr/0025-rpc-streaming.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions doc/reference/error-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
Canceled,
}
```

Expand Down Expand Up @@ -263,6 +264,7 @@
EXECUTION_EXCEPTION,
MQTT_ERROR,
UNSUPPORTED_VERSION,
CANCELED,
}
```

Expand Down Expand Up @@ -327,6 +329,7 @@
ExecutionException,
MqttError,
UnsupportedVersion,
Canceled,
}
```

Expand Down Expand Up @@ -399,6 +402,7 @@
ExecutionError
MqttError
UnsupportedVersion
Canceled
}
```

Expand All @@ -423,7 +427,7 @@
}
```

The `AkriMqttError` struct must provide an `Error()` method, but the detais are omitted from this document:

Check warning on line 430 in doc/reference/error-model.md

View workflow job for this annotation

GitHub Actions / CI-spelling

Unknown word (detais) Suggestions: (detail, detain, details, detains, deaths)

```go
func (err AkriMqttError) Error() string {
Expand Down Expand Up @@ -454,6 +458,7 @@
EXECUTION_EXCEPTION = 10
MQTT_ERROR = 11
UNSUPPORTED_VERSION = 12
CANCELED = 13
```

The Akri.Mqtt error type is defined as follows:
Expand Down Expand Up @@ -569,6 +574,7 @@
| 400 | Bad Request | false | no | | invalid payload |
| 408 | Request Timeout | false | yes | yes | timeout |
| 415 | Unsupported Media Type | false | yes | yes | invalid header |
| 452 | Request Cancelled | false | no | no | canceled |
| 500 | Internal Server Error | false | no | | unknown error |
| 500 | Internal Server Error | false | yes | | internal logic error |
| 500 | Internal Server Error | true | maybe | | execution error |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// A stream of requests or responses that can be gracefully ended or canceled (with confirmation) at any time.
/// </summary>
/// <typeparam name="T">The type of the payload of the request/response stream</typeparam>
public interface IStreamContext<T>
where T : class
{
/// <summary>
/// The asynchronously readable entries in the stream
/// </summary>
IAsyncEnumerable<T> Entries { get; set; }

/// <summary>
/// Cancel this RPC streaming exchange.
/// </summary>
/// <param name="userData">
/// The optional user properties to include in this cancellation request. the receiving side of this cancellation request
/// will be given these properties alongside the notification that the streaming exchange has been canceled.
/// </param>
/// <param name="cancellationToken">Cancellation token to wait for confirmation from the receiving side that the cancellation succeeded.</param>
/// <remarks>
/// When called by the invoker, the executor will be notified about this cancellation and the executor will attempt
/// to stop any user-defined handling of the streaming request. When called by the executor, the invoker will be notified
/// and will cease sending requests and will throw an <see cref="AkriMqttException"/> with <see cref="AkriMqttException.Kind"/>
/// of <see cref="AkriMqttErrorKind.Cancellation"/>.
///
/// This method may be called by the streaming invoker or executor at any time. For instance, if the request stream
/// stalls unexpectedly, the executor can call this method to notify the invoker to stop sending requests.
/// Additionally, the invoker can call this method if its response stream has stalled unexpectedly.
/// </remarks>
Task CancelAsync(Dictionary<string, string>? userData = null, CancellationToken cancellationToken = default);

/// <summary>
/// The token that tracks if the streaming exchange has been cancelled by the other party and/or timed out.
/// </summary>
/// <remarks>
/// For instance, if the invoker side cancels the streaming exchange, the executor side callback's <see cref="IStreamContext{T}.CancellationToken"/>
/// will be triggered. If the executor side cancels the streaming exchange, the invoker side's returned <see cref="IStreamContext{T}.CancellationToken"/>
/// will be triggered.
///
/// To see if this was triggered because the stream exchange was cancelled, see <see cref="IsCanceled"/>. To see if it was triggered because
/// the stream exchange timed out, see <see cref="HasTimedOut"/>.
/// </remarks>
CancellationToken CancellationToken { get; }

/// <summary>
/// Get the user properties associated with a cancellation request started with <see cref="CancelAsync(Dictionary{string, string}?, CancellationToken)"/>.
/// </summary>
/// <returns>The user properties associated with a cancellation request</returns>
/// <remarks>
/// If the stream has not been cancelled, this will return null. If the stream has been cancelled, but no user properties were
/// provided in that cancellation request, this will return null.
/// </remarks>
Dictionary<string, string>? GetCancellationRequestUserProperties();

/// <summary>
/// True if this stream exchange has timed out. If a stream has timed out, <see cref="CancellationToken"/> will trigger as well.
/// </summary>
bool HasTimedOut { get; internal set; }

/// <summary>
/// True if this stream exchange has been canceled by the other party. If a stream has been cancelled, <see cref="CancellationToken"/> will trigger as well.
/// </summary>
bool IsCanceled { get; internal set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// The payload and metadata associated with a single request in a request stream.
/// </summary>
/// <typeparam name="TReq">The type of the payload of the request</typeparam>
public class ReceivedStreamingExtendedRequest<TReq> : StreamingExtendedRequest<TReq>
where TReq : class
{
private readonly Task _acknowledgementFunc;

internal ReceivedStreamingExtendedRequest(TReq request, StreamMessageMetadata metadata, Task acknowledgementFunc)
: base(request, metadata)
{
_acknowledgementFunc = acknowledgementFunc;
}

public async Task AcknowledgeAsync()
{
await _acknowledgementFunc;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Threading.Tasks;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// The payload and metadata associated with a single response in a response stream.
/// </summary>
/// <typeparam name="TResp">The type of the payload of the response</typeparam>
public class ReceivedStreamingExtendedResponse<TResp> : StreamingExtendedResponse<TResp>
where TResp : class
{
private readonly Task _acknowledgementFunc;

internal ReceivedStreamingExtendedResponse(TResp response, StreamMessageMetadata metadata, Task acknowledgementFunc)
: base(response, metadata)
{
_acknowledgementFunc = acknowledgementFunc;
}

public async Task AcknowledgeAsync()
{
await _acknowledgementFunc;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using Azure.Iot.Operations.Protocol.Models;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// Metadata for a request stream as a whole.
/// </summary>
public class RequestStreamMetadata
{
/// <summary>
/// The correlationId for tracking this streaming request
/// </summary>
public Guid CorrelationId { get; set; }

/// <summary>
/// The Id of the client that invoked this streaming request
/// </summary>
public string? InvokerClientId { get; set; }

/// <summary>
/// The MQTT topic tokens used in this streaming request.
/// </summary>
public Dictionary<string, string> TopicTokens { get; } = new();

/// <summary>
/// The partition associated with this streaming request.
/// </summary>
public string? Partition { get; }

/// <summary>
/// The content type of all messages sent in this request stream.
/// </summary>
public string? ContentType { get; set; }

/// <summary>
/// The payload format indicator for all messages sent in this request stream.
/// </summary>
public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Iot.Operations.Protocol.Models;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// Metadata for a response stream as a whole.
/// </summary>
public class ResponseStreamMetadata
{
/// <summary>
/// The content type of all messages in this response stream
/// </summary>
public string? ContentType { get; set; }

/// <summary>
/// The payload format indicator for all messages in this response stream
/// </summary>
public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Generic;

namespace Azure.Iot.Operations.Protocol.Streaming
{
/// <summary>
/// Metadata for a specific message within a request stream
/// </summary>
public class StreamMessageMetadata
{
/// <summary>
/// The timestamp attached to this particular message
/// </summary>
public HybridLogicalClock? Timestamp { get; internal set; }

/// <summary>
/// User properties associated with this particular message
/// </summary>
public Dictionary<string, string> UserData { get; init; } = new();

/// <summary>
/// The index of this message within the stream as a whole
/// </summary>
/// <remarks>This value is automatically assigned when sending messages in a request/response stream and cannot be overriden.</remarks>

Check warning on line 26 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamMessageMetadata.cs

View workflow job for this annotation

GitHub Actions / CI-spelling

Misspelled word (overriden) Suggestions: (overridden*, override, overrides, overripen, overripe)
public int Index { get; internal set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
#pragma warning disable IDE0060 // Remove unused parameter
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable.

namespace Azure.Iot.Operations.Protocol.Streaming
{
public abstract class StreamingCommandExecutor<TReq, TResp> : IAsyncDisposable
where TReq : class
where TResp : class
{
/// <summary>
/// A streaming command was invoked
/// </summary>
/// <remarks>
/// The callback provides the stream of requests and requires the user to return one to many responses.
/// </remarks>
public required Func<IStreamContext<ReceivedStreamingExtendedRequest<TReq>>, RequestStreamMetadata, IAsyncEnumerable<StreamingExtendedResponse<TResp>>> OnStreamingCommandReceived { get; set; }

public string RequestTopicPattern { get; init; }

/// <summary>
/// The topic token replacement map that this executor will use by default. Generally, this will include the token values
/// for topic tokens such as "executorId" which should be the same for the duration of this command executor's lifetime.
/// </summary>
/// <remarks>
/// Tokens replacement values can also be specified when starting the executor by specifying the additionalTopicToken map in <see cref="StartAsync(int?, Dictionary{string, string}?, CancellationToken)"/>.
/// </remarks>
public Dictionary<string, string> TopicTokenMap { get; protected set; }

/// <summary>
/// If true, this executor will acknowledge the MQTT message associated with each streaming request as soon as it arrives.
/// If false, the user must call <see cref="ReceivedStreamingExtendedRequest{TReq}.AcknowledgeAsync"/> once they are done processing
/// each request message.
/// </summary>
/// <remarks>
/// Generally, delaying acknowledgement allows for re-delivery by the broker in cases where the executor crashes or restarts unexpectedly.
/// However, MQTT acknowledgements must be delivered in order, so delaying these acknowledgements may affect the flow of acknowledgements
/// being sent by other processes using this same MQTT client. Additionally, the MQTT broker has a limit on the number of un-acknowledged messages
/// that are allowed to be in-flight at a single moment, so delaying too many acknowledgements may halt all further MQTT traffic on the underlying
/// MQTT client.
/// </remarks>
public bool AutomaticallyAcknowledgeRequests { get; set; } = true;

public Task StartAsync(CancellationToken cancellationToken = default)
{
// TODO: derive the expected request topic (like command executor does)

// TODO: subscribe to the shared subscription prefixed request topic

throw new NotImplementedException();
}

public Task StopAsync(CancellationToken cancellationToken = default)
{
// TODO: Unsubscribe from the request topic derived in StartAsync

throw new NotImplementedException();

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► CanStreamRequestsAndResponsesSimultaneously

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.CanStreamRequestsAndResponsesSimultaneously() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 286
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.CanStreamRequestsAndResponsesSimultaneously() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 286
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.CanStreamRequestsAndResponsesSimultaneously() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 286
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► ExecutorCanCancelWhileStreamingRequests

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 194
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 194
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 194
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► ExecutorCanCancelWhileStreamingResponses

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 232
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 232
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 232
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► ExecutorCanCompleteResponseStreamWithYieldBreak

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCompleteResponseStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 349
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCompleteResponseStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 349
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.ExecutorCanCompleteResponseStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 349
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► InvokerAndExecutorCanDelayAcknowledgements

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerAndExecutorCanDelayAcknowledgements() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 377
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerAndExecutorCanDelayAcknowledgements() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 377
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerAndExecutorCanDelayAcknowledgements() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 377
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► InvokerCanCancelWhileStreamingRequests

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 125
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 125
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingRequests() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 125
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► InvokerCanCancelWhileStreamingResponses

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 147
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 147
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCancelWhileStreamingResponses() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 147
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► InvokerCanCompleteRequestStreamWithYieldBreak

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCompleteRequestStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 324
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCompleteRequestStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 324
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.InvokerCanCompleteRequestStreamWithYieldBreak() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 324
--- End of stack trace from previous location ---

Check failure on line 63 in dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs

View workflow job for this annotation

GitHub Actions / dotnet test report

Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests ► StreamRequestsAndResponsesInSerial(multipleRequests: False, multipleResponses: False)

Failed test found in: dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/TestResults/test-results.trx Error: System.NotImplementedException : The method or operation is not implemented.
Raw output
System.NotImplementedException : The method or operation is not implemented.
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.StopAsync(CancellationToken cancellationToken) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 63
   at Azure.Iot.Operations.Protocol.Streaming.StreamingCommandExecutor`2.DisposeAsync() in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/src/Azure.Iot.Operations.Protocol/Streaming/StreamingCommandExecutor.cs:line 68
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.StreamRequestsAndResponsesInSerial(Boolean multipleRequests, Boolean multipleResponses) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 94
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.StreamRequestsAndResponsesInSerial(Boolean multipleRequests, Boolean multipleResponses) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 94
   at Azure.Iot.Operations.Protocol.IntegrationTests.StreamingIntegrationTests.StreamRequestsAndResponsesInSerial(Boolean multipleRequests, Boolean multipleResponses) in /home/runner/work/iot-operations-sdks/iot-operations-sdks/dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/StreamingIntegrationTests.cs:line 94
--- End of stack trace from previous location ---
}

public async ValueTask DisposeAsync()
{
await StopAsync();

GC.SuppressFinalize(this);
}
}
}
#pragma warning restore IDE0060 // Remove unused parameter
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring as nullable.
Loading
Loading