Skip to content

Commit b9edd2e

Browse files
committed
Initial implentation ofr wait.forHttpCallback()
1 parent 1c09fbe commit b9edd2e

File tree

6 files changed

+263
-42
lines changed

6 files changed

+263
-42
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
type CreateWaitpointHttpCallbackResponseBody,
4+
CreateWaitpointTokenRequestBody,
5+
} from "@trigger.dev/core/v3";
6+
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
7+
import { env } from "~/env.server";
8+
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
9+
import {
10+
ApiWaitpointListPresenter,
11+
ApiWaitpointListSearchParams,
12+
} from "~/presenters/v3/ApiWaitpointListPresenter.server";
13+
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
14+
import {
15+
createActionApiRoute,
16+
createLoaderApiRoute,
17+
} from "~/services/routeBuilders/apiBuilder.server";
18+
import { parseDelay } from "~/utils/delays";
19+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
20+
import { engine } from "~/v3/runEngine.server";
21+
import { ServiceValidationError } from "~/v3/services/baseService.server";
22+
23+
export const loader = createLoaderApiRoute(
24+
{
25+
searchParams: ApiWaitpointListSearchParams,
26+
findResource: async () => 1, // This is a dummy function, we don't need to find a resource
27+
},
28+
async ({ searchParams, authentication }) => {
29+
const presenter = new ApiWaitpointListPresenter();
30+
const result = await presenter.call(authentication.environment, "HTTP_CALLBACK", searchParams);
31+
32+
return json(result);
33+
}
34+
);
35+
36+
const { action } = createActionApiRoute(
37+
{
38+
body: CreateWaitpointTokenRequestBody,
39+
maxContentLength: 1024 * 10, // 10KB
40+
method: "POST",
41+
},
42+
async ({ authentication, body }) => {
43+
try {
44+
const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
45+
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
46+
: undefined;
47+
48+
const timeout = await parseDelay(body.timeout);
49+
50+
//upsert tags
51+
let tags: { id: string; name: string }[] = [];
52+
const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;
53+
54+
if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
55+
throw new ServiceValidationError(
56+
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
57+
);
58+
}
59+
60+
if (bodyTags && bodyTags.length > 0) {
61+
for (const tag of bodyTags) {
62+
const tagRecord = await createWaitpointTag({
63+
tag,
64+
environmentId: authentication.environment.id,
65+
projectId: authentication.environment.projectId,
66+
});
67+
if (tagRecord) {
68+
tags.push(tagRecord);
69+
}
70+
}
71+
}
72+
73+
const result = await engine.createManualWaitpoint({
74+
environmentId: authentication.environment.id,
75+
projectId: authentication.environment.projectId,
76+
idempotencyKey: body.idempotencyKey,
77+
idempotencyKeyExpiresAt,
78+
timeout,
79+
resolver: "HTTP_CALLBACK",
80+
tags: bodyTags,
81+
});
82+
83+
return json<CreateWaitpointHttpCallbackResponseBody>(
84+
{
85+
id: WaitpointId.toFriendlyId(result.waitpoint.id),
86+
url: `${
87+
env.API_ORIGIN ?? env.APP_ORIGIN
88+
}/api/v1/waitpoints/http-callback/${WaitpointId.toFriendlyId(
89+
result.waitpoint.id
90+
)}/callback`,
91+
isCached: result.isCached,
92+
},
93+
{ status: 200 }
94+
);
95+
} catch (error) {
96+
if (error instanceof ServiceValidationError) {
97+
return json({ error: error.message }, { status: 422 });
98+
} else if (error instanceof Error) {
99+
return json({ error: error.message }, { status: 500 });
100+
}
101+
102+
return json({ error: "Something went wrong" }, { status: 500 });
103+
}
104+
}
105+
);
106+
107+
export { action };

packages/core/src/v3/apiClient/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
CreateEnvironmentVariableRequestBody,
1313
CreateScheduleOptions,
1414
CreateUploadPayloadUrlResponseBody,
15+
CreateWaitpointHttpCallbackResponseBody,
1516
CreateWaitpointTokenRequestBody,
1617
CreateWaitpointTokenResponseBody,
1718
DeletedScheduleObject,
@@ -790,6 +791,24 @@ export class ApiClient {
790791
);
791792
}
792793

