Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/activity-sequence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker";
const result3 = yield ctx.callActivity(hello, "London");
cities.push(result3);

ctx.setCustomStatus("sequence done");

return cities;
};

Expand Down
7 changes: 7 additions & 0 deletions src/task/context/orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,11 @@ export abstract class OrchestrationContext {
* @param saveEvents {boolean} A flag indicating whether to add any unprocessed external events in the new orchestration history.
*/
abstract continueAsNew(newInput: any, saveEvents: boolean): void;

/**
* Sets the custom status
*
* @param status {string} The new custom status
*/
abstract setCustomStatus(status: string): void;
}
14 changes: 14 additions & 0 deletions src/worker/orchestration-execute-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import * as pb from "../proto/orchestrator_service_pb";

export class OrchestrationExecuteResult {
actions: pb.OrchestratorAction[];
customStatus: string;

constructor(actions: pb.OrchestratorAction[], customStatus: string) {
this.actions = actions;
this.customStatus = customStatus;
}
}
5 changes: 3 additions & 2 deletions src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { enumValueToKey } from "../utils/enum.util";
import { getOrchestrationStatusStr, isEmpty } from "../utils/pb-helper.util";
import { OrchestratorNotRegisteredError } from "./exception/orchestrator-not-registered-error";
import { StopIterationError } from "./exception/stop-iteration-error";
import { OrchestrationExecuteResult } from "./orchestration-execute-result";
import { Registry } from "./registry";
import { RuntimeOrchestrationContext } from "./runtime-orchestration-context";

Expand All @@ -36,7 +37,7 @@ export class OrchestrationExecutor {
instanceId: string,
oldEvents: pb.HistoryEvent[],
newEvents: pb.HistoryEvent[],
): Promise<pb.OrchestratorAction[]> {
): Promise<OrchestrationExecuteResult> {
if (!newEvents?.length) {
throw new OrchestrationStateError("The new history event list must have at least one event in it");
}
Expand Down Expand Up @@ -79,7 +80,7 @@ export class OrchestrationExecutor {
const actions = ctx.getActions();
console.log(`${instanceId}: Returning ${actions.length} action(s)`);

return actions;
return new OrchestrationExecuteResult(actions, ctx._customStatus);
}

private async processEvent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
Expand Down
6 changes: 6 additions & 0 deletions src/worker/runtime-orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
_pendingEvents: Record<string, CompletableTask<any>[]>;
_newInput?: any;
_saveEvents: any;
_customStatus: string;

constructor(instanceId: string) {
super();
Expand All @@ -45,6 +46,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
this._pendingEvents = {};
this._newInput = undefined;
this._saveEvents = false;
this._customStatus = "";
}

get instanceId(): string {
Expand Down Expand Up @@ -330,4 +332,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {

this.setContinuedAsNew(newInput, saveEvents);
}

setCustomStatus(status: string) {
this._customStatus = status;
}
}
7 changes: 5 additions & 2 deletions src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ export class TaskHubGrpcWorker {

try {
const executor = new OrchestrationExecutor(this._registry);
const actions = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());
const result = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());

res = new pb.OrchestratorResponse();
res.setInstanceid(req.getInstanceid());
res.setActionsList(actions);
res.setActionsList(result.actions);
const cs = new StringValue();
cs.setValue(result.customStatus);
res.setCustomstatus(cs);
} catch (e: any) {
console.error(e);
console.log(`An error occurred while trying to execute instance '${req.getInstanceid()}': ${e.message}`);
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/orchestration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ describe("Durable Functions", () => {
numbers.push(current);
}

ctx.setCustomStatus("foobaz");

return numbers;
};

Expand All @@ -81,6 +83,7 @@ describe("Durable Functions", () => {
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(1));
expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]));
expect(state?.serializedCustomStatus).toEqual(JSON.stringify("foobaz"));
}, 31000);

it("should be able to run fan-out/fan-in", async () => {
Expand Down
Loading