Skip to content

Commit 71d88b9

Browse files
committed
Implement unwrap() for httpCallback
1 parent 1e5350a commit 71d88b9

File tree

2 files changed

+146
-97
lines changed
  • packages/trigger-sdk/src/v3
  • references/hello-world/src/trigger

2 files changed

+146
-97
lines changed

packages/trigger-sdk/src/v3/wait.ts

Lines changed: 126 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,30 @@
1+
import { SpanStatusCode } from "@opentelemetry/api";
12
import {
2-
SemanticInternalAttributes,
33
accessoryAttributes,
4-
runtime,
54
apiClientManager,
65
ApiPromise,
76
ApiRequestOptions,
7+
CompleteWaitpointTokenResponseBody,
88
CreateWaitpointTokenRequestBody,
9+
CreateWaitpointTokenResponse,
910
CreateWaitpointTokenResponseBody,
11+
CursorPagePromise,
12+
flattenAttributes,
13+
HttpCallbackResult,
14+
ListWaitpointTokensQueryParams,
1015
mergeRequestOptions,
11-
CompleteWaitpointTokenResponseBody,
12-
WaitpointTokenTypedResult,
1316
Prettify,
17+
runtime,
18+
SemanticInternalAttributes,
1419
taskContext,
15-
ListWaitpointTokensQueryParams,
16-
CursorPagePromise,
17-
WaitpointTokenItem,
18-
flattenAttributes,
20+
tryCatch,
1921
WaitpointListTokenItem,
20-
WaitpointTokenStatus,
2122
WaitpointRetrieveTokenResponse,
22-
CreateWaitpointTokenResponse,
23-
HttpCallbackSchema,
24-
HttpCallbackResultTypeFromSchema,
25-
HttpCallbackResult,
26-
tryCatch,
27-
inferSchemaIn,
28-
getSchemaParseFn,
23+
WaitpointTokenStatus,
24+
WaitpointTokenTypedResult,
2925
} from "@trigger.dev/core/v3";
30-
import { tracer } from "./tracer.js";
3126
import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
32-
import { SpanStatusCode } from "@opentelemetry/api";
27+
import { tracer } from "./tracer.js";
3328