794+
createWaitpointHttpCallback(
795+
options: CreateWaitpointTokenRequestBody,
796+
requestOptions?: ZodFetchOptions
797+
) {
798+
return zodfetch(
799+
CreateWaitpointHttpCallbackResponseBody,
800+
`${this.baseUrl}/engine/v1/waitpoints/http-callback`,
801+
{
802+
method: "POST",
803+
headers: this.#getHeaders(false),
804+
body: JSON.stringify(options),
805+
},
806+
{
807+
...mergeRequestOptions(this.defaultRequestOptions, requestOptions),
808+
}
809+
) as ApiPromise<CreateWaitpointHttpCallbackResponseBody>;
810+
}
811+
793812
async waitForDuration(
794813
runId: string,
795814
body: WaitForDurationRequestBody,

packages/core/src/v3/schemas/api.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,15 @@ export const WaitForWaitpointTokenResponseBody = z.object({
10101010
});
10111011
export type WaitForWaitpointTokenResponseBody = z.infer<typeof WaitForWaitpointTokenResponseBody>;
10121012

1013+
export const CreateWaitpointHttpCallbackResponseBody = z.object({
1014+
id: z.string(),
1015+
url: z.string(),
1016+
isCached: z.boolean(),
1017+
});
1018+
export type CreateWaitpointHttpCallbackResponseBody = z.infer<
1019+
typeof CreateWaitpointHttpCallbackResponseBody
1020+
>;
1021+
10131022
export const WaitForDurationRequestBody = z.object({
10141023
/**
10151024
* An optional idempotency key for the waitpoint.

packages/core/src/v3/types/waitpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export type HttpCallbackResultTypeFromSchema<TSchema extends HttpCallbackSchema>
66
export type HttpCallbackResult<TResult> =
77
| {
88
ok: true;
9-
result: TResult;
9+
output: TResult;
1010
}
1111
| {
1212
ok: false;

packages/trigger-sdk/src/v3/wait.ts

Lines changed: 111 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
HttpCallbackSchema,
2424
HttpCallbackResultTypeFromSchema,
2525
HttpCallbackResult,
26+
tryCatch,
2627
} from "@trigger.dev/core/v3";
2728
import { tracer } from "./tracer.js";
2829
import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -312,47 +313,6 @@ async function completeToken<T>(
312313
return apiClient.completeWaitpointToken(tokenId, { data }, $requestOptions);
313314
}
314315

315-
async function forHttpCallback<TResult>(
316-
callback: (url: string) => Promise<void>,
317-
options?: {
318-
timeout?: string | Date | undefined;
319-
}
320-
): Promise<HttpCallbackResult<TResult>> {
321-
//TODO:
322-
// Support a schema passed in, infer the type, or a generic supplied type
323-
// Support a timeout passed in
324-
// 1. Make an API call to engine.trigger.dev/v1/waitpoints/http-callback/create. New Waitpoint type "HTTPCallback"
325-
// 2. Return the url and a waitpoint id (but don't block the run yet)
326-
// 3. Create a span for the main call
327-
// 4. Set the url and waitpoint entity type and id as attributes on the parent span
328-
// 5. Create a span around the callback
329-
// 6. Deal with errors thrown in the callback use `tryCatch()`
330-
// 7. If that callback is successfully called, wait for the waitpoint with an API call to engine.trigger.dev/v1/waitpoints/http-callback/{waitpointId}/block
331-
// 8. Wait for the waitpoint in the runtime
332-
// 9. On the backend when the API is hit, complete the waitpoint with the result api.trigger.dev/v1/waitpoints/http-callback/{waitpointId}/callback
333-
// 10. Receive the result here and import the packet, then get the result in the right format
334-
// 11. Make unwrap work
335-
336-
const url = "https://trigger.dev/wait/success";
337-
338-
const result = await callback(url);
339-
340-
return {
341-
ok: true,
342-
output: result,
343-
} as any;
344-
}
345-
346-
async function forHttpCallbackWithSchema<TSchema extends HttpCallbackSchema>(
347-
schema: TSchema,
348-
callback: (successUrl: string, failureUrl: string) => Promise<void>,
349-
options?: {
350-
timeout?: string | Date | undefined;
351-
}
352-
): Promise<HttpCallbackResult<HttpCallbackResultTypeFromSchema<TSchema>>> {
353-
return {} as any;
354-
}
355-
356316
export type CommonWaitOptions = {
357317
/**
358318
* An optional idempotency key for the waitpoint.
@@ -690,6 +650,116 @@ export const wait = {
690650
}
691651
);
692652
},
653+
async forHttpCallback<TResult>(
654+
callback: (url: string) => Promise<void>,
655+
options?: CreateWaitpointTokenRequestBody & {
656+
releaseConcurrency?: boolean;
657+
},
658+
requestOptions?: ApiRequestOptions
659+
): Promise<HttpCallbackResult<TResult>> {
660+
const ctx = taskContext.ctx;
661+
662+
if (!ctx) {
663+
throw new Error("wait.forHttpCallback can only be used from inside a task.run()");
664+
}
665+
666+
const apiClient = apiClientManager.clientOrThrow();
667+
668+
const waitpoint = await apiClient.createWaitpointHttpCallback(options ?? {}, requestOptions);
669+
670+
return tracer.startActiveSpan(
671+
`wait.forHttpCallback()`,
672+
async (span) => {
673+
const [error] = await tryCatch(callback(waitpoint.url));
674+
675+
if (error) {
676+
throw new Error(`You threw an error in your callback: ${error.message}`, {
677+
cause: error,
678+
});
679+
}
680+
681+
const response = await apiClient.waitForWaitpointToken({
682+
runFriendlyId: ctx.run.id,
683+
waitpointFriendlyId: waitpoint.id,
684+
releaseConcurrency: options?.releaseConcurrency,
685+
});
686+
687+
if (!response.success) {
688+
throw new Error(`Failed to wait for wait for HTTP callback ${waitpoint.id}`);
689+
}
690+
691+
const result = await runtime.waitUntil(waitpoint.id);
692+
693+
const data = result.output
694+
? await conditionallyImportAndParsePacket(
695+
{ data: result.output, dataType: result.outputType ?? "application/json" },
696+
apiClient
697+
)
698+
: undefined;
699+
700+
if (result.ok) {
701+
return {
702+
ok: result.ok,
703+
output: data,
704+
};
705+
} else {
706+
const error = new WaitpointTimeoutError(data.message);
707+
708+
span.recordException(error);
709+
span.setStatus({
710+
code: SpanStatusCode.ERROR,
711+
});
712+
713+
return {
714+
ok: result.ok,
715+
error,
716+
};
717+
}
718+
},
719+
{
720+
attributes: {
721+
[SemanticInternalAttributes.STYLE_ICON]: "wait-http-callback",
722+
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
723+
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
724+
...accessoryAttributes({
725+
items: [
726+
{
727+
text: waitpoint.id,
728+
variant: "normal",
729+
},
730+
],
731+
style: "codepath",
732+
}),
733+
id: waitpoint.id,
734+
isCached: waitpoint.isCached,
735+
idempotencyKey: options?.idempotencyKey,
736+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
737+
timeout: options?.timeout
738+
? typeof options.timeout === "string"
739+
? options.timeout
740+
: options.timeout.toISOString()
741+
: undefined,
742+
tags: options?.tags,
743+
url: waitpoint.url,
744+
},
745+
}
746+
);
747+
748+
//TODO:
749+
// Support a schema passed in, infer the type, or a generic supplied type
750+
// Support a timeout passed in
751+
// 1. Make an API call to engine.trigger.dev/v1/waitpoints/http-callback/create. New Waitpoint type "HTTPCallback"
752+
// 2. Return the url and a waitpoint id (but don't block the run yet)
753+
// 3. Create a span for the main call
754+
// 4. Set the url and waitpoint entity type and id as attributes on the parent span
755+
// 5. Create a span around the callback
756+
// 6. Deal with errors thrown in the callback use `tryCatch()`
757+
// 7. If that callback is successfully called, wait for the waitpoint with an API call to engine.trigger.dev/v1/waitpoints/http-callback/{waitpointId}/block
758+
// 8. Wait for the waitpoint in the runtime
759+
// 9. On the backend when the API is hit, complete the waitpoint with the result api.trigger.dev/v1/waitpoints/http-callback/{waitpointId}/callback
760+
// 10. Receive the result here and import the packet, then get the result in the right format
761+
// 11. Make unwrap work
762+
},
693763
};
694764

695765
function nameForWaitOptions(options: WaitForOptions): string {

references/hello-world/src/trigger/waits.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,19 @@ export const waitForDuration = task({
140140
);
141141
},
142142
});
143+
144+
export const waitHttpCallback = task({
145+
id: "wait-http-callback",
146+
run: async () => {
147+
const result = await wait.forHttpCallback<{ foo: string }>(
148+
async (url) => {
149+
logger.log(`Wait for HTTP callback ${url}`);
150+
},
151+
{
152+
timeout: "60s",
153+
}
154+
);
155+
156+
logger.log("Wait for HTTP callback completed", result);
157+
},
158+
});

0 commit comments

Comments
 (0)