Skip to content

Commit 9778651

Browse files
authored
feat: migrate dashboard reindexing to qstash logic for async handling (#3701)
1 parent ff7210c commit 9778651

File tree

9 files changed

+207
-210
lines changed

9 files changed

+207
-210
lines changed

packages/fern-dashboard/src/app/actions/toggleAskAi.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,5 @@ export async function getToggleStatus({
7575
}): Promise<string> {
7676
const faiClient = getFaiClient({ token: process.env.FERN_TOKEN ?? "" });
7777
const response = await faiClient.settings.getToggleStatus({ domain });
78-
return response.status || "unknown";
78+
return response.status || "failed";
7979
}

packages/fern-dashboard/src/components/settings/ToggleAskAiButton.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,6 @@ export function ToggleAskAiButton({
9999
pollIntervalRef.current = setInterval(() => {
100100
void pollJobStatus();
101101
}, 7000);
102-
103-
void pollJobStatus();
104102
}, [pollJobStatus]);
105103

106104
useEffect(() => {

packages/fern-docs/bundle/src/app/[host]/[domain]/api/fern-docs/revalidate/route.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import {
3939
queueTurbopufferReindex,
4040
} from "@/server/queue-reindex";
4141

42-
export const maxDuration = 600; // 10 minutes timeout
42+
export const maxDuration = 800; // 13 minutes timeout
4343

4444
export async function GET(
4545
req: NextRequest,
@@ -94,7 +94,7 @@ export async function GET(
9494
// reindex unless explicitly disabled
9595
req.nextUrl.searchParams.get("reindex") !== "false"
9696
) {
97-
reindexPromise = reindex(docs, host, domain)
97+
reindexPromise = reindex(docs, host, domain, maxDuration)
9898
.then((services) => {
9999
controller.enqueue(
100100
`reindex-queued:services=${services.join(",")}\n`
@@ -358,7 +358,8 @@ export async function GET(
358358
async function reindex(
359359
docs: DocsV2Read.LoadDocsForUrlResponse,
360360
host: string,
361-
domain: string
361+
domain: string,
362+
maxDuration: number
362363
) {
363364
const { basePath } = docs.baseUrl;
364365

@@ -371,7 +372,12 @@ async function reindex(
371372
).ask_ai_enabled;
372373

373374
if (isAskAiEnabled) {
374-
await queueTurbopufferReindex(host, withoutStaging(domain), basePath);
375+
await queueTurbopufferReindex(
376+
host,
377+
withoutStaging(domain),
378+
basePath,
379+
maxDuration
380+
);
375381
return ["algolia", "turbopuffer"];
376382
}
377383
return ["algolia"];
Lines changed: 21 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,13 @@
11
import { NextRequest, NextResponse } from "next/server";
22

3-
import { createOpenAI } from "@ai-sdk/openai";
4-
5-
import { createCachedDocsLoader } from "@fern-api/docs-loader";
6-
import {
7-
fdrEnvironment,
8-
fernToken_admin,
9-
getFaiOrigin,
10-
openaiApiKey,
11-
turbopufferApiKey,
12-
} from "@fern-api/docs-server/env-variables";
133
import { isLocal } from "@fern-api/docs-server/isLocal";
144
import { isSelfHosted } from "@fern-api/docs-server/isSelfHosted";
15-
import { postToSlack } from "@fern-api/docs-server/slack";
16-
import { Gate, withBasicTokenAnonymous } from "@fern-api/docs-server/withRbac";
5+
import { loadWithUrl } from "@fern-api/docs-server/loadWithUrl";
176
import { getDocsDomainEdge } from "@fern-api/docs-server/xfernhost/edge";
18-
import { slugToHref, withoutStaging } from "@fern-api/docs-utils";
19-
import { FernAIClient } from "@fern-api/fai-sdk";
20-
import { getAuthEdgeConfig, getEdgeFlags } from "@fern-docs/edge-config";
21-
import {
22-
getFernDocsIndexName,
23-
getTurbopufferNamespace,
24-
getTurbopufferVectorizer,
25-
turbopufferUpsertTask,
26-
} from "@fern-docs/search-ask-fern";
7+
import { withoutStaging } from "@fern-api/docs-utils";
278

28-
import { JobManager, createJobResponse } from "@/jobs";
9+
import { createJobResponse } from "@/jobs";
10+
import { queueTurbopufferStartReindex } from "@/server/queue-reindex";
2911

3012
export const maxDuration = 800; // 13 minutes
3113

@@ -42,111 +24,22 @@ export async function GET(req: NextRequest): Promise<NextResponse> {
4224
const deleteExisting =
4325
req.nextUrl.searchParams.get("deleteExisting") === "true";
4426

45-
const fernDocsIndexName = getFernDocsIndexName();
46-
const namespace = getTurbopufferNamespace(domain, fernDocsIndexName);
47-
48-
const job_id = await JobManager.createJob(domain);
49-
50-
JobManager.executeJob(domain, async () => {
51-
const openai = createOpenAI({ apiKey: openaiApiKey() });
52-
const embeddingModel = openai.embedding("text-embedding-3-large");
53-
54-
try {
55-
const loader = await createCachedDocsLoader(host, domain);
56-
const metadata = await loader.getMetadata();
57-
if (metadata == null) {
58-
throw new Error("Documentation not found");
59-
}
60-
if (metadata.isPreview) {
61-
return {
62-
message: "Preview sites are not indexed",
63-
added: 0,
64-
domain,
65-
namespace,
66-
};
67-
}
68-
69-
const [authEdgeConfig, edgeFlags] = await Promise.all([
70-
getAuthEdgeConfig(domain),
71-
getEdgeFlags(domain),
72-
]);
73-
74-
console.log("Starting turbopuffer upsert for namespace: ", namespace);
75-
76-
const numInserted = await turbopufferUpsertTask({
77-
apiKey: turbopufferApiKey(),
78-
namespace,
79-
payload: {
80-
environment: fdrEnvironment(),
81-
fernToken: fernToken_admin(),
82-
domain: withoutStaging(domain),
83-
...edgeFlags,
84-
},
85-
vectorizer: getTurbopufferVectorizer(embeddingModel),
86-
authed: (node) => {
87-
if (authEdgeConfig == null) {
88-
return false;
89-
}
90-
return (
91-
withBasicTokenAnonymous(authEdgeConfig, slugToHref(node.slug)) ===
92-
Gate.DENY
93-
);
94-
},
95-
deleteExisting,
96-
});
97-
98-
const faiClient = new FernAIClient({
99-
baseUrl: getFaiOrigin(),
100-
});
101-
102-
console.log("Syncing index to query index for domain: ", domain);
103-
104-
const syncResponse = await faiClient.index.syncIndexToQueryIndex(domain, {
105-
index_name: fernDocsIndexName,
106-
});
107-
108-
console.log("Synced finished for domain: ", domain);
109-
110-
const pollJobStatus = async (jobId: string): Promise<void> => {
111-
while (true) {
112-
const statusResponse = await faiClient.index.getJobStatus(jobId);
113-
const { status, success, error } = statusResponse;
114-
115-
if (status === "completed") {
116-
if (success === false) {
117-
throw new Error(`Sync job failed: ${error || "Unknown error"}`);
118-
}
119-
break;
120-
} else if (status === "failed") {
121-
throw new Error(`Sync job failed: ${error || "Unknown error"}`);
122-
}
123-
124-
await new Promise((resolve) => setTimeout(resolve, 15000));
125-
}
126-
};
127-
128-
await pollJobStatus(syncResponse.job_id);
129-
130-
return {
131-
added: numInserted,
132-
domain,
133-
namespace,
134-
message: "Turbopuffer reindex completed successfully",
135-
};
136-
} catch (error) {
137-
console.error(`[turbopuffer-start] ${JSON.stringify(error)}`);
138-
139-
postToSlack(
140-
"#search-notifs",
141-
`:rotating_light: [TURBOPUFFER-START] Failed to reindex for ${domain} with the following error: ${String(error)}`,
142-
"turbopuffer-reindex-start"
143-
);
144-
145-
throw error;
146-
}
147-
}).catch((error) => {
148-
console.error(`Job ${job_id} execution failed:`, error);
149-
});
27+
const docs = await loadWithUrl(domain);
28+
const { basePath } = docs.baseUrl;
29+
30+
const messageId = await queueTurbopufferStartReindex(
31+
host,
32+
withoutStaging(domain),
33+
basePath,
34+
deleteExisting,
35+
maxDuration
36+
);
37+
38+
if (!messageId) {
39+
return NextResponse.json("Failed to queue turbopuffer reindex", {
40+
status: 400,
41+
});
42+
}
15043

151-
return createJobResponse("Turbopuffer reindex job started", job_id);
44+
return createJobResponse(messageId, "in_progress");
15245
}

packages/fern-docs/bundle/src/app/[host]/[domain]/api/fern-docs/search/v2/reindex/turbopuffer/status/route.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { NextRequest, NextResponse } from "next/server";
22

33
import { isLocal } from "@fern-api/docs-server/isLocal";
44
import { isSelfHosted } from "@fern-api/docs-server/isSelfHosted";
5-
import { getDocsDomainEdge } from "@fern-api/docs-server/xfernhost/edge";
65

7-
import { JobManager, createJobStatusResponse } from "@/jobs";
6+
import { createJobStatusResponse } from "@/jobs";
7+
import { getMessageStatus } from "@/server/queue";
88

99
export async function GET(req: NextRequest): Promise<NextResponse> {
1010
if (isLocal() || isSelfHosted()) {
@@ -23,7 +23,6 @@ export async function GET(req: NextRequest): Promise<NextResponse> {
2323
);
2424
}
2525

26-
const domain = getDocsDomainEdge(req);
27-
const job = await JobManager.getJobStatus(domain);
28-
return createJobStatusResponse(job);
26+
const status = await getMessageStatus(jobId);
27+
return createJobStatusResponse(jobId, status);
2928
}
Lines changed: 28 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { NextResponse } from "next/server";
22

33
import { kv } from "@vercel/kv";
4-
import { randomUUID } from "crypto";
54

65
interface JobStatus {
76
id: string;
@@ -13,80 +12,47 @@ interface JobStatus {
1312
result?: any;
1413
}
1514

16-
export class JobManager {
17-
static async createJob(domain: string): Promise<string> {
18-
const job: JobStatus = {
19-
id: randomUUID(),
20-
status: "pending",
21-
created_at: Date.now(),
22-
};
23-
24-
await kv.hset(domain, { [`tpuf_job`]: job });
25-
return job.id;
26-
}
27-
28-
static async executeJob<T>(
29-
domain: string,
30-
task: () => Promise<T>
31-
): Promise<void> {
32-
const job = await kv.hget<JobStatus>(domain, `tpuf_job`);
33-
if (!job) {
34-
throw new Error("Job not found");
35-
}
36-
37-
job.status = "in_progress";
38-
job.started_at = Date.now();
39-
await kv.hset(domain, { [`tpuf_job`]: job });
40-
41-
try {
42-
const result = await task();
43-
job.status = "completed";
44-
job.completed_at = Date.now();
45-
job.result = result;
46-
} catch (error) {
47-
job.status = "failed";
48-
job.completed_at = Date.now();
49-
job.error = error instanceof Error ? error.message : String(error);
50-
}
51-
52-
await kv.hset(domain, { [`tpuf_job`]: job });
53-
}
54-
55-
static async getJobStatus(domain: string): Promise<JobStatus | undefined> {
56-
const job = await kv.hget<JobStatus>(domain, `tpuf_job`);
57-
return job ?? undefined;
58-
}
15+
export async function createJob(
16+
domain: string,
17+
messageId: string
18+
): Promise<string> {
19+
const job: JobStatus = {
20+
id: messageId,
21+
status: "in_progress",
22+
created_at: Date.now(),
23+
};
24+
25+
await kv.hset(domain, { [`tpuf_job`]: job });
26+
return job.id;
27+
}
5928

60-
static async getJob(domain: string): Promise<JobStatus | undefined> {
61-
const job = await kv.hget<JobStatus>(domain, `tpuf_job`);
62-
return job ?? undefined;
63-
}
29+
export async function getJobStatus(
30+
domain: string
31+
): Promise<JobStatus | undefined> {
32+
const job = await kv.hget<JobStatus>(domain, `tpuf_job`);
33+
return job ?? undefined;
6434
}
6535

6636
export function createJobResponse(
67-
message: string = "Job created successfully",
68-
job_id: string
37+
job_id: string,
38+
status: "completed" | "failed" | "in_progress"
6939
) {
7040
return NextResponse.json({
71-
message,
7241
job_id,
42+
status,
7343
});
7444
}
7545

76-
export function createJobStatusResponse(job: JobStatus | undefined) {
77-
if (!job) {
46+
export function createJobStatusResponse(
47+
jobId: string,
48+
status: "completed" | "failed" | "in_progress"
49+
) {
50+
if (!status) {
7851
return NextResponse.json({ error: "Job not found" }, { status: 404 });
7952
}
8053

8154
return NextResponse.json({
82-
id: job.id,
83-
status: job.status,
84-
completed: job.status === "completed",
85-
failed: job.status === "failed",
86-
created_at: job.created_at,
87-
started_at: job.started_at,
88-
completed_at: job.completed_at,
89-
error: job.error,
90-
result: job.result,
55+
id: jobId,
56+
status,
9157
});
9258
}

0 commit comments

Comments
 (0)