Skip to content

Commit 5c546b8

Browse files
committed
Imported sidecar for tests
1 parent 43a1c37 commit 5c546b8

19 files changed

+3580
-712
lines changed

Directory.Packages.props

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
<ItemGroup>
3131
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.0.0" />
3232
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.2.2" />
33-
<PackageVersion Include="Microsoft.DurableTask.Sidecar" Version="1.1.2" />
3433
</ItemGroup>
3534

3635
<!-- Grpc / Protobuf Packages -->

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 524 additions & 523 deletions
Large diffs are not rendered by default.

test/Grpc.IntegrationTests/Grpc.IntegrationTests.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
<ItemGroup>
1313
<PackageReference Include="Grpc.AspNetCore.Server" />
14-
<PackageReference Include="Microsoft.DurableTask.Sidecar" />
1514
</ItemGroup>
1615

1716
</Project>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Sidecar;
5+
6+
class AsyncManualResetEvent
7+
{
8+
readonly object mutex = new();
9+
TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
10+
11+
public AsyncManualResetEvent(bool isSignaled)
12+
{
13+
if (isSignaled)
14+
{
15+
this.tcs.TrySetCanceled();
16+
}
17+
}
18+
19+
public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
20+
{
21+
Task delayTask = Task.Delay(timeout, cancellationToken);
22+
Task waitTask = this.tcs.Task;
23+
24+
Task winner = await Task.WhenAny(waitTask, delayTask);
25+
26+
// Await ensures we get a TaskCancelledException if there was a cancellation.
27+
await winner;
28+
29+
return winner == waitTask;
30+
}
31+
32+
public bool IsSignaled => this.tcs.Task.IsCompleted;
33+
34+
/// <summary>
35+
/// Puts the event in the signaled state, unblocking any waiting threads.
36+
/// </summary>
37+
public bool Set()
38+
{
39+
lock (this.mutex)
40+
{
41+
return this.tcs.TrySetResult();
42+
}
43+
}
44+
45+
/// <summary>
46+
/// Puts this event into the unsignaled state, causing threads to block.
47+
/// </summary>
48+
public void Reset()
49+
{
50+
lock (this.mutex)
51+
{
52+
if (this.tcs.Task.IsCompleted)
53+
{
54+
this.tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
55+
}
56+
}
57+
}
58+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using DurableTask.Core.History;
6+
7+
namespace Microsoft.DurableTask.Sidecar.Dispatcher;
8+
9+
interface ITaskExecutor
10+
{
11+
/// <summary>
12+
/// When implemented by a concrete type, executes an orchestrator and returns the next set of orchestrator actions.
13+
/// </summary>
14+
/// <param name="instance">The instance ID information of the orchestrator to execute.</param>
15+
/// <param name="pastEvents">The history events for previous executions of this orchestration instance.</param>
16+
/// <param name="newEvents">The history events that have not yet been executed by this orchestration instance.</param>
17+
/// <returns>
18+
/// Returns a task containing the result of the orchestrator execution. These are effectively the side-effects of the
19+
/// orchestrator code, such as calling activities, scheduling timers, etc.
20+
/// </returns>
21+
Task<OrchestratorExecutionResult> ExecuteOrchestrator(
22+
OrchestrationInstance instance,
23+
IEnumerable<HistoryEvent> pastEvents,
24+
IEnumerable<HistoryEvent> newEvents);
25+
26+
/// <summary>
27+
/// When implemented by a concreate type, executes an activity task and returns its results.
28+
/// </summary>
29+
/// <param name="instance">The instance ID information of the orchestration that scheduled this activity task.</param>
30+
/// <param name="activityEvent">The metadata of the activity task execution, including the activity name and input.</param>
31+
/// <returns>Returns a task that contains the execution result of the activity.</returns>
32+
Task<ActivityExecutionResult> ExecuteActivity(
33+
OrchestrationInstance instance,
34+
TaskScheduledEvent activityEvent);
35+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask.Sidecar.Dispatcher;
5+
6+
/// <summary>
7+
/// A simple primitive that can be used to block logical threads until some condition occurs.
8+
/// </summary>
9+
interface ITrafficSignal
10+
{
11+
/// <summary>
12+
/// Provides a human-friendly reason for why the signal is in the "wait" state.
13+
/// </summary>
14+
string WaitReason { get; }
15+
16+
/// <summary>
17+
/// Blocks the caller until the <see cref="Set"/> method is called.
18+
/// </summary>
19+
/// <param name="waitTime">The amount of time to wait until the signal is unblocked.</param>
20+
/// <param name="cancellationToken">A cancellation token that can be used to interrupt a waiting caller.</param>
21+
/// <returns>
22+
/// Returns <c>true</c> if the traffic signal is all-clear; <c>false</c> if we timed-out waiting for the signal to clear.
23+
/// </returns>
24+
/// <exception cref="OperationCanceledException">
25+
/// Thrown if <paramref name="cancellationToken"/> is triggered while waiting.
26+
/// </exception>
27+
Task<bool> WaitAsync(TimeSpan waitTime, CancellationToken cancellationToken);
28+
}
29+
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using DurableTask.Core.History;
6+
using Microsoft.Extensions.Logging;
7+
8+
namespace Microsoft.DurableTask.Sidecar.Dispatcher;
9+
10+
class TaskActivityDispatcher : WorkItemDispatcher<TaskActivityWorkItem>
11+
{
12+
readonly IOrchestrationService service;
13+
readonly ITaskExecutor taskExecutor;
14+
15+
public TaskActivityDispatcher(ILogger log, ITrafficSignal trafficSignal, IOrchestrationService service, ITaskExecutor taskExecutor)
16+
: base(log, trafficSignal)
17+
{
18+
this.service = service;
19+
this.taskExecutor = taskExecutor;
20+
}
21+
22+
public override int MaxWorkItems => this.service.MaxConcurrentTaskActivityWorkItems;
23+
24+
public override Task AbandonWorkItemAsync(TaskActivityWorkItem workItem) =>
25+
this.service.AbandonTaskActivityWorkItemAsync(workItem);
26+
27+
public override Task<TaskActivityWorkItem?> FetchWorkItemAsync(TimeSpan timeout, CancellationToken cancellationToken) =>
28+
this.service.LockNextTaskActivityWorkItem(timeout, cancellationToken);
29+
30+
protected override async Task ExecuteWorkItemAsync(TaskActivityWorkItem workItem)
31+
{
32+
TaskScheduledEvent scheduledEvent = (TaskScheduledEvent)workItem.TaskMessage.Event;
33+
34+
// TODO: Error handling for internal errors (user code exceptions are handled by the executor).
35+
ActivityExecutionResult result = await this.taskExecutor.ExecuteActivity(
36+
instance: workItem.TaskMessage.OrchestrationInstance,
37+
activityEvent: scheduledEvent);
38+
39+
TaskMessage responseMessage = new()
40+
{
41+
Event = result.ResponseEvent,
42+
OrchestrationInstance = workItem.TaskMessage.OrchestrationInstance,
43+
};
44+
45+
await this.service.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
46+
}
47+
48+
public override int GetDelayInSecondsOnFetchException(Exception ex) =>
49+
this.service.GetDelayInSecondsAfterOnFetchException(ex);
50+
51+
public override string GetWorkItemId(TaskActivityWorkItem workItem) => workItem.Id;
52+
53+
// No-op
54+
public override Task ReleaseWorkItemAsync(TaskActivityWorkItem workItem) => Task.CompletedTask;
55+
56+
public override Task<TaskActivityWorkItem> RenewWorkItemAsync(TaskActivityWorkItem workItem) =>
57+
this.service.RenewTaskActivityWorkItemLockAsync(workItem);
58+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using Microsoft.Extensions.Logging;
6+
7+
namespace Microsoft.DurableTask.Sidecar.Dispatcher;
8+
9+
class TaskHubDispatcherHost
10+
{
11+
readonly TaskOrchestrationDispatcher orchestrationDispatcher;
12+
readonly TaskActivityDispatcher activityDispatcher;
13+
14+
readonly IOrchestrationService orchestrationService;
15+
readonly ILogger log;
16+
17+
public TaskHubDispatcherHost(
18+
ILoggerFactory loggerFactory,
19+
ITrafficSignal trafficSignal,
20+
IOrchestrationService orchestrationService,
21+
ITaskExecutor taskExecutor)
22+
{
23+
this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
24+
this.log = loggerFactory.CreateLogger("Microsoft.DurableTask.Sidecar");
25+
26+
this.orchestrationDispatcher = new TaskOrchestrationDispatcher(log, trafficSignal, orchestrationService, taskExecutor);
27+
this.activityDispatcher = new TaskActivityDispatcher(log, trafficSignal, orchestrationService, taskExecutor);
28+
}
29+
30+
public async Task StartAsync(CancellationToken cancellationToken)
31+
{
32+
// Start any background processing in the orchestration service
33+
await this.orchestrationService.StartAsync();
34+
35+
// Start the dispatchers, which will allow orchestrations/activities to execute
36+
await Task.WhenAll(
37+
this.orchestrationDispatcher.StartAsync(cancellationToken),
38+
this.activityDispatcher.StartAsync(cancellationToken));
39+
}
40+
41+
public async Task StopAsync(CancellationToken cancellationToken)
42+
{
43+
// Stop the dispatchers from polling the orchestration service
44+
await Task.WhenAll(
45+
this.orchestrationDispatcher.StopAsync(cancellationToken),
46+
this.activityDispatcher.StopAsync(cancellationToken));
47+
48+
// Tell the storage provider to stop doing any background work.
49+
await this.orchestrationService.StopAsync();
50+
}
51+
}

0 commit comments

Comments
 (0)