Skip to content

Commit 689f70c

Browse files
authored
feat: Improved S3 client (#169)
* Replaced s3-sync-client with custom code
* Removed s3-sync-client as a dependency
1 parent 9c7f7f6 commit 689f70c

File tree

6 files changed

+132
-90
lines changed

6 files changed

+132
-90
lines changed

apps/artisan/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
"iso-language-codes": "^2.0.0",
2323
"mime-types": "^2.1.35",
2424
"parse-filepath": "^1.0.2",
25-
"s3-sync-client": "^4.3.1",
2625
"shared": "workspace:*",
2726
"zod": "^3.24.2"
2827
},

apps/artisan/src/lib/s3.ts

Lines changed: 123 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1-
import { createReadStream } from "node:fs";
2-
import { GetObjectCommand, S3 } from "@aws-sdk/client-s3";
1+
import { exists, mkdir, writeFile } from "node:fs/promises";
2+
import { dirname, join } from "node:path";
3+
import {
4+
DeleteObjectCommand,
5+
GetObjectCommand,
6+
paginateListObjectsV2,
7+
S3,
8+
} from "@aws-sdk/client-s3";
39
import { Upload } from "@aws-sdk/lib-storage";
410
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
511
import { ConfiguredRetryStrategy } from "@smithy/util-retry";
12+
import { Glob } from "bun";
613
import { lookup } from "mime-types";
7-
import { S3SyncClient } from "s3-sync-client";
14+
import { assert } from "shared/assert";
815
import { env } from "../env";
9-
import type { PutObjectCommandInput } from "@aws-sdk/client-s3";
10-
import type { CommandInput } from "s3-sync-client";
1116

12-
const retryStrategy = new ConfiguredRetryStrategy(5, 60_000);
17+
const retryStrategy = new ConfiguredRetryStrategy(
18+
5,
19+
(attempt) => attempt ** 2 * 1000,
20+
);
1321

