Skip to content

Commit beca6ea

Browse files
committed
Reworked http callback to be a create call then just use wait.forToken()
1 parent c0d01e4 commit beca6ea

File tree

2 files changed

+109
-195
lines changed
  • packages/trigger-sdk/src/v3
  • references/hello-world/src/trigger

2 files changed

+109
-195
lines changed

packages/trigger-sdk/src/v3/wait.ts

Lines changed: 102 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@ import {
55
ApiPromise,
66
ApiRequestOptions,
77
CompleteWaitpointTokenResponseBody,
8+
CreateWaitpointHttpCallbackResponseBody,
89
CreateWaitpointTokenRequestBody,
910
CreateWaitpointTokenResponse,
1011
CreateWaitpointTokenResponseBody,
1112
CursorPagePromise,
1213
flattenAttributes,
13-
HttpCallbackResult,
1414
ListWaitpointTokensQueryParams,
1515
mergeRequestOptions,
16-
Prettify,
1716
runtime,
1817
SemanticInternalAttributes,
1918
taskContext,
@@ -82,6 +81,106 @@ function createToken(
8281
return apiClient.createWaitpointToken(options ?? {}, $requestOptions);
8382
}
8483

84+
/**
85+
* This creates an HTTP callback that allows you to start some work on another API (or one of your own services)
86+
* and continue the run when a callback URL we give you is hit with the result.
87+
*
88+
* You should send the callback URL to the other service, and then that service will
89+
* make a request to the callback URL with the result.
90+
*
91+
* @example
92+
*
93+
* ```ts
94+
* // Create a waitpoint and pass the callback URL to the other service
95+
const { token, data } = await wait.createHttpCallback(
96+
async (url) => {
97+
//pass the provided URL to Replicate's webhook
98+
return replicate.predictions.create({
99+
version: "27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478",
100+
input: {
101+
prompt: "A painting of a cat by Any Warhol",
102+
},
103+
// pass the provided URL to Replicate's webhook, so they can "callback"
104+
webhook: url,
105+
webhook_events_filter: ["completed"],
106+
});
107+
},
108+
{
109+
timeout: "10m",
110+
}
111+
);
112+
113+
// Now you can wait for the token to complete
114+
// This will pause the run until the token is completed (by the other service calling the callback URL)
115+
const prediction = await wait.forToken<Prediction>(token);
116+
117+
if (!prediction.ok) {
118+
throw new Error("Failed to create prediction");
119+
}
120+
121+
//the value of prediction is the body of the webook that Replicate sent
122+
const result = prediction.output;
123+
* ```
124+
*
125+
* @param callback A function that gives you a URL you should send to the other service (it will call back with the result)
126+
* @param options - The options for the waitpoint.
127+
* @param requestOptions - The request options for the waitpoint.
128+
* @returns A promise that returns the token and anything you returned from your callback.
129+
*/
130+
async function createHttpCallback<TCallbackResult>(
131+
callback: (url: string) => Promise<TCallbackResult>,
132+
options?: CreateWaitpointTokenRequestBody,
133+
requestOptions?: ApiRequestOptions
134+
): Promise<{
135+
/** The token that you can use to wait for the callback */
136+
token: CreateWaitpointHttpCallbackResponseBody;
137+
/** Whatever you returned from the function */
138+
data: TCallbackResult;
139+
}> {
140+
const apiClient = apiClientManager.clientOrThrow();
141+
142+
return tracer.startActiveSpan(
143+
`wait.createHttpCallback()`,
144+
async (span) => {
145+
const waitpoint = await apiClient.createWaitpointHttpCallback(options ?? {}, requestOptions);
146+
147+
span.setAttribute("id", waitpoint.id);
148+
span.setAttribute("isCached", waitpoint.isCached);
149+
span.setAttribute("url", waitpoint.url);
150+
151+
const callbackResult = await tracer.startActiveSpan(
152+
`callback()`,
153+
async () => {
154+
return callback(waitpoint.url);
155+
},
156+
{
157+
attributes: {
158+
[SemanticInternalAttributes.STYLE_ICON]: "function",
159+
id: waitpoint.id,
160+
url: waitpoint.url,
161+
isCached: waitpoint.isCached,
162+
},
163+
}
164+
);
165+
166+
return { token: waitpoint, data: callbackResult };
167+
},
168+
{
169+
attributes: {
170+
[SemanticInternalAttributes.STYLE_ICON]: "wait-http-callback",
171+
idempotencyKey: options?.idempotencyKey,
172+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
173+
timeout: options?.timeout
174+
? typeof options.timeout === "string"
175+
? options.timeout
176+
: options.timeout.toISOString()
177+
: undefined,
178+
tags: options?.tags,
179+
},
180+
}
181+
);
182+
}
183+
85184
/**
86185
* Lists waitpoint tokens with optional filtering and pagination.
87186
* You can iterate over all the items in the result using a for-await-of loop (you don't need to think about pagination).
@@ -678,161 +777,7 @@ export const wait = {
678777
}
679778
});
680779
},
681-
/**
682-
* This allows you to start some work on another API (or one of your own services)
683-
* and continue the run when a callback URL we give you is hit with the result.
684-
*
685-
* You should send the callback URL to the other service, and then that service will
686-
* make a request to the callback URL with the result.
687-
*
688-
* @example
689-
*
690-
* ```ts
691-
* //wait for the prediction to complete
692-
const prediction = await wait.forHttpCallback<Prediction>(
693-
async (url) => {
694-
//pass the provided URL to Replicate's webhook
695-
await replicate.predictions.create({
696-
version: "27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478",
697-
input: {
698-
prompt: "A painting of a cat by Any Warhol",
699-
},
700-
// pass the provided URL to Replicate's webhook, so they can "callback"
701-
webhook: url,
702-
webhook_events_filter: ["completed"],
703-
});
704-
},
705-
{
706-
timeout: "10m",
707-
}
708-
);
709-
710-
if (!prediction.ok) {
711-
throw new Error("Failed to create prediction");
712-
}
713-
714-
//the value of prediction is the body of the webook that Replicate sent
715-
const result = prediction.output;
716-
* ```
717-
*
718-
* @param callback A function that gives you a URL you can use to send the result to.
719-
* @param options - The options for the waitpoint.
720-
* @param requestOptions - The request options for the waitpoint.
721-
* @returns A promise that resolves to the result of the waitpoint. You can use `.unwrap()` to get the result and an error will throw.
722-
*/
723-
forHttpCallback<TResult>(
724-
callback: (url: string) => Promise<void>,
725-
options?: CreateWaitpointTokenRequestBody & {
726-
/**
727-
* If set to true, this will cause the waitpoint to release the current run from the queue's concurrency.
728-
*
729-
* This is useful if you want to allow other runs to execute while waiting
730-
*
731-
* @default false
732-
*/
733-
releaseConcurrency?: boolean;
734-
},
735-
requestOptions?: ApiRequestOptions
736-
): ManualWaitpointPromise<TResult> {
737-
return new ManualWaitpointPromise<TResult>(async (resolve, reject) => {
738-
try {
739-
const ctx = taskContext.ctx;
740-
741-
if (!ctx) {
742-
throw new Error("wait.forHttpCallback can only be used from inside a task.run()");
743-
}
744-
745-
const apiClient = apiClientManager.clientOrThrow();
746-
747-
const waitpoint = await apiClient.createWaitpointHttpCallback(
748-
options ?? {},
749-
requestOptions
750-
);
751-
752-
const result = await tracer.startActiveSpan(
753-
`wait.forHttpCallback()`,
754-
async (span) => {
755-
const [error] = await tryCatch(callback(waitpoint.url));
756-
757-
if (error) {
758-
throw new Error(`You threw an error in your callback: ${error.message}`, {
759-
cause: error,
760-
});
761-
}
762-
763-
const response = await apiClient.waitForWaitpointToken({
764-
runFriendlyId: ctx.run.id,
765-
waitpointFriendlyId: waitpoint.id,
766-
releaseConcurrency: options?.releaseConcurrency,
767-
});
768-
769-
if (!response.success) {
770-
throw new Error(`Failed to wait for wait for HTTP callback ${waitpoint.id}`);
771-
}
772-
773-
const result = await runtime.waitUntil(waitpoint.id);
774-
775-
const data = result.output
776-
? await conditionallyImportAndParsePacket(
777-
{ data: result.output, dataType: result.outputType ?? "application/json" },
778-
apiClient
779-
)
780-
: undefined;
781-
782-
if (result.ok) {
783-
return {
784-
ok: result.ok,
785-
output: data,
786-
};
787-
} else {
788-
const error = new WaitpointTimeoutError(data?.message ?? "Timeout error");
789-
790-
span.recordException(error);
791-
span.setStatus({
792-
code: SpanStatusCode.ERROR,
793-
});
794-
795-
return {
796-
ok: result.ok,
797-
error,
798-
};
799-
}
800-
},
801-
{
802-
attributes: {
803-
[SemanticInternalAttributes.STYLE_ICON]: "wait",
804-
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
805-
[SemanticInternalAttributes.ENTITY_ID]: waitpoint.id,
806-
...accessoryAttributes({
807-
items: [
808-
{
809-
text: waitpoint.id,
810-
variant: "normal",
811-
},
812-
],
813-
style: "codepath",
814-
}),
815-
id: waitpoint.id,
816-
isCached: waitpoint.isCached,
817-
idempotencyKey: options?.idempotencyKey,
818-
idempotencyKeyTTL: options?.idempotencyKeyTTL,
819-
timeout: options?.timeout
820-
? typeof options.timeout === "string"
821-
? options.timeout
822-
: options.timeout.toISOString()
823-
: undefined,
824-
tags: options?.tags,
825-
url: waitpoint.url,
826-
},
827-
}
828-
);
829-
830-
resolve(result);
831-
} catch (error) {
832-
reject(error);
833-
}
834-
});
835-
},
780+
createHttpCallback,
836781
};
837782

