Skip to content

Commit 2e3552e

Browse files
jsedlakmikekistler
andauthored
Update streams-programming-apis.md implicit subscription section to b… (#41988)
* Update streams-programming-apis.md implicit subscription section to be more clear Adds information regarding implementing IAsyncObserver<T> and IStreamSubscriptionObserver for Implicit Streams instead of explicitly subscribing (which does not apply in the case of an implicit subscription) * Updates for clarity surrounding the various options. * Update docs/orleans/streaming/streams-programming-apis.md Suggestion / Word Correction Co-authored-by: Mike Kistler <[email protected]> --------- Co-authored-by: Mike Kistler <[email protected]>
1 parent 65ce618 commit 2e3552e

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

docs/orleans/streaming/streams-programming-apis.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,20 +164,24 @@ Below are the guidelines on how to write the subscription logic for various case
164164

165165
**Implicit subscriptions:**
166166

167-
For implicit subscriptions, the grain needs to subscribe to attach the processing logic. This should be done in the grain's `OnActivateAsync` method. The grain should simply execute `await stream.SubscribeAsync(OnNext ...)` in its `OnActivateAsync` method. That will cause this particular activation to attach the `OnNext` function to process that stream. The grain can optionally specify the `StreamSequenceToken` as an argument to `SubscribeAsync`, which will cause this implicit subscription to start consuming from that token. There is never a need for an implicit subscription to call `ResumeAsync`.
167+
For implicit subscriptions, the grain still needs to subscribe to attach the processing logic. This can be done in the consumer grain by implementing the `IStreamSubscriptionObserver` and `IAsyncObserver<T>` interfaces, allowing the grain to activate separately from subscribing. To subscribe to the stream, the grain creates a handle and calls `await handle.ResumeAsync(this)` in its `OnSubscribed(...)` method.
168+
169+
To process messages, the `IAsyncObserver<T>.OnNextAsync(...)` method is implemented to receive stream data and a sequence token. Alternatively, the `ResumeAsync` method may take a set of delegates representing the methods of the `IAsyncObserver<T>` interface, `onNextAsync`, `onErrorAsync`, and `onCompletedAsync`.
168170

169171
<!-- markdownlint-disable MD044 -->
170172
:::zone target="docs" pivot="orleans-7-0"
171173
<!-- markdownlint-enable MD044 -->
172174

173175
```csharp
174-
public override async Task OnActivateAsync(CancellationToken cancellationToken)
176+
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
175177
{
176-
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
177-
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
178-
var stream = streamProvider.GetStream<string>(streamId);
178+
_logger.LogInformation($"Received an item from the stream: {item}");
179+
}
179180

180-
await stream.SubscribeAsync(OnNextAsync);
181+
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
182+
{
183+
var handle = handleFactory.Create<string>();
184+
await handle.ResumeAsync(this);
181185
}
182186
```
183187

0 commit comments

Comments
 (0)