
Commit 8244819

feat(scraper): add cloudflare queue producer and consumer (#32)
1 parent 07e61f9 commit 8244819

21 files changed (+536 additions, 309 deletions)


apps/scraper/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ node_modules/
 .env
 .env.production
 .dev.vars
+.db.env
 
 # logs
 logs/

apps/scraper/drizzle-dev.config.ts

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 import { config } from "dotenv";
 import { defineConfig } from "drizzle-kit";
 
-config({ path: "./.env" });
+config({ path: "./.db.env" });
 
 export default defineConfig({
   out: "./src/drizzle/migrations",

apps/scraper/drizzle-prod.config.ts

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import { config } from "dotenv";
 import { defineConfig } from "drizzle-kit";
 
-config({ path: "./.env" });
+config({ path: "./.db.env" });
 
 const accountId = process.env.CLOUDFLARE_ACCOUNT_ID!;
 const databaseId = process.env.CLOUDFLARE_DATABASE_ID!;

apps/scraper/package.json

Lines changed: 7 additions & 2 deletions
@@ -9,19 +9,24 @@
     "check:types": "tsc --noEmit",
     "db:studio:local": "drizzle-kit studio --config=drizzle-dev.config.ts",
     "db:studio:remote": "drizzle-kit studio --config=drizzle-prod.config.ts",
+    "db:generate": "drizzle-kit generate --config=drizzle-dev.config.ts",
+    "db:migrate:local": "wrangler d1 migrations apply scraper-ops --local",
+    "db:migrate:remote": "wrangler d1 migrations apply scraper-ops --remote",
     "db:push:local": "drizzle-kit push --config=drizzle-dev.config.ts",
     "db:push:remote": "drizzle-kit push --config=drizzle-prod.config.ts",
     "cf-typegen": "wrangler types --env-interface CloudflareBindings"
   },
   "dependencies": {
+    "@dev-team-fall-25/server": "workspace:*",
     "dotenv": "^17.2.3",
     "drizzle-orm": "^0.44.6",
     "hono": "^4.9.10",
-    "zod": "^4.1.12",
-    "@dev-team-fall-25/server": "workspace:*"
+    "nanoid": "^5.1.6",
+    "zod": "^4.1.12"
   },
   "devDependencies": {
     "@biomejs/biome": "2.2.5",
+    "@libsql/client": "^0.15.15",
     "drizzle-kit": "^0.31.5",
     "wrangler": "^4.42.1"
   }
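
Editor's note: the worker code later in this commit types its env as CloudflareBindings, which the existing cf-typegen script (wrangler types --env-interface CloudflareBindings) regenerates from the Wrangler config; that generated file is not shown in this view. A rough, hand-written sketch of the shape the new code relies on, with binding names taken from src/index.ts and the queue message payload narrowed purely for illustration:

// Sketch only: in the repo this interface is generated by the cf-typegen script,
// not written by hand. Binding names match those used in src/index.ts.
interface CloudflareBindings {
  // D1 database handed to drizzle via getDB()
  DB: D1Database;
  // Producer binding for the scraping queue; messages carry only a job id
  SCRAPING_QUEUE: Queue<{ jobId: string }>;
  // Base URL the scheduled handler resolves /programs and /courses against
  SCRAPING_BASE_URL: string;
  // Convex HTTP endpoint and API key passed to ConvexApi
  CONVEX_SITE_URL: string;
  CONVEX_API_KEY: string;
}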

apps/scraper/src/drizzle/index.ts

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 import { drizzle } from "drizzle-orm/d1";
 
-const createDB = async (env: CloudflareBindings) => {
+const getDB = (env: CloudflareBindings) => {
   return drizzle(env.DB);
 };
 
-export default createDB;
+export default getDB;

apps/scraper/src/drizzle/schema.ts

Lines changed: 13 additions & 7 deletions
@@ -1,13 +1,18 @@
 import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
+import { nanoid } from "nanoid";
 
 export const jobs = sqliteTable("jobs", {
-  id: integer("id").primaryKey({ autoIncrement: true }),
+  id: text("id")
+    .primaryKey()
+    .$defaultFn(() => nanoid()),
   url: text("url").notNull(),
   status: text("status", {
     enum: ["pending", "processing", "completed", "failed"],
-  }).notNull(),
+  })
+    .notNull()
+    .default("pending"),
   jobType: text("job_type", {
-    enum: ["discovery", "program", "course"],
+    enum: ["discover-programs", "discover-courses", "program", "course"],
   }).notNull(),
   createdAt: integer("created_at", { mode: "timestamp" })
     .notNull()
@@ -17,13 +22,14 @@ export const jobs = sqliteTable("jobs", {
 });
 
 export const errorLogs = sqliteTable("error_logs", {
-  id: integer("id").primaryKey({ autoIncrement: true }),
-  jobId: integer("job_id").references(() => jobs.id),
+  id: text("id")
+    .primaryKey()
+    .$defaultFn(() => nanoid()),
+  jobId: text("job_id").references(() => jobs.id),
   errorType: text("error_type", {
-    enum: ["network", "parsing", "validation", "timeout"],
+    enum: ["network", "parsing", "validation", "timeout", "unknown"],
   }).notNull(),
   errorMessage: text("error_message").notNull(),
   stackTrace: text("stack_trace"),
-  retryCount: integer("retry_count").notNull(),
   timestamp: integer("timestamp", { mode: "timestamp" }).notNull(),
 });
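
Editor's note: with the nanoid-backed text primary keys and the new "pending" default on status, callers only need to supply a url and jobType when inserting a job, which is exactly what the scheduled handler in src/index.ts does. A minimal illustration, assuming the same D1/drizzle setup; the URL and helper name here are made up:

import getDB from "./drizzle";
import { jobs } from "./drizzle/schema";

// Hypothetical helper: inserts a discovery job and lets the schema defaults
// generate the nanoid primary key and the "pending" status.
const enqueueDiscoveryJob = async (env: CloudflareBindings) => {
  const db = getDB(env);
  const [job] = await db
    .insert(jobs)
    .values({ url: "https://example.edu/programs", jobType: "discover-programs" })
    .returning();
  return job; // job.id is a generated nanoid string, job.status is "pending"
};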

apps/scraper/src/index.ts

Lines changed: 189 additions & 16 deletions
@@ -1,31 +1,204 @@
-/** biome-ignore-all lint/correctness/noUnusedFunctionParameters: disable for now as they haven't been implemented yet */
+import type {
+  ZUpsertPrerequisites,
+  ZUpsertRequirements,
+} from "@dev-team-fall-25/server/convex/http";
+import { eq } from "drizzle-orm";
 import { Hono } from "hono";
+import type z from "zod";
+import getDB from "./drizzle";
+import { errorLogs, jobs } from "./drizzle/schema";
+import { ConvexApi } from "./lib/convex";
+import { JobError, type JobMessage } from "./lib/queue";
+import { discoverCourses, scrapeCourse } from "./modules/courses";
+import { discoverPrograms, scrapeProgram } from "./modules/programs";
 
 const app = new Hono<{ Bindings: CloudflareBindings }>();
 
 app.get("/", async (c) => {
-  // const db = await createDB(c.env);
+  // const db = await getDB(c.env);
   // TODO: use hono to render a dashboard to monitor the scraping status
+  return c.json({ status: "ok" });
 });
 
 export default {
   fetch: app.fetch,
 
-  async scheduled(event: ScheduledEvent, env: CloudflareBindings) {
-    // const db = await createDB(env);
-    // const api = new ConvexApi({
-    //   baseUrl: env.CONVEX_SITE_URL,
-    //   apiKey: env.CONVEX_API_KEY,
-    // });
-    // TODO: set up jobs for scraping a list of urls need to be scraped and add them to queue as "discovery"
+  async scheduled(_event: ScheduledEvent, env: CloudflareBindings) {
+    // NOTE: the worker will not execute anything for now until the flag for toggle scrapers are set up
+    return;
+    // biome-ignore lint/correctness/noUnreachable: WIP
+    const db = getDB(env);
+
+    // FIXME: need to handle when programsUr or coursesUrl is empty array
+    const programsUrl = new URL("/programs", env.SCRAPING_BASE_URL).toString();
+    const coursesUrl = new URL("/courses", env.SCRAPING_BASE_URL).toString();
+
+    const [[programsJob], [coursesJob]] = await Promise.all([
+      db
+        .insert(jobs)
+        .values({
+          url: programsUrl,
+          jobType: "discover-programs",
+        })
+        .returning(),
+      db
+        .insert(jobs)
+        .values({
+          url: coursesUrl,
+          jobType: "discover-courses",
+        })
+        .returning(),
+    ]);
+
+    await Promise.all([
+      env.SCRAPING_QUEUE.send({ jobId: programsJob.id }),
+      env.SCRAPING_QUEUE.send({ jobId: coursesJob.id }),
+    ]);
   },
 
-  async queue(batch: MessageBatch<Error>, env: CloudflareBindings) {
-    // const db = await createDB(env);
-    // const api = new ConvexApi({
-    //   baseUrl: env.CONVEX_SITE_URL,
-    //   apiKey: env.CONVEX_API_KEY,
-    // });
-    // TODO: set up jobs for scrping given url and save structured data to convex database
+  async queue(
+    batch: MessageBatch<JobMessage>,
+    env: CloudflareBindings,
+    ctx: ExecutionContext,
+  ) {
+    const db = getDB(env);
+    const convex = new ConvexApi({
+      baseUrl: env.CONVEX_SITE_URL,
+      apiKey: env.CONVEX_API_KEY,
+    });
+
+    for (const message of batch.messages) {
+      const { jobId } = message.body;
+
+      const job = await db.select().from(jobs).where(eq(jobs.id, jobId)).get();
+
+      if (!job) {
+        message.ack();
+        continue;
+      }
+
+      ctx.waitUntil(
+        (async () => {
+          try {
+            await db
+              .update(jobs)
+              .set({ status: "processing", startedAt: new Date() })
+              .where(eq(jobs.id, jobId));
+
+            switch (job.jobType) {
+              case "discover-programs": {
+                const programUrls = await discoverPrograms(job.url);
+                const newJobs = await db
+                  .insert(jobs)
+                  .values(
+                    programUrls.map((url) => ({
+                      url,
+                      jobType: "program" as const,
+                    })),
+                  )
+                  .returning();
+
+                await env.SCRAPING_QUEUE.sendBatch(
+                  newJobs.map((j) => ({ body: { jobId: j.id } })),
+                );
+                break;
+              }
+              case "discover-courses": {
+                const courseUrls = await discoverCourses(job.url);
+                const newJobs = await db
+                  .insert(jobs)
+                  .values(
+                    courseUrls.map((url) => ({
+                      url,
+                      jobType: "course" as const,
+                    })),
+                  )
+                  .returning();
+
+                await env.SCRAPING_QUEUE.sendBatch(
+                  newJobs.map((j) => ({ body: { jobId: j.id } })),
+                );
+                break;
+              }
+              case "program": {
+                const res = await scrapeProgram(job.url, db, env);
+
+                const programId = await convex.upsertProgram(res.program);
+
+                if (!programId) {
+                  throw new JobError(
+                    "Failed to upsert program: no ID returned",
+                    "validation",
+                  );
+                }
+
+                // it is safe to assert the type here because the data will be validated before sending the request
+                const newRequirements = res.requirements.map((req) => ({
+                  ...req,
+                  programId: programId,
+                })) as z.infer<typeof ZUpsertRequirements>;
+
+                if (res.requirements.length > 0) {
+                  await convex.upsertRequirements(newRequirements);
+                }
+                break;
+              }
+              case "course": {
+                const res = await scrapeCourse(job.url, db, env);
+
+                const courseId = await convex.upsertCourse(res.course);
+
+                if (!courseId) {
+                  throw new JobError(
+                    "Failed to upsert course: no ID returned",
+                    "validation",
+                  );
+                }
+
+                // it is safe to assert the type here because the data will be validated before sending the request
+                const newPrerequisites = res.prerequisites.map((prereq) => ({
+                  ...prereq,
+                  courseId: courseId,
+                })) as z.infer<typeof ZUpsertPrerequisites>;
+
+                if (res.prerequisites.length > 0) {
+                  await convex.upsertPrerequisites(newPrerequisites);
+                }
+                break;
+              }
+            }
+
+            await db
+              .update(jobs)
+              .set({ status: "completed", completedAt: new Date() })
+              .where(eq(jobs.id, jobId));
+
+            message.ack();
+          } catch (error) {
+            const jobError =
+              error instanceof JobError
+                ? error
+                : new JobError(
+                    error instanceof Error ? error.message : "Unknown error",
+                  );
+
+            await db.insert(errorLogs).values({
+              jobId: jobId,
+              errorType: jobError.type,
+              errorMessage: jobError.message,
+              stackTrace: jobError.stack || null,
+              timestamp: new Date(),
+            });
+
+            await db
+              .update(jobs)
+              .set({ status: "failed" })
+              .where(eq(jobs.id, jobId));
+
+            message.retry();
+          }
+        })(),
+      );
+    }
   },
 };
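
Editor's note: the consumer above imports JobMessage and JobError from ./lib/queue, one of the 21 changed files but not shown in this view. Judging only from how they are used in this diff (message bodies carry a jobId, JobError takes a message plus an optional type matching the error_logs enum and exposes it as .type), that module plausibly looks something like the following sketch; the actual implementation in the commit may differ:

// Hypothetical reconstruction of src/lib/queue.ts, inferred from usage in src/index.ts.
export type JobMessage = {
  jobId: string;
};

// Mirrors the error_logs.error_type enum in the schema.
export type JobErrorType =
  | "network"
  | "parsing"
  | "validation"
  | "timeout"
  | "unknown";

export class JobError extends Error {
  type: JobErrorType;

  constructor(message: string, type: JobErrorType = "unknown") {
    super(message);
    this.name = "JobError";
    this.type = type;
  }
}

Defaulting the type to "unknown" would line up with the bare new JobError(message) call in the catch block and with the "unknown" value newly added to the error_type enum in the schema.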