1422
const client = new S3({
1523
endpoint: env.S3_ENDPOINT,
@@ -18,88 +26,136 @@ const client = new S3({
1826
accessKeyId: env.S3_ACCESS_KEY,
1927
secretAccessKey: env.S3_SECRET_KEY,
2028
},
21-
logger: console,
2229
retryStrategy,
2330
});
2431

25-
const { sync } = new S3SyncClient({ client, retryStrategy });
26-
27-
export async function syncFromS3(remotePath: string, localPath: string) {
28-
await sync(`s3://${env.S3_BUCKET}/${remotePath}`, localPath);
29-
}
30-
31-
export async function syncToS3(
32+
export async function s3UploadFile(
3233
localPath: string,
3334
remotePath: string,
34-
options?: {
35-
del?: boolean;
36-
public?: boolean;
37-
concurrency?: number;
38-
},
35+
aclPublic: boolean,
3936
) {
40-
const commandInput: CommandInput<PutObjectCommandInput> = (input) => {
41-
let contentType: string | undefined;
42-
if (input.Key) {
43-
contentType = lookup(input.Key) || "binary/octet-stream";
44-
}
45-
return {
46-
ContentType: contentType,
47-
ACL: options?.public ? "public-read" : "private",
48-
};
49-
};
50-
51-
await sync(localPath, `s3://${env.S3_BUCKET}/${remotePath}`, {
52-
del: options?.del,
53-
commandInput,
54-
maxConcurrentTransfers: options?.concurrency,
37+
const upload = new Upload({
38+
client,
39+
params: {
40+
Body: Bun.file(localPath).stream(),
41+
ContentType: lookup(localPath) || "binary/octet-stream",
42+
Bucket: env.S3_BUCKET,
43+
Key: remotePath,
44+
ACL: aclPublic ? "public-read" : "private",
45+
},
5546
});
47+
await upload.done();
5648
}
5749

58-
type UploadToS3File =
59-
| { type: "json"; data: object }
60-
| { type: "local"; path: string };
50+
async function s3DownloadFile(remotePath: string, localPath: string) {
51+
const command = new GetObjectCommand({
52+
Bucket: env.S3_BUCKET,
53+
Key: remotePath,
54+
});
6155

62-
export async function uploadToS3(
63-
remoteFilePath: string,
64-
file: UploadToS3File,
65-
onProgress?: (value: number) => void,
56+
const { Body } = await client.send(command);
57+
assert(Body);
58+
59+
await writeFile(localPath, Body.transformToWebStream());
60+
}
61+
62+
export async function s3DownloadFolder(remotePath: string, localPath: string) {
63+
const paginatedListObjects = paginateListObjectsV2(
64+
{ client },
65+
{
66+
Bucket: env.S3_BUCKET,
67+
Prefix: remotePath,
68+
},
69+
);
70+
71+
const filePaths: string[] = [];
72+
73+
for await (const data of paginatedListObjects) {
74+
data.Contents?.forEach((content) => {
75+
if (content.Key) {
76+
filePaths.push(content.Key);
77+
}
78+
});
79+
}
80+
81+
for (const filePath of filePaths) {
82+
const localFilePath = join(
83+
localPath,
84+
filePath.substring(remotePath.length + 1),
85+
);
86+
87+
const localFilePathDir = dirname(localFilePath);
88+
const folderExists = await exists(localFilePathDir);
89+
if (!folderExists) {
90+
await mkdir(localFilePathDir, { recursive: true });
91+
}
92+
93+
await s3DownloadFile(filePath, localFilePath);
94+
}
95+
}
96+
97+
export async function s3UploadFolder(
98+
localPath: string,
99+
remotePath: string,
100+
aclPublic: boolean,
66101
) {
67-
let params: Omit<PutObjectCommandInput, "Bucket" | "Key"> | undefined;
68-
69-
switch (file.type) {
70-
case "json":
71-
params = {
72-
Body: JSON.stringify(file.data, null, 2),
73-
ContentType: "application/json",
74-
};
75-
break;
76-
case "local":
77-
params = {
78-
Body: createReadStream(file.path),
79-
};
80-
break;
81-
default:
82-
return;
102+
await s3DeleteFolder(remotePath);
103+
104+
const glob = new Glob("**/*");
105+
106+
const files: string[] = [];
107+
for await (const file of glob.scan(localPath)) {
108+
files.push(file);
83109
}
84110

111+
for (const file of files) {
112+
await s3UploadFile(
113+
`${localPath}/${file}`,
114+
`${remotePath}/${file}`,
115+
aclPublic,
116+
);
117+
}
118+
}
119+
120+
export async function s3UploadJson(data: object, remotePath: string) {
85121
const upload = new Upload({
86122
client,
87123
params: {
88-
...params,
124+
Body: JSON.stringify(data, null, 2),
125+
ContentType: "application/json",
89126
Bucket: env.S3_BUCKET,
90-
Key: remoteFilePath,
127+
Key: remotePath,
91128
},
92129
});
130+
await upload.done();
131+
}
93132

94-
upload.on("httpUploadProgress", (event) => {
95-
if (event.loaded === undefined || event.total === undefined) {
96-
return;
97-
}
98-
const value = Math.round((event.loaded / event.total) * 100);
99-
onProgress?.(value);
100-
});
133+
async function s3DeleteFolder(remotePath: string) {
134+
const paginatedListObjects = paginateListObjectsV2(
135+
{ client },
136+
{
137+
Bucket: env.S3_BUCKET,
138+
Prefix: remotePath,
139+
},
140+
);
101141

102-
await upload.done();
142+
const filePaths: string[] = [];
143+
144+
for await (const data of paginatedListObjects) {
145+
data.Contents?.forEach((content) => {
146+
if (content.Key) {
147+
filePaths.push(content.Key);
148+
}
149+
});
150+
}
151+
152+
for (const filePath of filePaths) {
153+
const command = new DeleteObjectCommand({
154+
Bucket: env.S3_BUCKET,
155+
Key: filePath,
156+
});
157+
await client.send(command);
158+
}
103159
}
104160

105161
export async function getS3SignedUrl(
@@ -110,8 +166,6 @@ export async function getS3SignedUrl(
110166
Bucket: env.S3_BUCKET,
111167
Key: remoteFilePath,
112168
});
113-
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
114-
// @ts-ignore https://github.com/aws/aws-sdk-js-v3/issues/4451
115169
const url = await getSignedUrl(client, command, {
116170
expiresIn,
117171
});

apps/artisan/src/workers/ffmpeg.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ffmpeg } from "../lib/ffmpeg";
22
import { mapInputToPublicUrl } from "../lib/file-helpers";
3-
import { uploadToS3 } from "../lib/s3";
3+
import { s3UploadFile } from "../lib/s3";
44
import type { FfmpegData, FfmpegResult, Stream, WorkerCallback } from "bolt";
55

66
export const ffmpegCallback: WorkerCallback<FfmpegData, FfmpegResult> = async ({
@@ -55,15 +55,10 @@ export const ffmpegCallback: WorkerCallback<FfmpegData, FfmpegResult> = async ({
5555
`Uploading ${outDir}/${name} to transcode/${job.data.assetId}/${name}`,
5656
);
5757

58-
await uploadToS3(
58+
await s3UploadFile(
59+
`${outDir}/${name}`,
5960
`transcode/${job.data.assetId}/${name}`,
60-
{
61-
type: "local",
62-
path: `${outDir}/${name}`,
63-
},
64-
(value) => {
65-
progressTracker.set("upload", value);
66-
},
61+
false,
6762
);
6863

6964
return {

apps/artisan/src/workers/package.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { addToQueue, outcomeQueue } from "bolt";
22
import { execa } from "execa";
33
import parseFilePath from "parse-filepath";
44
import { getBinaryPath, getMetaStruct } from "../lib/file-helpers";
5-
import { syncFromS3, syncToS3 } from "../lib/s3";
5+
import { s3DownloadFolder, s3UploadFolder } from "../lib/s3";
66
import type {
77
PackageData,
88
PackageResult,
@@ -57,7 +57,7 @@ export const packageCallback: WorkerCallback<
5757
async function handleStepInitial(job: Job<PackageData>, dir: WorkerDir) {
5858
const inDir = await dir.createTempDir();
5959

60-
await syncFromS3(`transcode/${job.data.assetId}`, inDir);
60+
await s3DownloadFolder(`transcode/${job.data.assetId}`, inDir);
6161

6262
job.log(`Synced folder in ${inDir}`);
6363

@@ -146,10 +146,7 @@ async function handleStepInitial(job: Job<PackageData>, dir: WorkerDir) {
146146
const s3Dir = `package/${job.data.assetId}/${job.data.name}`;
147147
job.log(`Uploading to ${s3Dir}`);
148148

149-
await syncToS3(outDir, s3Dir, {
150-
del: true,
151-
concurrency: job.data.concurrency,
152-
});
149+
await s3UploadFolder(outDir, s3Dir, job.data.public);
153150
}
154151

155152
async function handleJobOutcome(job: Job<PackageData>) {

apps/artisan/src/workers/transcode.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
getDefaultAudioBitrate,
1313
getDefaultVideoBitrate,
1414
} from "../lib/default-values";
15-
import { uploadToS3 } from "../lib/s3";
15+
import { s3UploadJson } from "../lib/s3";
1616
import type { MetaStruct } from "../lib/file-helpers";
1717
import type {
1818
FfmpegResult,
@@ -162,10 +162,7 @@ async function handleStepMeta(job: Job<TranscodeData>, token?: string) {
162162

163163
await job.log(`Writing meta.json (${JSON.stringify(meta)})`);
164164

165-
await uploadToS3(`transcode/${job.data.assetId}/meta.json`, {
166-
type: "json",
167-
data: meta,
168-
});
165+
await s3UploadJson(meta, `transcode/${job.data.assetId}/meta.json`);
169166
}
170167

171168
async function handleStepOutcome(job: Job<TranscodeData>) {

bun.lockb

-1.59 KB
Binary file not shown.

0 commit comments

Comments
 (0)