Skip to content

Commit 6df4084

Browse files
committed
schema version
1 parent 93fbfb4 commit 6df4084

File tree

4 files changed

+144
-4
lines changed

4 files changed

+144
-4
lines changed

packages/trigger-sdk/src/v3/wait.ts

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import {
2424
HttpCallbackResultTypeFromSchema,
2525
HttpCallbackResult,
2626
tryCatch,
27+
inferSchemaIn,
28+
getSchemaParseFn,
2729
} from "@trigger.dev/core/v3";
2830
import { tracer } from "./tracer.js";
2931
import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -718,7 +720,7 @@ export const wait = {
718720
},
719721
{
720722
attributes: {
721-
[SemanticInternalAttributes.STYLE_ICON]: "wait-http-callback",
723+
[SemanticInternalAttributes.STYLE_ICON]: "wait",
722724
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
723725
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
724726
...accessoryAttributes({
@@ -760,6 +762,123 @@ export const wait = {
760762
// 10. Receive the result here and import the packet, then get the result in the right format
761763
// 11. Make unwrap work
762764
},
765+
async forHttpCallbackWithSchema<TSchema extends HttpCallbackSchema>(
766+
schema: TSchema,
767+
callback: (url: string) => Promise<void>,
768+
options?: CreateWaitpointTokenRequestBody & {
769+
releaseConcurrency?: boolean;
770+
},
771+
requestOptions?: ApiRequestOptions
772+
): Promise<HttpCallbackResult<HttpCallbackResultTypeFromSchema<TSchema>>> {
773+
const ctx = taskContext.ctx;
774+
775+
if (!ctx) {
776+
throw new Error("wait.forHttpCallback can only be used from inside a task.run()");
777+
}
778+
779+
const apiClient = apiClientManager.clientOrThrow();
780+
781+
const waitpoint = await apiClient.createWaitpointHttpCallback(options ?? {}, requestOptions);
782+
783+
return tracer.startActiveSpan(
784+
`wait.forHttpCallback()`,
785+
async (span) => {
786+
const [error] = await tryCatch(callback(waitpoint.url));
787+
788+
if (error) {
789+
throw new Error(`You threw an error in your callback: ${error.message}`, {
790+
cause: error,
791+
});
792+
}
793+
794+
const response = await apiClient.waitForWaitpointToken({
795+
runFriendlyId: ctx.run.id,
796+
waitpointFriendlyId: waitpoint.id,
797+
releaseConcurrency: options?.releaseConcurrency,
798+
});
799+
800+
if (!response.success) {
801+
throw new Error(`Failed to wait for wait for HTTP callback ${waitpoint.id}`);
802+
}
803+
804+
const result = await runtime.waitUntil(waitpoint.id);
805+
806+
const data = result.output
807+
? await conditionallyImportAndParsePacket(
808+
{ data: result.output, dataType: result.outputType ?? "application/json" },
809+
apiClient
810+
)
811+
: undefined;
812+
813+
if (result.ok) {
814+
try {
815+
const parser = schema ? getSchemaParseFn<inferSchemaIn<TSchema>>(schema) : undefined;
816+
817+
if (!parser) {
818+
throw new Error("No parser found for schema");
819+
}
820+
821+
const parsedOutput = await parser(data);
822+
823+
return {
824+
ok: result.ok,
825+
output: parsedOutput,
826+
};
827+
} catch (error) {
828+
const err = error instanceof Error ? error : new Error(String(error));
829+
span.recordException(err);
830+
span.setStatus({
831+
code: SpanStatusCode.ERROR,
832+
});
833+
834+
return {
835+
ok: false,
836+
error: err,
837+
};
838+
}
839+
} else {
840+
const error = new WaitpointTimeoutError(data.message);
841+
842+
span.recordException(error);
843+
span.setStatus({
844+
code: SpanStatusCode.ERROR,
845+
});
846+
847+
return {
848+
ok: result.ok,
849+
error,
850+
};
851+
}
852+
},
853+
{
854+
attributes: {
855+
[SemanticInternalAttributes.STYLE_ICON]: "wait",
856+
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
857+
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
858+
...accessoryAttributes({
859+
items: [
860+
{
861+
text: waitpoint.id,
862+
variant: "normal",
863+
},
864+
],
865+
style: "codepath",
866+
}),
867+
id: waitpoint.id,
868+
isCached: waitpoint.isCached,
869+
idempotencyKey: options?.idempotencyKey,
870+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
871+
timeout: options?.timeout
872+
? typeof options.timeout === "string"
873+
? options.timeout
874+
: options.timeout.toISOString()
875+
: undefined,
876+
tags: options?.tags,
877+
url: waitpoint.url,
878+
},
879+
}
880+
);
881+
},
763882
};
764883

765884
function nameForWaitOptions(options: WaitForOptions): string {

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

references/hello-world/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
"trigger.dev": "workspace:*"
77
},
88
"dependencies": {
9-
"@trigger.dev/sdk": "workspace:*"
9+
"@trigger.dev/sdk": "workspace:*",
10+
"zod": "3.23.8"
1011
},
1112
"scripts": {
1213
"dev": "trigger dev"

references/hello-world/src/trigger/waits.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { logger, wait, task, retry, idempotencyKeys, auth } from "@trigger.dev/sdk/v3";
2-
2+
import { z } from "zod";
33
type Token = {
44
status: "approved" | "pending" | "rejected";
55
};
@@ -153,6 +153,23 @@ export const waitHttpCallback = task({
153153
}
154154
);
155155

156-
logger.log("Wait for HTTP callback completed", result);
156+
if (!result.ok) {
157+
logger.log("Wait for HTTP callback failed", { error: result.error });
158+
} else {
159+
logger.log("Wait for HTTP callback completed", result);
160+
}
161+
162+
const result2 = await wait.forHttpCallbackWithSchema(
163+
z.object({ bar: z.string() }),
164+
async (url) => {
165+
logger.log(`Wait for HTTP callback ${url}`);
166+
}
167+
);
168+
169+
if (!result2.ok) {
170+
logger.log("Wait for HTTP callback failed", { error: result2.error });
171+
} else {
172+
logger.log("Wait for HTTP callback completed", result2);
173+
}
157174
},
158175
});

0 commit comments

Comments
 (0)