Skip to content

Commit 919e895

Browse files
authored
Implement ListInstanceIDs, GetInstanceHistory, and RerunWorkflowFromEvent workflow RPCs (#1738)
* Implement ListInstanceIDs, GetInstanceHistory, and RerunWorkflowFromEvent workflow RPCs Add the three missing workflow RPCs that are defined in the proto but were not yet exposed through the SDK client. This includes the full stack: abstract base class, gRPC implementation, public interface, public client, model types, logging, and tests. Signed-off-by: joshvanl <me@joshvanl.dev> * Review comments Signed-off-by: joshvanl <me@joshvanl.dev> --------- Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent dcfc63f commit 919e895

File tree

13 files changed

+933
-0
lines changed

13 files changed

+933
-0
lines changed

src/Dapr.Workflow/Client/ProtoConverters.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,42 @@ public static WorkflowRuntimeStatus ToRuntimeStatus(OrchestrationStatus status)
5252
OrchestrationStatus.Stalled => WorkflowRuntimeStatus.Stalled,
5353
_ => WorkflowRuntimeStatus.Unknown
5454
};
55+
56+
/// <summary>
57+
/// Converts a proto <see cref="HistoryEvent"/> to <see cref="WorkflowHistoryEvent"/>.
58+
/// </summary>
59+
public static WorkflowHistoryEvent ToWorkflowHistoryEvent(HistoryEvent historyEvent) =>
60+
new(historyEvent.EventId,
61+
ToHistoryEventType(historyEvent.EventTypeCase),
62+
historyEvent.Timestamp?.ToDateTime() ?? DateTime.MinValue);
63+
64+
/// <summary>
65+
/// Converts the proto history event type to <see cref="WorkflowHistoryEventType"/>.
66+
/// </summary>
67+
public static WorkflowHistoryEventType ToHistoryEventType(HistoryEvent.EventTypeOneofCase eventType)
68+
=> eventType switch
69+
{
70+
HistoryEvent.EventTypeOneofCase.ExecutionStarted => WorkflowHistoryEventType.ExecutionStarted,
71+
HistoryEvent.EventTypeOneofCase.ExecutionCompleted => WorkflowHistoryEventType.ExecutionCompleted,
72+
HistoryEvent.EventTypeOneofCase.ExecutionTerminated => WorkflowHistoryEventType.ExecutionTerminated,
73+
HistoryEvent.EventTypeOneofCase.TaskScheduled => WorkflowHistoryEventType.TaskScheduled,
74+
HistoryEvent.EventTypeOneofCase.TaskCompleted => WorkflowHistoryEventType.TaskCompleted,
75+
HistoryEvent.EventTypeOneofCase.TaskFailed => WorkflowHistoryEventType.TaskFailed,
76+
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated => WorkflowHistoryEventType.SubOrchestrationInstanceCreated,
77+
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted => WorkflowHistoryEventType.SubOrchestrationInstanceCompleted,
78+
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed => WorkflowHistoryEventType.SubOrchestrationInstanceFailed,
79+
HistoryEvent.EventTypeOneofCase.TimerCreated => WorkflowHistoryEventType.TimerCreated,
80+
HistoryEvent.EventTypeOneofCase.TimerFired => WorkflowHistoryEventType.TimerFired,
81+
HistoryEvent.EventTypeOneofCase.OrchestratorStarted => WorkflowHistoryEventType.OrchestratorStarted,
82+
HistoryEvent.EventTypeOneofCase.OrchestratorCompleted => WorkflowHistoryEventType.OrchestratorCompleted,
83+
HistoryEvent.EventTypeOneofCase.EventSent => WorkflowHistoryEventType.EventSent,
84+
HistoryEvent.EventTypeOneofCase.EventRaised => WorkflowHistoryEventType.EventRaised,
85+
HistoryEvent.EventTypeOneofCase.GenericEvent => WorkflowHistoryEventType.GenericEvent,
86+
HistoryEvent.EventTypeOneofCase.HistoryState => WorkflowHistoryEventType.HistoryState,
87+
HistoryEvent.EventTypeOneofCase.ContinueAsNew => WorkflowHistoryEventType.ContinueAsNew,
88+
HistoryEvent.EventTypeOneofCase.ExecutionSuspended => WorkflowHistoryEventType.ExecutionSuspended,
89+
HistoryEvent.EventTypeOneofCase.ExecutionResumed => WorkflowHistoryEventType.ExecutionResumed,
90+
HistoryEvent.EventTypeOneofCase.ExecutionStalled => WorkflowHistoryEventType.ExecutionStalled,
91+
_ => WorkflowHistoryEventType.Unknown
92+
};
5593
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2026 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Workflow.Client;
15+
16+
/// <summary>
17+
/// Options for the <see cref="Dapr.Workflow.IDaprWorkflowClient.RerunWorkflowFromEventAsync"/> operation.
18+
/// </summary>
19+
public sealed class RerunWorkflowFromEventOptions
20+
{
21+
/// <summary>
22+
/// Gets or sets the new instance ID to use for the rerun workflow instance.
23+
/// If not specified, a random instance ID will be generated.
24+
/// </summary>
25+
public string? NewInstanceId { get; set; }
26+
27+
/// <summary>
28+
/// Gets or sets the optional input to provide when rerunning the workflow, applied at the
29+
/// next activity event. When set, <see cref="OverwriteInput"/> must also be set to <c>true</c>.
30+
/// </summary>
31+
public object? Input { get; set; }
32+
33+
/// <summary>
34+
/// Gets or sets a value indicating whether the workflow's input at the rerun point (for the
35+
/// next activity event) should be overwritten with <see cref="Input"/>.
36+
/// </summary>
37+
public bool OverwriteInput { get; set; }
38+
}