3429
/**
3530
* This creates a waitpoint token.
@@ -383,6 +378,29 @@ function printWaitBelowThreshold() {
383378
);
384379
}
385380

381+
class ManualWaitpointPromise<TOutput> extends Promise<WaitpointTokenTypedResult<TOutput>> {
382+
constructor(
383+
executor: (
384+
resolve: (
385+
value: WaitpointTokenTypedResult<TOutput> | PromiseLike<WaitpointTokenTypedResult<TOutput>>
386+
) => void,
387+
reject: (reason?: any) => void
388+
) => void
389+
) {
390+
super(executor);
391+
}
392+
393+
unwrap(): Promise<TOutput> {
394+
return this.then((result) => {
395+
if (result.ok) {
396+
return result.output;
397+
} else {
398+
throw new WaitpointTimeoutError(result.error.message);
399+
}
400+
});
401+
}
402+
}
403+
386404
export const wait = {
387405
for: async (options: WaitForOptions) => {
388406
const ctx = taskContext.ctx;
@@ -689,7 +707,7 @@ export const wait = {
689707
* @param requestOptions
690708
* @returns
691709
*/
692-
async forHttpCallback<TResult>(
710+
forHttpCallback<TResult>(
693711
callback: (url: string) => Promise<void>,
694712
options?: CreateWaitpointTokenRequestBody & {
695713
/**
@@ -702,94 +720,105 @@ export const wait = {
702720
releaseConcurrency?: boolean;
703721
},
704722
requestOptions?: ApiRequestOptions
705-
): Promise<HttpCallbackResult<TResult>> {
706-
const ctx = taskContext.ctx;
723+
): ManualWaitpointPromise<TResult> {
724+
return new ManualWaitpointPromise<TResult>(async (resolve, reject) => {
725+
try {
726+
const ctx = taskContext.ctx;
707727

708-
if (!ctx) {
709-
throw new Error("wait.forHttpCallback can only be used from inside a task.run()");
710-
}
711-
712-
const apiClient = apiClientManager.clientOrThrow();
713-
714-
const waitpoint = await apiClient.createWaitpointHttpCallback(options ?? {}, requestOptions);
715-
716-
return tracer.startActiveSpan(
717-
`wait.forHttpCallback()`,
718-
async (span) => {
719-
const [error] = await tryCatch(callback(waitpoint.url));
720-
721-
if (error) {
722-
throw new Error(`You threw an error in your callback: ${error.message}`, {
723-
cause: error,
724-
});
728+
if (!ctx) {
729+
throw new Error("wait.forHttpCallback can only be used from inside a task.run()");
725730
}
726731

727-
const response = await apiClient.waitForWaitpointToken({
728-
runFriendlyId: ctx.run.id,
729-
waitpointFriendlyId: waitpoint.id,
730-
releaseConcurrency: options?.releaseConcurrency,
731-
});
732+
const apiClient = apiClientManager.clientOrThrow();
732733

733-
if (!response.success) {
734-
throw new Error(`Failed to wait for wait for HTTP callback ${waitpoint.id}`);
735-
}
734+
const waitpoint = await apiClient.createWaitpointHttpCallback(
735+
options ?? {},
736+
requestOptions
737+
);
736738

737-
const result = await runtime.waitUntil(waitpoint.id);
739+
const result = await tracer.startActiveSpan(
740+
`wait.forHttpCallback()`,
741+
async (span) => {
742+
const [error] = await tryCatch(callback(waitpoint.url));
738743

739-
const data = result.output
740-
? await conditionallyImportAndParsePacket(
741-
{ data: result.output, dataType: result.outputType ?? "application/json" },
742-
apiClient
743-
)
744-
: undefined;
744+
if (error) {
745+
throw new Error(`You threw an error in your callback: ${error.message}`, {
746+
cause: error,
747+
});
748+
}
745749

746-
if (result.ok) {
747-
return {
748-
ok: result.ok,
749-
output: data,
750-
};
751-
} else {
752-
const error = new WaitpointTimeoutError(data.message);
750+
const response = await apiClient.waitForWaitpointToken({
751+
runFriendlyId: ctx.run.id,
752+
waitpointFriendlyId: waitpoint.id,
753+
releaseConcurrency: options?.releaseConcurrency,
754+
});
753755

754-
span.recordException(error);
755-
span.setStatus({
756-
code: SpanStatusCode.ERROR,
757-
});
756+
if (!response.success) {
757+
throw new Error(`Failed to wait for wait for HTTP callback ${waitpoint.id}`);
758+
}
758759

759-
return {
760-
ok: result.ok,
761-
error,
762-
};
763-
}
764-
},
765-
{
766-
attributes: {
767-
[SemanticInternalAttributes.STYLE_ICON]: "wait",
768-
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
769-
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
770-
...accessoryAttributes({
771-
items: [
772-
{
773-
text: waitpoint.id,
774-
variant: "normal",
775-
},
776-
],
777-
style: "codepath",
778-
}),
779-
id: waitpoint.id,
780-
isCached: waitpoint.isCached,
781-
idempotencyKey: options?.idempotencyKey,
782-
idempotencyKeyTTL: options?.idempotencyKeyTTL,
783-
timeout: options?.timeout
784-
? typeof options.timeout === "string"
785-
? options.timeout
786-
: options.timeout.toISOString()
787-
: undefined,
788-
tags: options?.tags,
789-
url: waitpoint.url,
790-
},
760+
const result = await runtime.waitUntil(waitpoint.id);
761+
762+
const data = result.output
763+
? await conditionallyImportAndParsePacket(
764+
{ data: result.output, dataType: result.outputType ?? "application/json" },
765+
apiClient
766+
)
767+
: undefined;
768+
769+
if (result.ok) {
770+
return {
771+
ok: result.ok,
772+
output: data,
773+
};
774+
} else {
775+
const error = new WaitpointTimeoutError(data.message);
776+
777+
span.recordException(error);
778+
span.setStatus({
779+
code: SpanStatusCode.ERROR,
780+
});
781+
782+
return {
783+
ok: result.ok,
784+
error,
785+
};
786+
}
787+
},
788+
{
789+
attributes: {
790+
[SemanticInternalAttributes.STYLE_ICON]: "wait",
791+
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
792+
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
793+
...accessoryAttributes({
794+
items: [
795+
{
796+
text: waitpoint.id,
797+
variant: "normal",
798+
},
799+
],
800+
style: "codepath",
801+
}),
802+
id: waitpoint.id,
803+
isCached: waitpoint.isCached,
804+
idempotencyKey: options?.idempotencyKey,
805+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
806+
timeout: options?.timeout
807+
? typeof options.timeout === "string"
808+
? options.timeout
809+
: options.timeout.toISOString()
810+
: undefined,
811+
tags: options?.tags,
812+
url: waitpoint.url,
813+
},
814+
}
815+
);
816+
817+
resolve(result);
818+
} catch (error) {
819+
reject(error);
791820
}
792-
);
821+
});
793822
},
794823
};
795824

references/hello-world/src/trigger/waits.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,26 @@ export const waitHttpCallback = task({
178178

179179
const imageUrl = prediction.output.output;
180180
logger.log("Image URL", imageUrl);
181+
182+
//same again but with unwrapping
183+
const result2 = await wait
184+
.forHttpCallback<Prediction>(
185+
async (url) => {
186+
await replicate.predictions.create({
187+
version: "27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478",
188+
input: {
189+
prompt: "A painting of a cat by Any Warhol",
190+
},
191+
webhook: url,
192+
});
193+
},
194+
{
195+
timeout: "60s",
196+
}
197+
)
198+
.unwrap();
199+
200+
logger.log("Result2", { result2 });
181201
}
182202

183203
const result = await wait.forHttpCallback<{ foo: string }>(

0 commit comments

Comments
 (0)