Skip to content

Commit 9b1ecc6

Browse files
authored
Optimized GCS S3 operations (#627)
* Optimized GCS S3 operations Signed-off-by: manojdeep14 <manojdeep14d@gmail.com> * Fixed lint issues Signed-off-by: manojdeep14 <manojdeep14d@gmail.com> --------- Signed-off-by: manojdeep14 <manojdeep14d@gmail.com>
1 parent 4eb6d35 commit 9b1ecc6

File tree

2 files changed

+133
-80
lines changed

2 files changed

+133
-80
lines changed

packages/server/src/service/db_utils.ts

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ import {
77
CloudStorageCredentials,
88
gcsConnectionToCredentials,
99
getCloudTablesWithColumns,
10-
listAllDataFilesInBucket,
11-
listCloudBuckets,
12-
listFilesInCloudDirectory,
10+
listCloudDirectorySchemas,
11+
listDataFilesInDirectory,
1312
parseCloudUri,
1413
s3ConnectionToCredentials,
1514
} from "./gcs_s3_utils";
@@ -340,23 +339,10 @@ export async function getSchemasForConnection(
340339
: s3ConnectionToCredentials(attachedDb.s3Connection!);
341340

342341
try {
343-
const buckets = await listCloudBuckets(credentials);
344-
const scheme = dbType === "gcs" ? "gs" : "s3";
345-
346-
logger.info(
347-
`Listed ${buckets.length} ${dbType.toUpperCase()} buckets for attached database ${attachedDb.name}`,
348-
);
349-
350-
// Just return bucket URIs as schemas - fast!
351-
// Files/directories will be listed when user selects a bucket
352-
return buckets.map((bucket) => ({
353-
name: `${scheme}://${bucket.name}`,
354-
isHidden: false,
355-
isDefault: false,
356-
}));
342+
return await listCloudDirectorySchemas(credentials);
357343
} catch (cloudError) {
358344
logger.warn(
359-
`Failed to list ${dbType.toUpperCase()} buckets for ${attachedDb.name}`,
345+
`Failed to list ${dbType.toUpperCase()} directory schemas for ${attachedDb.name}`,
360346
{ error: cloudError },
361347
);
362348
return [];
@@ -444,19 +430,11 @@ export async function getTablesForSchema(
444430
);
445431
}
446432

447-
let fileKeys: string[];
448-
if (directoryPath) {
449-
const fileNames = await listFilesInCloudDirectory(
450-
credentials,
451-
bucketName,
452-
directoryPath,
453-
);
454-
fileKeys = fileNames.map((fileName) => `${directoryPath}/${fileName}`);
455-
} else {
456-
fileKeys = await listAllDataFilesInBucket(credentials, bucketName);
457-
}
458-
459-
console.log("File keys:", fileKeys);
433+
const fileKeys = await listDataFilesInDirectory(
434+
credentials,
435+
bucketName,
436+
directoryPath,
437+
);
460438

461439
return await getCloudTablesWithColumns(
462440
malloyConnection,
@@ -748,15 +726,15 @@ export async function listTablesForSchema(
748726
}
749727

750728
try {
751-
if (directoryPath) {
752-
return await listFilesInCloudDirectory(
753-
credentials,
754-
bucketName,
755-
directoryPath,
756-
);
757-
} else {
758-
return await listAllDataFilesInBucket(credentials, bucketName);
759-
}
729+
const fileKeys = await listDataFilesInDirectory(
730+
credentials,
731+
bucketName,
732+
directoryPath,
733+
);
734+
return fileKeys.map((key) => {
735+
const lastSlash = key.lastIndexOf("/");
736+
return lastSlash > 0 ? key.substring(lastSlash + 1) : key;
737+
});
760738
} catch (error) {
761739
logger.error(
762740
`Error listing ${cloudType.toUpperCase()} objects in ${schemaName}`,

packages/server/src/service/gcs_s3_utils.ts

Lines changed: 115 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { components } from "../api";
88
import { logger } from "../logger";
99

1010
type ApiTable = components["schemas"]["Table"];
11-
1211
type CloudStorageType = "gcs" | "s3";
1312

1413
export interface CloudStorageCredentials {
@@ -29,7 +28,6 @@ interface CloudStorageObject {
2928
key: string;
3029
size?: number;
3130
lastModified?: Date;
32-
isFolder: boolean;
3331
}
3432

3533
export function gcsConnectionToCredentials(gcsConnection: {
@@ -92,7 +90,7 @@ function createCloudStorageClient(
9290
return client;
9391
}
9492

95-
export async function listCloudBuckets(
93+
async function listCloudBuckets(
9694
credentials: CloudStorageCredentials,
9795
): Promise<CloudStorageBucket[]> {
9896
const client = createCloudStorageClient(credentials);
@@ -143,7 +141,6 @@ async function listAllCloudFiles(
143141
key: content.Key,
144142
size: content.Size,
145143
lastModified: content.LastModified,
146-
isFolder: false,
147144
});
148145
}
149146
}
@@ -182,6 +179,15 @@ function isDataFile(key: string): boolean {
182179
);
183180
}
184181

182+
function buildCloudUri(
183+
type: CloudStorageType,
184+
bucket: string,
185+
key: string,
186+
): string {
187+
const scheme = type === "gcs" ? "gs" : "s3";
188+
return `${scheme}://${bucket}/${key}`;
189+
}
190+
185191
function getFileType(key: string): string {
186192
const lowerKey = key.toLowerCase();
187193
if (lowerKey.endsWith(".csv")) return "csv";
@@ -192,23 +198,14 @@ function getFileType(key: string): string {
192198
return "unknown";
193199
}
194200

195-
function buildCloudUri(
196-
type: CloudStorageType,
197-
bucket: string,
198-
key: string,
199-
): string {
200-
const scheme = type === "gcs" ? "gs" : "s3";
201-
return `${scheme}://${bucket}/${key}`;
202-
}
203-
204201
function standardizeRunSQLResult(result: unknown): unknown[] {
205202
return Array.isArray(result)
206203
? result
207204
: (result as { rows?: unknown[] }).rows || [];
208205
}
209206

210-
// Batch size for parallel schema fetching to avoid overwhelming the connection
211207
const SCHEMA_FETCH_BATCH_SIZE = 10;
208+
const BUCKET_SCAN_BATCH_SIZE = 3;
212209

213210
async function getTableSchema(
214211
malloyConnection: Connection,
@@ -268,11 +265,9 @@ export async function getCloudTablesWithColumns(
268265
): Promise<ApiTable[]> {
269266
const allTables: ApiTable[] = [];
270267

271-
// Process in batches to avoid overwhelming the connection
272268
for (let i = 0; i < fileKeys.length; i += SCHEMA_FETCH_BATCH_SIZE) {
273269
const batch = fileKeys.slice(i, i + SCHEMA_FETCH_BATCH_SIZE);
274270

275-
// Process batch in parallel
276271
const batchResults = await Promise.all(
277272
batch.map((fileKey) =>
278273
getTableSchema(malloyConnection, credentials, bucketName, fileKey),
@@ -315,38 +310,118 @@ export function parseCloudUri(uri: string): {
315310
return null;
316311
}
317312

318-
export async function listFilesInCloudDirectory(
313+
export async function listDataFilesInDirectory(
319314
credentials: CloudStorageCredentials,
320315
bucketName: string,
321316
directoryPath: string,
322317
): Promise<string[]> {
323-
const files = await listAllCloudFiles(credentials, bucketName);
324-
325-
const filesInDirectory = files
326-
.filter((obj) => {
327-
if (!isDataFile(obj.key)) return false;
328-
329-
const lastSlashIndex = obj.key.lastIndexOf("/");
330-
const fileDir =
331-
lastSlashIndex > 0 ? obj.key.substring(0, lastSlashIndex) : "";
332-
333-
return fileDir === directoryPath;
334-
})
335-
.map((obj) => {
336-
const lastSlashIndex = obj.key.lastIndexOf("/");
337-
return lastSlashIndex > 0
338-
? obj.key.substring(lastSlashIndex + 1)
339-
: obj.key;
340-
});
318+
const prefix = directoryPath ? `${directoryPath}/` : "";
319+
const client = createCloudStorageClient(credentials);
320+
const storageType = credentials.type.toUpperCase();
321+
const dataFiles: string[] = [];
322+
323+
try {
324+
let continuationToken: string | undefined;
325+
326+
do {
327+
const response = await client.send(
328+
new ListObjectsV2Command({
329+
Bucket: bucketName,
330+
Prefix: prefix,
331+
Delimiter: "/",
332+
ContinuationToken: continuationToken,
333+
}),
334+
);
335+
336+
for (const content of response.Contents || []) {
337+
if (content.Key && isDataFile(content.Key)) {
338+
dataFiles.push(content.Key);
339+
}
340+
}
341341

342-
return filesInDirectory;
342+
continuationToken = response.IsTruncated
343+
? response.NextContinuationToken
344+
: undefined;
345+
} while (continuationToken);
346+
347+
logger.info(
348+
`Listed ${dataFiles.length} data files in ${storageType} ${bucketName}/${directoryPath}`,
349+
);
350+
return dataFiles;
351+
} catch (error) {
352+
logger.error(
353+
`Failed to list files in ${storageType} ${bucketName}/${directoryPath}`,
354+
{ error },
355+
);
356+
throw new Error(
357+
`Failed to list files in ${storageType} ${bucketName}/${directoryPath}: ${error instanceof Error ? error.message : String(error)}`,
358+
);
359+
}
343360
}
344361

345-
// List all data files in a bucket with their full relative paths
346-
export async function listAllDataFilesInBucket(
362+
/**
363+
* Scans an entire bucket and returns unique directory paths that contain data files.
364+
* Uses flat listing for efficiency — O(total_files / 1000) API calls.
365+
*/
366+
async function listDirectorySchemas(
347367
credentials: CloudStorageCredentials,
348368
bucketName: string,
349369
): Promise<string[]> {
350-
const files = await listAllCloudFiles(credentials, bucketName);
351-
return files.filter((obj) => isDataFile(obj.key)).map((obj) => obj.key);
370+
const allFiles = await listAllCloudFiles(credentials, bucketName);
371+
const directories = new Set<string>();
372+
373+
for (const file of allFiles) {
374+
if (!isDataFile(file.key)) continue;
375+
376+
const lastSlashIndex = file.key.lastIndexOf("/");
377+
const dir =
378+
lastSlashIndex > 0 ? file.key.substring(0, lastSlashIndex) : "";
379+
directories.add(dir);
380+
}
381+
382+
const scheme = credentials.type === "gcs" ? "gs" : "s3";
383+
const sortedDirs = Array.from(directories).sort();
384+
385+
logger.info(
386+
`Found ${sortedDirs.length} directories with data files in ${credentials.type.toUpperCase()} bucket ${bucketName}`,
387+
);
388+
389+
return sortedDirs.map((dir) =>
390+
dir ? `${scheme}://${bucketName}/${dir}` : `${scheme}://${bucketName}`,
391+
);
392+
}
393+
394+
export async function listCloudDirectorySchemas(
395+
credentials: CloudStorageCredentials,
396+
): Promise<{ name: string; isHidden: boolean; isDefault: boolean }[]> {
397+
const storageType = credentials.type.toUpperCase();
398+
const buckets = await listCloudBuckets(credentials);
399+
400+
logger.info(
401+
`Listed ${buckets.length} ${storageType} buckets, scanning for directories...`,
402+
);
403+
404+
const allDirArrays: string[][] = [];
405+
406+
for (let i = 0; i < buckets.length; i += BUCKET_SCAN_BATCH_SIZE) {
407+
const batch = buckets.slice(i, i + BUCKET_SCAN_BATCH_SIZE);
408+
const batchResults = await Promise.all(
409+
batch.map((bucket) =>
410+
listDirectorySchemas(credentials, bucket.name).catch((err) => {
411+
logger.warn(
412+
`Failed to scan ${storageType} bucket ${bucket.name}`,
413+
{ error: err },
414+
);
415+
return [] as string[];
416+
}),
417+
),
418+
);
419+
allDirArrays.push(...batchResults);
420+
}
421+
422+
return allDirArrays.flat().map((dirUri) => ({
423+
name: dirUri,
424+
isHidden: false,
425+
isDefault: false,
426+
}));
352427
}

0 commit comments

Comments
 (0)