src/Dapr.Workflow/Client/WorkflowClient.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// ------------------------------------------------------------------------
1313

1414
using System;
15+
using System.Collections.Generic;
1516
using System.Threading;
1617
using System.Threading.Tasks;
1718

@@ -182,6 +183,42 @@ public abstract Task ResumeWorkflowAsync(string instanceId, string? reason = nul
182183
/// <c>false</c>.</returns>
183184
public abstract Task<bool> PurgeInstanceAsync(string instanceId, CancellationToken cancellationToken = default);
184185

186+
/// <summary>
187+
/// Lists workflow instance IDs with optional pagination.
188+
/// </summary>
189+
/// <param name="continuationToken">The continuation token from a previous call, or <c>null</c> for the first page.</param>
190+
/// <param name="pageSize">The maximum number of instance IDs to return, or <c>null</c> for no limit.</param>
191+
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
192+
/// <returns>A page of instance IDs and an optional continuation token for the next page.</returns>
193+
public abstract Task<WorkflowInstancePage> ListInstanceIdsAsync(
194+
string? continuationToken = null,
195+
int? pageSize = null,
196+
CancellationToken cancellationToken = default);
197+
198+
/// <summary>
199+
/// Gets the full history of a workflow instance.
200+
/// </summary>
201+
/// <param name="instanceId">The instance ID of the workflow to get history for.</param>
202+
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
203+
/// <returns>The list of history events for the workflow instance.</returns>
204+
public abstract Task<IReadOnlyList<WorkflowHistoryEvent>> GetInstanceHistoryAsync(
205+
string instanceId,
206+
CancellationToken cancellationToken = default);
207+
208+
/// <summary>
209+
/// Reruns a workflow from a specific event ID, creating a new workflow instance.
210+
/// </summary>
211+
/// <param name="sourceInstanceId">The instance ID of the source workflow to rerun from.</param>
212+
/// <param name="eventId">The event ID to rerun from.</param>
213+
/// <param name="options">Optional configuration for the rerun operation.</param>
214+
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
215+
/// <returns>The instance ID of the new workflow instance.</returns>
216+
public abstract Task<string> RerunWorkflowFromEventAsync(
217+
string sourceInstanceId,
218+
uint eventId,
219+
RerunWorkflowFromEventOptions? options = null,
220+
CancellationToken cancellationToken = default);
221+
185222
/// <inheritdoc />
186223
public abstract ValueTask DisposeAsync();
187224
}

