Skip to content

Commit 8d3c1d4

Browse files
authored
fix infinite loop when use continueasnew after wait external event (#183)
1 parent 81a8e7e commit 8d3c1d4

File tree

3 files changed

+73
-6
lines changed

3 files changed

+73
-6
lines changed

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -787,12 +787,17 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) {
787787
// We don't check the event in the pass event list to avoid duplicated events.
788788
Set<HistoryEvent> externalEvents = new HashSet<>(this.unprocessedEvents);
789789
List<HistoryEvent> newEvents = this.historyEventPlayer.getNewEvents();
790+
int currentHistoryIndex = this.historyEventPlayer.getCurrentHistoryIndex();
791+
792+
// Only add events that haven't been processed to the carryOverEvents
793+
// currentHistoryIndex will point to the first unprocessed event
794+
for (int i = currentHistoryIndex; i < newEvents.size(); i++) {
795+
HistoryEvent historyEvent = newEvents.get(i);
796+
if (historyEvent.getEventTypeCase() == HistoryEvent.EventTypeCase.EVENTRAISED) {
797+
externalEvents.add(historyEvent);
798+
}
799+
}
790800

791-
Set<HistoryEvent> filteredEvents = newEvents.stream()
792-
.filter(e -> e.getEventTypeCase() == HistoryEvent.EventTypeCase.EVENTRAISED)
793-
.collect(Collectors.toSet());
794-
795-
externalEvents.addAll(filteredEvents);
796801
externalEvents.forEach(builder::addCarryoverEvents);
797802
}
798803

@@ -946,7 +951,11 @@ public boolean moveNext() {
946951
}
947952

948953
List<HistoryEvent> getNewEvents() {
949-
return newEvents;
954+
return this.newEvents;
955+
}
956+
957+
int getCurrentHistoryIndex() {
958+
return this.currentHistoryIndex;
950959
}
951960
}
952961

samples-azure-functions/src/main/java/com/functions/ContinueAsNew.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.microsoft.azure.functions.annotation.FunctionName;
99
import com.microsoft.azure.functions.annotation.HttpTrigger;
1010
import com.microsoft.durabletask.DurableTaskClient;
11+
import com.microsoft.durabletask.Task;
1112
import com.microsoft.durabletask.TaskOrchestrationContext;
1213
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
1314
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
@@ -37,4 +38,29 @@ public void eternalOrchestrator(@DurableOrchestrationTrigger(name = "runtimeStat
3738
ctx.createTimer(Duration.ofSeconds(2)).await();
3839
ctx.continueAsNew(null);
3940
}
41+
42+
@FunctionName("ContinueAsNewExternalEvent")
43+
public HttpResponseMessage continueAsNewExternalEvent(
44+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
45+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
46+
final ExecutionContext context) {
47+
context.getLogger().info("Java HTTP trigger processed a request.");
48+
49+
DurableTaskClient client = durableContext.getClient();
50+
String instanceId = client.scheduleNewOrchestrationInstance("EternalEvent");
51+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
52+
return durableContext.createCheckStatusResponse(request, instanceId);
53+
}
54+
55+
@FunctionName("EternalEvent")
56+
public void eternalEvent(@DurableOrchestrationTrigger(name = "runtimeState") TaskOrchestrationContext ctx)
57+
{
58+
System.out.println("Waiting external event...");
59+
Task<Void> event = ctx.waitForExternalEvent("event");
60+
Task<Void> timer = ctx.createTimer(Duration.ofSeconds(10));
61+
Task<?> result = ctx.anyOf(event, timer).await();
62+
if (result == event) {
63+
ctx.continueAsNew(null);
64+
}
65+
}
4066
}

samples-azure-functions/src/test/java/com/functions/EndToEndTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.time.Instant;
1515
import java.util.HashSet;
1616
import java.util.Set;
17+
import java.util.concurrent.TimeUnit;
1718

1819
import static io.restassured.RestAssured.get;
1920
import static io.restassured.RestAssured.post;
@@ -99,6 +100,37 @@ public void continueAsNew() throws InterruptedException {
99100
statusResponse.jsonPath().get("runtimeStatus");
100101
}
101102

103+
@Test
104+
public void continueAsNewExternalEvent() throws InterruptedException {
105+
String startOrchestrationPath = "api/ContinueAsNewExternalEvent";
106+
Response response = post(startOrchestrationPath);
107+
JsonPath jsonPath = response.jsonPath();
108+
109+
// send external event, it will cause the continue-as-new.
110+
String sendEventPostUri = jsonPath.get("sendEventPostUri");
111+
sendEventPostUri = sendEventPostUri.replace("{eventName}", "event");
112+
113+
// empty request body
114+
RestAssured
115+
.given()
116+
.contentType(ContentType.JSON) // Set the request content type
117+
.body("{}")
118+
.post(sendEventPostUri)
119+
.then()
120+
.statusCode(202);
121+
122+
//wait 5 seconds for the continue-as-new to start new orchestration
123+
TimeUnit.SECONDS.sleep(5);
124+
125+
response = post(startOrchestrationPath);
126+
jsonPath = response.jsonPath();
127+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
128+
// polling 20 seconds
129+
// assert that the orchestration completed as expected, not enter an infinite loop
130+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(20));
131+
assertTrue(completed);
132+
}
133+
102134
@ParameterizedTest
103135
@ValueSource(booleans = {true, false})
104136
public void restart(boolean restartWithNewInstanceId) throws InterruptedException {

0 commit comments

Comments
 (0)