838783
function nameForWaitOptions(options: WaitForOptions): string {
@@ -898,8 +843,3 @@ function calculateDurationInMs(options: WaitForOptions): number {
898843

899844
throw new Error("Invalid options");
900845
}
901-
902-
type RequestOptions = {
903-
to: (url: string) => Promise<void>;
904-
timeout: WaitForOptions;
905-
};

references/hello-world/src/trigger/waits.ts

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,10 @@ export const waitHttpCallback = task({
152152
auth: process.env.REPLICATE_API_KEY,
153153
});
154154

155-
const prediction = await wait.forHttpCallback<Prediction>(
155+
const { token, data } = await wait.createHttpCallback(
156156
async (url) => {
157157
//pass the provided URL to Replicate's webhook
158-
await replicate.predictions.create({
158+
return replicate.predictions.create({
159159
version: "27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478",
160160
input: {
161161
prompt: "A painting of a cat by Any Warhol",
@@ -167,8 +167,12 @@ export const waitHttpCallback = task({
167167
},
168168
{
169169
timeout: "10m",
170+
tags: ["replicate"],
170171
}
171172
);
173+
logger.log("Create result", { token, data });
174+
175+
const prediction = await wait.forToken<Prediction>(token);
172176

173177
if (!prediction.ok) {
174178
throw new Error("Failed to create prediction");
@@ -180,39 +184,9 @@ export const waitHttpCallback = task({
180184
logger.log("Image URL", imageUrl);
181185

182186
//same again but with unwrapping
183-
const result2 = await wait
184-
.forHttpCallback<Prediction>(
185-
async (url) => {
186-
await replicate.predictions.create({
187-
version: "27b93a2413e7f36cd83da926f3656280b2931564ff050bf9575f1fdf9bcd7478",
188-
input: {
189-
prompt: "A painting of a cat by Any Warhol",
190-
},
191-
webhook: url,
192-
});
193-
},
194-
{
195-
timeout: "60s",
196-
}
197-
)
198-
.unwrap();
187+
const result2 = await wait.forToken<Prediction>(token).unwrap();
199188

200189
logger.log("Result2", { result2 });
201190
}
202-
203-
const result = await wait.forHttpCallback<{ foo: string }>(
204-
async (url) => {
205-
logger.log(`Wait for HTTP callback ${url}`);
206-
},
207-
{
208-
timeout: "60s",
209-
}
210-
);
211-
212-
if (!result.ok) {
213-
logger.log("Wait for HTTP callback failed", { error: result.error });
214-
} else {
215-
logger.log("Wait for HTTP callback completed", result);
216-
}
217191
},
218192
});

0 commit comments

Comments
 (0)