diff --git a/references/d3-chat/src/trigger/chat.ts b/references/d3-chat/src/trigger/chat.ts index aed8ebec11..67654bea9e 100644 --- a/references/d3-chat/src/trigger/chat.ts +++ b/references/d3-chat/src/trigger/chat.ts @@ -1,9 +1,16 @@ import { anthropic } from "@ai-sdk/anthropic"; import { openai } from "@ai-sdk/openai"; import { ai } from "@trigger.dev/sdk/ai"; -import { logger, metadata, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3"; +import { logger, metadata, runs, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3"; import { sql } from "@vercel/postgres"; -import { streamText, TextStreamPart, tool } from "ai"; +import { + CoreMessage, + createDataStream, + DataStreamWriter, + streamText, + TextStreamPart, + tool, +} from "ai"; import { nanoid } from "nanoid"; import { z } from "zod"; import { sendSQLApprovalMessage } from "../lib/slack"; @@ -267,3 +274,347 @@ export const interruptibleChat = schemaTask({ } }, }); + +async function createStreamWithProvider(params: { + model: ReturnType | ReturnType; + messages: CoreMessage[]; + message_request_id: string; + chat_id: string; + userId?: string; +}) { + const { model, messages, message_request_id, chat_id, userId } = params; + + return new Promise((resolve, reject) => { + const dataStreamResponse = createDataStream({ + execute: async (dataStream) => { + const result = streamText({ + model, + system: "This is the system prompt, please be nice.", + messages, + maxSteps: 20, + toolCallStreaming: true, + onError: (error) => { + logger.error("Error in chatStream task (streamText)", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + provider: model.provider, + }); + reject(error); + }, + onChunk: async (chunk) => { + console.log("Chunk:", chunk); + }, + onFinish: async ({ response, reasoning }) => { + metadata.flush(); + logger.info("AI stream finished", { + chat_id, + userId, + messageCount: response.messages.length, + provider: model.provider, + }); + + if (userId) { + try { + // Pretend to save messages + await new Promise((resolve) => setTimeout(resolve, 1000)); + + logger.info("Successfully saved AI response messages", { + chat_id, + userId, + messageCount: response.messages.length, + message: JSON.stringify(response.messages, null, 2), + provider: model.provider, + }); + } catch (error) { + logger.error("Failed to save AI response messages", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + chat_id, + userId, + provider: model.provider, + }); + } + } + }, + }); + + result.consumeStream(); + + result.mergeIntoDataStream(dataStream, { + sendReasoning: true, + }); + }, + onError: (error) => { + logger.error("Error in chatStream task (createDataStream)", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + provider: model.provider, + }); + reject(error); + return error instanceof Error ? error.message : String(error); + }, + }); + + // Process the stream + (async () => { + try { + const stream = await metadata.stream("dataStream", dataStreamResponse); + let fullResponse = ""; + + for await (const chunk of stream) { + fullResponse += chunk; + } + + // Only resolve if we haven't rejected due to an error + resolve(fullResponse); + } catch (error) { + reject(error); + } + })(); + }); +} + +export const chatStream = schemaTask({ + id: "chat-stream", + description: "Stream data from the AI SDK and use tools", + schema: z.object({ + chat_id: z.string().default("chat"), + messages: z.array(z.unknown()).describe("Array of chat messages"), + message_request_id: z.string().describe("Unique identifier for the message request"), + model: z.string().default("claude-3-7-sonnet-20250219"), + userId: z.string().optional().describe("User ID for authentication"), + existingProject: z.boolean().default(false).describe("Whether the project already exists"), + }), + machine: "large-2x", + run: async ({ chat_id, messages, model, userId, message_request_id }) => { + logger.info("Running chat stream", { + chat_id, + messages, + model, + userId, + message_request_id, + }); + + try { + // First try with Anthropic + return await createStreamWithProvider({ + model: anthropic(model), + messages: messages as CoreMessage[], + message_request_id, + chat_id, + userId, + }); + } catch (error) { + logger.info("Anthropic stream failed, falling back to OpenAI", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + chat_id, + userId, + message_request_id, + }); + + try { + // Fallback to OpenAI + return await createStreamWithProvider({ + model: openai("gpt-4"), + messages: messages as CoreMessage[], + message_request_id, + chat_id, + userId, + }); + } catch (fallbackError) { + logger.error("Both Anthropic and OpenAI streams failed", { + error: fallbackError instanceof Error ? fallbackError.message : "Unknown error", + stack: fallbackError instanceof Error ? fallbackError.stack : undefined, + chat_id, + userId, + message_request_id, + }); + throw fallbackError; + } + } + }, +}); + +export const chatStreamCaller = schemaTask({ + id: "chat-stream-caller", + description: "Call the chat stream", + schema: z.object({ + prompt: z.string().describe("The prompt to chat with the AI"), + }), + run: async ({ prompt }, { ctx }) => { + const result = await chatStream.trigger({ + messages: [ + { + role: "user", + content: prompt, + }, + ], + message_request_id: ctx.run.id, + }); + + const stream = await runs.fetchStream(result.id, "dataStream"); + + for await (const chunk of stream) { + console.log("Chunk:", chunk); + } + + return result; + }, +}); + +export const streamFetcher = schemaTask({ + id: "stream-fetcher", + description: "Fetch a stream", + schema: z.object({ + runId: z.string().describe("The run ID to fetch the stream for"), + streamId: z.string().describe("The stream ID to fetch"), + }), + run: async ({ runId, streamId }) => { + const result = await runs.fetchStream(runId, streamId); + + for await (const chunk of result) { + console.log("Chunk:", chunk); + } + + return result; + }, +}); + +export const chatStream2 = schemaTask({ + id: "chat-stream-2", + description: "Stream data from the AI SDK and use tools", + schema: z.object({ + chat_id: z.string().default("chat"), + messages: z.array(z.unknown()).describe("Array of chat messages"), + message_request_id: z.string().describe("Unique identifier for the message request"), + model: z.string().default("claude-3-7-sonnet-20250219"), + userId: z.string().optional().describe("User ID for authentication"), + existingProject: z.boolean().default(false).describe("Whether the project already exists"), + }), + machine: "large-2x", + run: async ({ chat_id, messages, model, userId, message_request_id }) => { + logger.info("Running chat stream", { + chat_id, + messages, + model, + userId, + message_request_id, + }); + + const dataStreamResponse = createDataStream({ + execute: async (dataStream) => { + streamTextWithModel( + dataStream, + anthropic(model), + messages as CoreMessage[], + chat_id, + openai("gpt-4"), + userId + ); + }, + }); + + const stream = await metadata.stream("dataStream", dataStreamResponse); + + for await (const chunk of stream) { + console.log("Chunk:", chunk); + } + }, +}); + +function streamTextWithModel( + dataStream: DataStreamWriter, + model: ReturnType | ReturnType, + messages: CoreMessage[], + chat_id: string, + fallbackModel?: ReturnType | ReturnType, + userId?: string +) { + const result = streamText({ + model, + system: "This is the system prompt, please be nice.", + messages, + maxSteps: 20, + toolCallStreaming: true, + onError: (error) => { + logger.error("Error in chatStream task (streamText)", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + provider: model.provider, + }); + + if (fallbackModel) { + streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId); + } + }, + onChunk: async (chunk) => { + console.log("Chunk:", chunk); + }, + onFinish: async ({ response, reasoning }) => { + metadata.flush(); + logger.info("AI stream finished", { + chat_id, + userId, + messageCount: response.messages.length, + provider: model.provider, + }); + + if (userId) { + try { + // Pretend to save messages + await new Promise((resolve) => setTimeout(resolve, 1000)); + + logger.info("Successfully saved AI response messages", { + chat_id, + userId, + messageCount: response.messages.length, + message: JSON.stringify(response.messages, null, 2), + provider: model.provider, + }); + } catch (error) { + logger.error("Failed to save AI response messages", { + error: error instanceof Error ? error.message : "Unknown error", + stack: error instanceof Error ? error.stack : undefined, + chat_id, + userId, + provider: model.provider, + }); + } + } + }, + }); + + result.consumeStream(); + + result.mergeIntoDataStream(dataStream, { + sendReasoning: true, + }); +} + +export const chatStreamCaller2 = schemaTask({ + id: "chat-stream-caller-2", + description: "Call the chat stream", + schema: z.object({ + prompt: z.string().describe("The prompt to chat with the AI"), + }), + run: async ({ prompt }, { ctx }) => { + const result = await chatStream2.trigger({ + messages: [ + { + role: "user", + content: prompt, + }, + ], + message_request_id: ctx.run.id, + }); + + const stream = await runs.fetchStream(result.id, "dataStream"); + + for await (const chunk of stream) { + console.log("Chunk:", chunk); + } + + return result; + }, +});