src/Dapr.Workflow/Client/WorkflowGrpcClient.cs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// ------------------------------------------------------------------------
1313

1414
using System;
15+
using System.Collections.Generic;
16+
using System.Linq;
1517
using System.Threading;
1618
using System.Threading.Tasks;
1719
using Dapr.Common;
@@ -224,6 +226,101 @@ public override async Task<bool> PurgeInstanceAsync(string instanceId, Cancellat
224226
return purged;
225227
}
226228

229+
/// <inheritdoc />
230+
public override async Task<WorkflowInstancePage> ListInstanceIdsAsync(
231+
string? continuationToken = null,
232+
int? pageSize = null,
233+
CancellationToken cancellationToken = default)
234+
{
235+
var request = new grpc.ListInstanceIDsRequest();
236+
237+
if (continuationToken is not null)
238+
{
239+
request.ContinuationToken = continuationToken;
240+
}
241+
242+
if (pageSize.HasValue)
243+
{
244+
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(pageSize.Value, 0, nameof(pageSize));
245+
request.PageSize = (uint)pageSize.Value;
246+
}
247+
248+
var grpcCallOptions = CreateCallOptions(cancellationToken);
249+
var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions);
250+
251+
logger.LogListInstanceIds(response.InstanceIds.Count);
252+
253+
return new WorkflowInstancePage(
254+
response.InstanceIds.ToList().AsReadOnly(),
255+
response.HasContinuationToken ? response.ContinuationToken : null);
256+
}
257+
258+
/// <inheritdoc />
259+
public override async Task<IReadOnlyList<WorkflowHistoryEvent>> GetInstanceHistoryAsync(
260+
string instanceId,
261+
CancellationToken cancellationToken = default)
262+
{
263+
ArgumentException.ThrowIfNullOrEmpty(instanceId);
264+
265+
var request = new grpc.GetInstanceHistoryRequest
266+
{
267+
InstanceId = instanceId
268+
};
269+
270+
var grpcCallOptions = CreateCallOptions(cancellationToken);
271+
var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions);
272+
273+
var events = response.Events
274+
.Select(ProtoConverters.ToWorkflowHistoryEvent)
275+
.ToList()
276+
.AsReadOnly();
277+
278+
logger.LogGetInstanceHistory(instanceId, events.Count);
279+
280+
return events;
281+
}
282+
283+
/// <inheritdoc />
284+
public override async Task<string> RerunWorkflowFromEventAsync(
285+
string sourceInstanceId,
286+
uint eventId,
287+
RerunWorkflowFromEventOptions? options = null,
288+
CancellationToken cancellationToken = default)
289+
{
290+
ArgumentException.ThrowIfNullOrEmpty(sourceInstanceId);
291+
292+
if (options is { Input: not null, OverwriteInput: false })
293+
{
294+
throw new ArgumentException(
295+
$"{nameof(RerunWorkflowFromEventOptions.OverwriteInput)} must be true when {nameof(RerunWorkflowFromEventOptions.Input)} is set.",
296+
nameof(options));
297+
}
298+
299+
var request = new grpc.RerunWorkflowFromEventRequest
300+
{
301+
SourceInstanceID = sourceInstanceId,
302+
EventID = eventId,
303+
OverwriteInput = options?.OverwriteInput ?? false
304+
};
305+
306+
if (options?.NewInstanceId is not null)
307+
{
308+
request.NewInstanceID = options.NewInstanceId;
309+
}
310+
311+
if (options is { OverwriteInput: true })
312+
{
313+
request.Input = SerializeToJson(options.Input);
314+
}
315+
316+
var grpcCallOptions = CreateCallOptions(cancellationToken);
317+
var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions);
318+
319+
logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID);
320+
321+
return response.NewInstanceID;
322+
}
323+
227324
/// <inheritdoc />
228325
public override ValueTask DisposeAsync()
229326
{
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2026 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using System;
15+
16+
namespace Dapr.Workflow.Client;
17+
18+
/// <summary>
19+
/// Represents a single event in a workflow instance's history.
20+
/// </summary>
21+
/// <param name="EventId">The unique event ID within the workflow instance history.</param>
22+
/// <param name="EventType">The type of history event.</param>
23+
/// <param name="Timestamp">The timestamp when the event occurred.</param>
24+
public sealed record WorkflowHistoryEvent(
25+
int EventId,
26+
WorkflowHistoryEventType EventType,
27+
DateTime Timestamp);
28+
29+
/// <summary>
30+
/// Represents the type of a workflow history event.
31+
/// </summary>
32+
public enum WorkflowHistoryEventType
33+
{
34+
/// <summary>Unknown event type.</summary>
35+
Unknown = 0,
36+
37+
/// <summary>The workflow execution started.</summary>
38+
ExecutionStarted,
39+
40+
/// <summary>The workflow execution completed.</summary>
41+
ExecutionCompleted,
42+
43+
/// <summary>The workflow execution was terminated.</summary>
44+
ExecutionTerminated,
45+
46+
/// <summary>A task (activity) was scheduled.</summary>
47+
TaskScheduled,
48+
49+
/// <summary>A task (activity) completed successfully.</summary>
50+
TaskCompleted,
51+
52+
/// <summary>A task (activity) failed.</summary>
53+
TaskFailed,
54+
55+
/// <summary>A sub-orchestration instance was created.</summary>
56+
SubOrchestrationInstanceCreated,
57+
58+
/// <summary>A sub-orchestration instance completed.</summary>
59+
SubOrchestrationInstanceCompleted,
60+
61+
/// <summary>A sub-orchestration instance failed.</summary>
62+
SubOrchestrationInstanceFailed,
63+
64+
/// <summary>A timer was created.</summary>
65+
TimerCreated,
66+
67+
/// <summary>A timer fired.</summary>
68+
TimerFired,
69+
70+
/// <summary>An orchestrator started processing.</summary>
71+
OrchestratorStarted,
72+
73+
/// <summary>An orchestrator completed processing.</summary>
74+
OrchestratorCompleted,
75+
76+
/// <summary>An event was sent to another instance.</summary>
77+
EventSent,
78+
79+
/// <summary>An external event was raised.</summary>
80+
EventRaised,
81+
82+
/// <summary>A generic event.</summary>
83+
GenericEvent,
84+
85+
/// <summary>A history state event.</summary>
86+
HistoryState,
87+
88+
/// <summary>The workflow continued as new.</summary>
89+
ContinueAsNew,
90+
91+
/// <summary>The workflow execution was suspended.</summary>
92+
ExecutionSuspended,
93+
94+
/// <summary>The workflow execution was resumed.</summary>
95+
ExecutionResumed,
96+
97+
/// <summary>The workflow execution stalled.</summary>
98+
ExecutionStalled,
99+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2026 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using System.Collections.Generic;
15+
16+
namespace Dapr.Workflow.Client;
17+
18+
/// <summary>
19+
/// Represents a page of workflow instance IDs returned by a list operation.
20+
/// </summary>
21+
/// <param name="InstanceIds">The workflow instance IDs in this page.</param>
22+
/// <param name="ContinuationToken">
23+
/// The continuation token used to retrieve the next page, or <c>null</c> if there are no more pages.
24+
/// </param>
25+
public sealed record WorkflowInstancePage(
26+
IReadOnlyList<string> InstanceIds,
27+
string? ContinuationToken);

0 commit comments

Comments
 (0)