Skip to content

Commit 6bc59c8

Browse files
committed
updatestest
1 parent 315dc3d commit 6bc59c8

File tree

4 files changed

+40
-24
lines changed

4 files changed

+40
-24
lines changed

eng/targets/Release.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
</PropertyGroup>
1818

1919
<PropertyGroup>
20-
<VersionPrefix>1.12.0</VersionPrefix>
20+
<VersionPrefix>1.13.0</VersionPrefix>
2121
<VersionSuffix></VersionSuffix>
2222
</PropertyGroup>
2323

test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ public async Task PurgeInstances_WithFilter_EndToEnd()
229229
[InlineData(true)]
230230
public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId)
231231
{
232+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // Reduced timeout to 1 minute
232233
await using HostTestLifetime server = await this.StartAsync();
233234

234235
// Start an initial orchestration with shouldThrow = false to ensure it completes successfully
@@ -238,8 +239,8 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId)
238239
// Wait for it to start and then complete
239240
await server.Client.WaitForInstanceStartAsync(originalInstanceId, default);
240241
await server.Client.RaiseEventAsync(originalInstanceId, "event", default);
241-
await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, default);
242-
242+
await server.Client.WaitForInstanceCompletionAsync(originalInstanceId, cts.Token);
243+
243244
// Verify the original orchestration completed
244245
OrchestrationMetadata? originalMetadata = await server.Client.GetInstanceAsync(originalInstanceId, true);
245246
originalMetadata.Should().NotBeNull();
@@ -258,32 +259,29 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId)
258259
restartedInstanceId.Should().Be(originalInstanceId);
259260
}
260261

261-
// Wait for the restarted orchestration to start
262-
await server.Client.WaitForInstanceStartAsync(restartedInstanceId, default);
263-
264-
// Verify the restarted orchestration has the same input
265-
OrchestrationMetadata? restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true);
266-
restartedMetadata.Should().NotBeNull();
267-
restartedMetadata!.Name.Should().Be(OrchestrationName);
268-
restartedMetadata.SerializedInput.Should().Be("false");
269-
270262
// Complete the restarted orchestration
271-
await server.Client.RaiseEventAsync(restartedInstanceId, "event", default);
272-
await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, default);
263+
await server.Client.RaiseEventAsync(restartedInstanceId, "event");
264+
265+
// Wait for completion (with shorter timeout)
266+
using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
267+
await server.Client.WaitForInstanceCompletionAsync(restartedInstanceId, completionCts.Token);
273268

274269
// Verify the restarted orchestration completed
275-
restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true);
270+
var restartedMetadata = await server.Client.GetInstanceAsync(restartedInstanceId, true);
276271
restartedMetadata.Should().NotBeNull();
272+
restartedMetadata!.Name.Should().Be(OrchestrationName);
273+
restartedMetadata.SerializedInput.Should().Be("false");
277274
restartedMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed);
278275
}
279276

280277
[Fact]
281278
public async Task RestartAsync_InstanceNotFound_ThrowsArgumentException()
282279
{
280+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // 1-minute timeout
283281
await using HostTestLifetime server = await this.StartAsync();
284282

285283
// Try to restart a non-existent orchestration
286-
Func<Task> restartAction = () => server.Client.RestartAsync("non-existent-instance-id");
284+
Func<Task> restartAction = () => server.Client.RestartAsync("non-existent-instance-id", cancellation: cts.Token);
287285

288286
// Should throw ArgumentException
289287
await restartAction.Should().ThrowAsync<ArgumentException>()

test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,20 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state,
339339
{
340340
// Get the original orchestration state
341341
IList<OrchestrationState> states = await this.client.GetOrchestrationStateAsync(request.InstanceId, false);
342+
342343
if (states == null || states.Count == 0)
343344
{
344345
throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found."));
345346
}
346347

347348
OrchestrationState state = states[0];
349+
350+
// Check if the state is null
351+
if (state == null)
352+
{
353+
throw new RpcException(new Status(StatusCode.NotFound, $"An orchestration with the instanceId {request.InstanceId} was not found."));
354+
}
355+
348356
string newInstanceId = request.RestartWithNewInstanceId ? Guid.NewGuid().ToString("N") : request.InstanceId;
349357

350358
// Create a new orchestration instance

test/Grpc.IntegrationTests/GrpcSidecar/InMemoryOrchestrationService.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,21 @@ public void AddMessage(TaskMessage message)
444444
SerializedInstanceState state = this.store.GetOrAdd(instanceId, id => new SerializedInstanceState(id, executionId));
445445
lock (state)
446446
{
447+
bool isRestart = state.ExecutionId != null && state.ExecutionId != executionId;
448+
447449
if (message.Event is ExecutionStartedEvent startEvent)
448450
{
451+
// For restart scenarios, clear the history and reset the state
452+
if (isRestart && state.IsCompleted)
453+
{
454+
state.HistoryEventsJson.Clear();
455+
state.ExecutionId = executionId; // Always update the execution ID
456+
state.IsCompleted = false;
457+
}
458+
459+
// Add the ExecutionStartedEvent to history so the orchestration worker has proper context
460+
state.HistoryEventsJson.Add(startEvent);
461+
449462
OrchestrationState newStatusRecord = new()
450463
{
451464
OrchestrationInstance = startEvent.OrchestrationInstance,
@@ -460,7 +473,6 @@ public void AddMessage(TaskMessage message)
460473
};
461474

462475
state.StatusRecordJson = JsonValue.Create(newStatusRecord);
463-
state.HistoryEventsJson.Clear();
464476
state.IsCompleted = false;
465477
}
466478
else if (state.IsCompleted)
@@ -661,13 +673,11 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken
661673

662674
public void Schedule(SerializedInstanceState state)
663675
{
664-
// TODO: There is a race condition here. If another thread is calling TakeNextAsync
665-
// and removed the queue item before updating the dictionary, then we'll fail
666-
// to update the readyToRunQueue and the orchestration will get stuck.
667-
if (this.readyInstances.TryAdd(state.InstanceId, state))
668-
{
669-
this.readyToRunQueue.Writer.TryWrite(state);
670-
}
676+
// Always update the dictionary entry (for restart scenarios)
677+
this.readyInstances[state.InstanceId] = state;
678+
679+
// Always try to write to the queue
680+
this.readyToRunQueue.Writer.TryWrite(state);
671681
}
672682
}
673683

0 commit comments

Comments
 (0)