Skip to content

Commit 491a15f

Browse files
committed
Sync Buckets recursively
Creates missing child 'buckets' in the COMS db. Syncs the provided bucket and all of its child buckets. Uses the object sync queue.
1 parent cd5e836 commit 491a15f

File tree

5 files changed

+151
-2
lines changed

5 files changed

+151
-2
lines changed

app/src/controllers/sync.js

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
const { NIL: SYSTEM_USER } = require('uuid');
2+
const { Permissions } = require('../components/constants');
23

34
const errorToProblem = require('../components/errorToProblem');
4-
const { addDashesToUuid, getCurrentIdentity } = require('../components/utils');
5+
const { addDashesToUuid, getCurrentIdentity, isAtPath } = require('../components/utils');
56
const utils = require('../db/models/utils');
67
const { bucketService, objectService, storageService, objectQueueService, userService } = require('../services');
78

@@ -11,6 +12,126 @@ const SERVICE = 'ObjectQueueService';
1112
* The Sync Controller
1213
*/
1314
const controller = {
15+
16+
async syncBucketRecursive(req, res, next) {
17+
try {
18+
const bucketId = addDashesToUuid(req.params.bucketId);
19+
const parentBucket = await bucketService.read(bucketId);
20+
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
21+
22+
const [
23+
dbResponse, // buckets below the parent in COMS db
24+
s3Response // 'directories' that exist below the parent's in S3
25+
] = await Promise.all([
26+
bucketService.searchChildBuckets(parentBucket),
27+
storageService.listChildDirectories({ bucketId: bucketId })
28+
]);
29+
const dbChildBuckets = dbResponse;
30+
const s3ChildKeys = s3Response.CommonPrefixes.map(element => {
31+
return element.Prefix.replace(/\/$/, '');
32+
});
33+
34+
/**
35+
* create new 'buckets' in COMS db for each new directory found in S3
36+
* assign all permissions to user that created parent bucket
37+
*/
38+
const newDbBuckets = [];
39+
const newKeys = s3ChildKeys.filter(x => !dbChildBuckets.map(b => b.key).includes(x));
40+
for (const s3ChildKey of newKeys) {
41+
const data = {
42+
bucketName: s3ChildKey.substring(s3ChildKey.lastIndexOf('/') + 1),
43+
accessKeyId: parentBucket.accessKeyId,
44+
bucket: parentBucket.bucket,
45+
endpoint: parentBucket.endpoint,
46+
key: s3ChildKey,
47+
secretAccessKey: parentBucket.secretAccessKey,
48+
region: parentBucket.region ?? undefined,
49+
active: parentBucket.active,
50+
userId: parentBucket.createdBy ?? SYSTEM_USER,
51+
permCodes: Object.values(Permissions)
52+
};
53+
const dbResponse = await bucketService.create(data);
54+
newDbBuckets.push(dbResponse);
55+
}
56+
// Array of all buckets now in db
57+
const allBuckets = [parentBucket].concat(dbChildBuckets).concat(newDbBuckets);
58+
59+
/**
60+
* get arrays of objects from COMS db and S3 for comparison
61+
*/
62+
const [dbObjects, s3Objects] = await Promise.all([
63+
// get all objects in existing buckets in db
64+
objectService.searchObjects({
65+
bucketId: dbChildBuckets.map(b => b.bucketId)
66+
}),
67+
// list all objects (recursive) in S3
68+
storageService.listAllObjectVersions({
69+
bucketId: bucketId,
70+
filePath: parentBucket.key,
71+
precisePath: false,
72+
filterLatest: true
73+
})
74+
]);
75+
76+
/**
77+
* merge arrays of objects from COMS db and S3 to form an array of jobs with format:
78+
* [
79+
* { path: '/images/img3.jpg', bucketId: '123' },
80+
* { path: '/images/album1/img1.jpg', bucketId: '456' },
81+
* ]
82+
*/
83+
const objects = [...new Set([
84+
// objects already in database
85+
...dbObjects.data.map(object => {
86+
return {
87+
path: object.path,
88+
bucketId: object.bucketId
89+
};
90+
}),
91+
// DeleteMarkers found in S3
92+
...s3Objects.DeleteMarkers.map(object => {
93+
return {
94+
path: object.Key,
95+
bucketId: allBuckets
96+
// .filter(b => b.key === object.Key.substring(0, object.Key.lastIndexOf('/')))
97+
.filter(b => isAtPath(b.key, object.Key))
98+
.map(b => b.bucketId)[0]
99+
};
100+
}),
101+
// Versions found in S3
102+
...s3Objects.Versions
103+
.filter(v => v.Size > 0) // is an file (not a 'directory')
104+
.map(object => {
105+
return {
106+
path: object.Key,
107+
bucketId: allBuckets
108+
.filter(b => isAtPath(b.key, object.Key))
109+
.map(b => b.bucketId)[0]
110+
};
111+
})
112+
])];
113+
// remove duplicates
114+
const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
115+
116+
const response = await utils.trxWrapper(async (trx) => {
117+
// update 'lastSyncRequestedDate' value in db for each bucket
118+
for (const bucket of allBuckets) {
119+
await bucketService.update({
120+
bucketId: bucket.bucketId,
121+
userId: userId,
122+
lastSyncRequestedDate: new Date().toISOString()
123+
}, trx);
124+
}
125+
// queue jobs
126+
return await objectQueueService.enqueue({ jobs: jobs }, trx);
127+
});
128+
129+
res.status(202).json(response);
130+
} catch (e) {
131+
next(errorToProblem(SERVICE, e));
132+
}
133+
},
134+
14135
/**
15136
* @function syncBucket
16137
* Synchronizes a bucket

app/src/db/models/tables/bucket.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,19 @@ class Bucket extends mixin(Model, [
4848
filterBucketName(query, value) {
4949
filterILike(query, value, 'bucket.bucketName');
5050
},
51+
filterEndpoint(query, value) {
52+
filterILike(query, value, 'bucket.endpoint');
53+
},
5154
filterKey(query, value) {
5255
filterILike(query, value, 'bucket.key');
5356
},
57+
filterKeyIsChild(query, value) {
58+
if (value && value !== '/') {
59+
query.where('bucket.key', 'like', `${value}%`);
60+
}
61+
query
62+
.where('bucket.key', '!=', value);
63+
},
5464
filterActive(query, value) {
5565
if (value !== undefined) query.where('bucket.active', value);
5666
},

app/src/routes/v1/bucket.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ router.put('/:bucketId/child', express.json(), bucketValidator.createBucketChild
5555

5656
/** Synchronizes a bucket */
5757
router.get('/:bucketId/sync', bucketValidator.syncBucket, hasPermission(Permissions.READ), (req, res, next) => {
58-
syncController.syncBucket(req, res, next);
58+
syncController.syncBucketRecursive(req, res, next);
5959
});
6060

6161
module.exports = router;

app/src/services/bucket.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,13 @@ const service = {
164164
}));
165165
},
166166

167+
searchChildBuckets: async (parentBucket) => {
168+
return Bucket.query()
169+
.modify('filterKeyIsChild', parentBucket.key)
170+
.modify('filterEndpoint', parentBucket.endpoint)
171+
.where('bucket', parentBucket.bucket);
172+
// .where('accessKeyId', parentBucket.accessKeyId)
173+
},
167174
/**
168175
* @function read
169176
* Get a bucket db record based on bucketId

app/src/services/storage.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,17 @@ const objectStorageService = {
397397
return this._getS3Client(data).send(new ListObjectVersionsCommand(params));
398398
},
399399

400+
async listChildDirectories({ bucketId }) {
401+
const data = await utils.getBucket(bucketId);
402+
const prefix = data.key !== DELIMITER ? data.key : '';
403+
const params = {
404+
Bucket: data.bucket,
405+
Delimiter: DELIMITER,
406+
Prefix: prefix + '/'
407+
};
408+
return this._getS3Client(data).send(new ListObjectVersionsCommand(params));
409+
},
410+
400411
/**
401412
* @function presignUrl
402413
* Generates a presigned url for the `command` with a limited expiration window

0 commit comments

Comments
 (0)