Skip to content

Commit 341e531

Browse files
committed
consolidate video processing jobs into a single job to limit time spent downloading video and other common things
1 parent 575fbcf commit 341e531

File tree

10 files changed

+673
-809
lines changed

10 files changed

+673
-809
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"@trigger.dev/react-hooks": "3.3.6",
4343
"@trigger.dev/sdk": "3.3.6",
4444
"@upstash/redis": "^1.34.3",
45+
"@vercel/functions": "^1.5.2",
4546
"@vidstack/react": "^1.12.11",
4647
"axios": "^1.7.7",
4748
"class-variance-authority": "^0.7.0",

pnpm-lock.yaml

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/app/p/[videoId]/data.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
66
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
77
import { clerkClient } from "@clerk/nextjs/server";
88
import { Redis } from "@upstash/redis";
9+
import { waitUntil } from "@vercel/functions";
910
import dayjs from "dayjs";
1011
import utc from "dayjs/plugin/utc";
1112
import { createSigner } from "fast-jwt";
@@ -116,9 +117,11 @@ export async function getVideoData(videoId: string) {
116117

117118
const videoData = await getVideoDataFromDb(videoId);
118119

119-
redis.hset(`video:${videoId}`, videoData).then(() => {
120-
redis.expire(`video:${videoId}`, 60 * 60 * 24);
121-
});
120+
waitUntil(
121+
redis.hset(`video:${videoId}`, videoData).then(() => {
122+
redis.expire(`video:${videoId}`, 60 * 60 * 24);
123+
}),
124+
);
122125

123126
return {
124127
...videoData,

src/app/videos/actions.ts

Lines changed: 62 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ import {
99
import { db } from "@/lib/db";
1010
import { env } from "@/lib/env";
1111
import { users, videos } from "@/lib/schema";
12-
import type { initialUploadTask } from "@/trigger/initial-upload";
13-
import type { thumbnailTrackTask } from "@/trigger/thumbnail-track";
14-
import type { transcodingTask } from "@/trigger/transcoding";
1512
import type { videoDeletionTask } from "@/trigger/video-deletion";
13+
import type { Step, videoProcessingTask } from "@/trigger/video-processing";
1614
import {
1715
DeleteObjectCommand,
1816
GetObjectCommand,
@@ -76,9 +74,16 @@ export async function deleteVideo(videoId: string) {
7674
return { success: false, message: "Not authorized" };
7775
}
7876

79-
// TODO: use realtime to inform the user better
80-
81-
await tasks.trigger<typeof videoDeletionTask>("video-deletion", { videoId });
77+
try {
78+
await tasks.trigger<typeof videoDeletionTask>("video-deletion", {
79+
videoId,
80+
});
81+
} catch {
82+
return {
83+
success: false,
84+
message: "Failed to delete video",
85+
};
86+
}
8287

8388
return {
8489
success: true,
@@ -233,35 +238,36 @@ export async function uploadComplete(key: string, title: string, mimeType: strin
233238
videoId = nanoid(8);
234239
}
235240

236-
await db
237-
.update(users)
238-
.set({
239-
totalStorageUsed: userData.totalStorageUsed + (headResponse?.ContentLength ?? 0),
240-
})
241-
.where(eq(users.id, userId));
242-
243-
const [videoData] = await db
244-
.insert(videos)
245-
.values({
246-
id: videoId,
247-
authorId: userId,
248-
nativeFileKey: key,
249-
fileSizeBytes: headResponse?.ContentLength ?? 0,
250-
title: title.substring(0, VIDEO_TITLE_MAX_LENGTH),
251-
isProcessing: userData.accountTier !== "free",
252-
deletionDate:
253-
userData.accountTier === "free"
254-
? sql.raw(`now() + INTERVAL '${FREE_PLAN_VIDEO_RETENION_DAYS} days'`)
255-
: null,
256-
sources: [
257-
{
258-
isNative: true,
259-
key: key,
260-
type: mimeType === "video/quicktime" ? "video/mp4" : mimeType,
261-
},
262-
],
263-
})
264-
.returning();
241+
const [[videoData]] = await db.batch([
242+
db
243+
.insert(videos)
244+
.values({
245+
id: videoId,
246+
authorId: userId,
247+
nativeFileKey: key,
248+
fileSizeBytes: headResponse?.ContentLength ?? 0,
249+
title: title.substring(0, VIDEO_TITLE_MAX_LENGTH),
250+
isProcessing: userData.accountTier !== "free",
251+
deletionDate:
252+
userData.accountTier === "free"
253+
? sql.raw(`now() + INTERVAL '${FREE_PLAN_VIDEO_RETENION_DAYS} days'`)
254+
: null,
255+
sources: [
256+
{
257+
isNative: true,
258+
key: key,
259+
type: mimeType === "video/quicktime" ? "video/mp4" : mimeType,
260+
},
261+
],
262+
})
263+
.returning(),
264+
db
265+
.update(users)
266+
.set({
267+
totalStorageUsed: userData.totalStorageUsed + (headResponse?.ContentLength ?? 0),
268+
})
269+
.where(eq(users.id, userId)),
270+
]);
265271

266272
try {
267273
const cachedVideos = await redis.hget<(typeof videoData)[]>(`videos:${userId}`, "videos");
@@ -276,37 +282,33 @@ export async function uploadComplete(key: string, title: string, mimeType: strin
276282
}
277283

278284
try {
279-
const promises: Promise<unknown>[] = [
280-
tasks.trigger<typeof initialUploadTask>(
281-
"initial-upload",
282-
{ videoId: videoData.id },
283-
{
284-
tags: [userId, `initial-upload-${videoId}`, `video_${videoData.id}`],
285-
},
286-
),
287-
];
285+
const videoProcessingSteps: Step[] = ["video-duration", "thumbnails"];
288286

289287
if (userData.accountTier !== "free") {
290-
promises.push(
291-
tasks.trigger<typeof transcodingTask>(
292-
"transcoding",
293-
{ videoId: videoData.id },
294-
{ tags: [userId, `video_${videoData.id}`] },
295-
),
296-
);
288+
videoProcessingSteps.push("transcoding");
297289
}
298290

299291
if (!videoData.isPrivate) {
300-
promises.push(
301-
tasks.trigger<typeof thumbnailTrackTask>(
302-
"thumbnail-track",
303-
{ videoId },
304-
{ tags: [userId, `video_${videoId}`] },
305-
),
306-
);
292+
videoProcessingSteps.push("thumbnail-track");
307293
}
308294

309-
await Promise.all(promises);
295+
for (let i = 0; i < 3; i++) {
296+
try {
297+
await tasks.trigger<typeof videoProcessingTask>(
298+
"video-processing",
299+
{
300+
videoId,
301+
steps: videoProcessingSteps,
302+
},
303+
{
304+
tags: [`video-processing-${videoData.id}`],
305+
},
306+
);
307+
break;
308+
} catch (err) {
309+
if (i === 2) throw err;
310+
}
311+
}
310312
} catch (e) {
311313
console.error(e);
312314

@@ -336,7 +338,7 @@ export async function uploadComplete(key: string, title: string, mimeType: strin
336338
success: true,
337339
triggerAccessToken: await triggerAuth.createPublicToken({
338340
scopes: {
339-
read: { tags: `initial-upload-${videoId}` },
341+
read: { tags: `video-processing-${videoId}` },
340342
},
341343
}),
342344
video: {

src/app/videos/components/videos-board.tsx

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,19 +167,24 @@ type ThumbnailPlaceholderProps = {
167167
};
168168

169169
function ThumbnailPlaceholder(props: ThumbnailPlaceholderProps) {
170-
const { runs } = useRealtimeRunsWithTag(`initial-upload-${props.videoId}`);
170+
const { runs } = useRealtimeRunsWithTag(`video-processing-${props.videoId}`);
171+
171172
useEffect(() => {
172173
for (const run of runs) {
173-
if (run.output?.success) {
174+
if (
175+
run.metadata &&
176+
run.metadata.videoId === props.videoId &&
177+
run.metadata.smallThumbnailUrl
178+
) {
174179
useUserVideoDatastore.setState((state) => ({
175180
videos: state.videos.map((v) => {
176-
if (v.id === props.videoId && run.output) {
181+
if (v.id === props.videoId && run.metadata) {
177182
return {
178183
...v,
179-
smallThumbnailUrl: run.output.smallThumbnailUrl,
180-
videoLengthSeconds: run.output.videoLengthSeconds,
184+
smallThumbnailUrl: run.metadata.smallThumbnailUrl as string,
181185
};
182186
}
187+
183188
return v;
184189
}),
185190
}));

src/app/videos/data.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { videos } from "@/lib/schema";
77
import { safeParseAccountTier } from "@/lib/utils";
88
import { auth } from "@clerk/nextjs/server";
99
import { Redis } from "@upstash/redis";
10+
import { waitUntil } from "@vercel/functions";
1011
import { desc } from "drizzle-orm";
1112
import { redirect } from "next/navigation";
1213

@@ -61,9 +62,11 @@ export async function fetchVideosData() {
6162
if (!cachedData) {
6263
userData = await getFreshVideoData(userId);
6364

64-
redis.hset(cacheKey, userData).then(() => {
65-
redis.expire(cacheKey, 86400);
66-
});
65+
waitUntil(
66+
redis.hset(cacheKey, userData).then(() => {
67+
redis.expire(cacheKey, 86400);
68+
}),
69+
);
6770
} else {
6871
userData = {
6972
accountTier: safeParseAccountTier(cachedData.accountTier),

0 commit comments

Comments
 (0)