Skip to content

Commit c8a5a8b

Browse files
committed
Recursive Bucket Sync option
1 parent cd5e836 commit c8a5a8b

File tree

7 files changed

+216
-18
lines changed

7 files changed

+216
-18
lines changed

app/src/controllers/sync.js

Lines changed: 164 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,178 @@
11
const { NIL: SYSTEM_USER } = require('uuid');
22

33
const errorToProblem = require('../components/errorToProblem');
4-
const { addDashesToUuid, getCurrentIdentity } = require('../components/utils');
4+
const { addDashesToUuid, getCurrentIdentity, isAtPath, isTruthy } = require('../components/utils');
55
const utils = require('../db/models/utils');
6-
const { bucketService, objectService, storageService, objectQueueService, userService } = require('../services');
6+
const {
7+
bucketPermissionService,
8+
bucketService,
9+
objectService,
10+
storageService,
11+
objectQueueService,
12+
userService
13+
} = require('../services');
714

815
const SERVICE = 'ObjectQueueService';
916

1017
/**
1118
* The Sync Controller
1219
*/
1320
const controller = {
21+
22+
/**
23+
* @function syncBucketRecursive
24+
* Synchronizes all objects and subfolders found at the Key and below for the given parent folder (bucket)
25+
* NOTE: OIDC users reuire MANAGE permission to do a recursive sync on a folder
26+
* All their permissions will be copied to any NEW sub-folders created
27+
* @param {object} req Express request object
28+
* @param {object} res Express response object
29+
* @param {function} next The next callback function
30+
* @returns {function} Express middleware function
31+
*/
32+
async syncBucketRecursive(req, res, next) {
33+
try {
34+
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
35+
const bucketId = addDashesToUuid(req.params.bucketId);
36+
const parentBucket = await bucketService.read(bucketId);
37+
// current user's permissions on parent folder
38+
const currentUserParentBucketPerms = userId !== SYSTEM_USER ? (await bucketPermissionService.searchPermissions({
39+
bucketId: parentBucket.bucketId,
40+
userId: userId
41+
})).map(p => p.permCode) : [];
42+
43+
/**
44+
* get the two following lists for comparison:
45+
*/
46+
// get parent + child bucket records already in COMS db
47+
const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket);
48+
let dbBuckets = [parentBucket].concat(dbChildBuckets);
49+
50+
// get 'folders' that exist below (and including) the parent 'folder'
51+
const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false });
52+
const formatS3KeyForCompare = (k => {
53+
let key = k.substr(0, k.lastIndexOf('/')); // remove trailing slash and file name
54+
return key ? key : '/'; // if parent is root set as '/' to match convention in COMS db
55+
});
56+
const s3Keys = [...new Set([
57+
...s3Response.DeleteMarkers.map(object => formatS3KeyForCompare(object.Key)),
58+
...s3Response.Versions.map(object => formatS3KeyForCompare(object.Key)),
59+
])];
60+
// console.log('s3Keys', s3Keys);
61+
62+
63+
/**
64+
* compare each list and sync (ie create or delete) bucket records in COMS db to match 'folders' that exist in S3
65+
* note: we assign all permissions to all users that created parent bucket
66+
*/
67+
// delete buckets not found in S3 from COMS db
68+
const oldDbBuckets = dbBuckets.filter(b => !s3Keys.includes(b.key));
69+
for (const dbBucket of oldDbBuckets) {
70+
await bucketService.delete(dbBucket.bucketId);
71+
dbBuckets = dbBuckets.filter(b => b.bucketId != dbBucket.bucketId);
72+
}
73+
// Create buckets only found in S3 in COMS db
74+
const newS3Keys = s3Keys.filter(k => !dbBuckets.map(b => b.key).includes(k));
75+
for (const s3Key of newS3Keys) {
76+
const data = {
77+
bucketName: s3Key.substring(s3Key.lastIndexOf('/') + 1),
78+
accessKeyId: parentBucket.accessKeyId,
79+
bucket: parentBucket.bucket,
80+
endpoint: parentBucket.endpoint,
81+
key: s3Key,
82+
secretAccessKey: parentBucket.secretAccessKey,
83+
region: parentBucket.region ?? undefined,
84+
active: parentBucket.active,
85+
userId: parentBucket.createdBy ?? SYSTEM_USER,
86+
// current user has MANAGE perm on parent folder (see route.hasPermission)
87+
// ..so copy all their perms to NEW subfolders
88+
permCodes: currentUserParentBucketPerms
89+
};
90+
const dbResponse = await bucketService.create(data);
91+
dbBuckets.push(dbResponse);
92+
}
93+
94+
/**
95+
* Sync all the objects found in all the parent and child 'folders'.
96+
* by comparing objects in COMS db with the keys of the object found in S3
97+
*/
98+
// get all objects in existing buckets in all 'buckets' in COMS db
99+
const dbObjects = await objectService.searchObjects({
100+
bucketId: dbBuckets.map(b => b.bucketId)
101+
});
102+
// get all objects below parent 'key' in S3
103+
const s3Objects = s3Response;
104+
105+
console.log('s3Objects', s3Objects);
106+
107+
108+
/**
109+
* merge arrays of objects from COMS db and S3 to form an array of jobs with format:
110+
* [ { path: '/images/img3.jpg', bucketId: '123' }, { path: '/images/album1/img1.jpg', bucketId: '456' } ]
111+
*/
112+
const objects = [...new Set([
113+
// objects already in database
114+
...dbObjects.data.map(object => {
115+
return {
116+
path: object.path,
117+
bucketId: object.bucketId
118+
};
119+
}),
120+
// DeleteMarkers found in S3
121+
...s3Objects.DeleteMarkers.map(object => {
122+
return {
123+
path: object.Key,
124+
bucketId: dbBuckets
125+
.filter(b => isAtPath(b.key, object.Key))
126+
.map(b => b.bucketId)[0]
127+
};
128+
}),
129+
// Versions found in S3
130+
...s3Objects.Versions
131+
.filter(v => v.Size > 0) // is an file (not a 'directory')
132+
.map(object => {
133+
return {
134+
path: object.Key,
135+
bucketId: dbBuckets
136+
.filter(b => isAtPath(b.key, object.Key))
137+
.map(b => b.bucketId)[0],
138+
// adding current userId will give ALL perms on new objects
139+
// and set createdBy on all downstream resources (versions, tags, meta)
140+
// userId: userId
141+
};
142+
})
143+
])];
144+
// merge and remove duplicates
145+
const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
146+
147+
// create jobs in COMS db object_queue for each object
148+
const response = await utils.trxWrapper(async (trx) => {
149+
// update 'lastSyncRequestedDate' value in COMS db for each bucket
150+
for (const bucket of dbBuckets) {
151+
await bucketService.update({
152+
bucketId: bucket.bucketId,
153+
userId: userId,
154+
lastSyncRequestedDate: new Date().toISOString()
155+
}, trx);
156+
}
157+
return await objectQueueService.enqueue({ jobs: jobs }, trx);
158+
});
159+
// return number of jobs inserted
160+
res.status(202).json(response);
161+
} catch (e) {
162+
next(errorToProblem(SERVICE, e));
163+
}
164+
},
165+
14166
/**
15-
* @function syncBucket
16-
* Synchronizes a bucket
167+
* @function syncBucketSingle
168+
* Synchronizes objects found at the Key of the given bucket, ignoring subfolders and files after the next delimiter
17169
* @param {object} req Express request object
18170
* @param {object} res Express response object
19171
* @param {function} next The next callback function
20172
* @returns {function} Express middleware function
21173
*/
22-
async syncBucket(req, res, next) {
174+
async syncBucketSingle(req, res, next) {
23175
try {
24-
// TODO: Consider adding an "all" mode for checking through all known objects and buckets for job enumeration
25-
// const allMode = isTruthy(req.query.all);
26176
const bucketId = addDashesToUuid(req.params.bucketId);
27177
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
28178

@@ -36,7 +186,13 @@ const controller = {
36186
...dbResponse.data.map(object => object.path),
37187
...s3Response.DeleteMarkers.map(object => object.Key),
38188
...s3Response.Versions.map(object => object.Key)
39-
])].map(path => ({ path: path, bucketId: bucketId }));
189+
])].map(path => ({
190+
path: path,
191+
bucketId: bucketId
192+
// adding current userId will give ALL perms on new objects
193+
// and set createdBy on all downstream resources (versions, tags, meta)
194+
// userId: userId
195+
}));
40196

41197
const response = await utils.trxWrapper(async (trx) => {
42198
await bucketService.update({

app/src/db/models/tables/bucket.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,19 @@ class Bucket extends mixin(Model, [
4848
filterBucketName(query, value) {
4949
filterILike(query, value, 'bucket.bucketName');
5050
},
51+
filterEndpoint(query, value) {
52+
filterILike(query, value, 'bucket.endpoint');
53+
},
5154
filterKey(query, value) {
5255
filterILike(query, value, 'bucket.key');
5356
},
57+
filterKeyIsChild(query, value) {
58+
if (value && value !== '/') {
59+
query.where('bucket.key', 'like', `${value}%`);
60+
}
61+
query
62+
.where('bucket.key', '!=', value);
63+
},
5464
filterActive(query, value) {
5565
if (value !== undefined) query.where('bucket.active', value);
5666
},

app/src/routes/v1/bucket.js

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const express = require('express');
22
const router = express.Router();
33

44
const { Permissions } = require('../../components/constants');
5+
const { isTruthy } = require('../../components/utils');
56
const { bucketController, syncController } = require('../../controllers');
67
const { bucketValidator } = require('../../validators');
78
const { requireSomeAuth } = require('../../middleware/featureToggle');
@@ -53,9 +54,23 @@ router.put('/:bucketId/child', express.json(), bucketValidator.createBucketChild
5354
}
5455
);
5556

56-
/** Synchronizes a bucket */
57-
router.get('/:bucketId/sync', bucketValidator.syncBucket, hasPermission(Permissions.READ), (req, res, next) => {
58-
syncController.syncBucket(req, res, next);
59-
});
57+
/**
58+
* Synchronizes a bucket
59+
* if doing 'recursive sync', check for MANAGE permission and call syncBucketRecursive
60+
* else skip to next route for this path
61+
* ref: https://expressjs.com/en/guide/using-middleware.html
62+
*/
63+
router.get('/:bucketId/sync',
64+
bucketValidator.syncBucket,
65+
(req, _res, next) => {
66+
if (isTruthy(req.query.recursive)) next();
67+
else next('route');
68+
},
69+
hasPermission(Permissions.MANAGE),
70+
(req, res, next) => syncController.syncBucketRecursive(req, res, next));
71+
72+
router.get('/:bucketId/sync',
73+
hasPermission(Permissions.READ),
74+
(req, res, next) => syncController.syncBucketSingle(req, res, next));
6075

6176
module.exports = router;

app/src/services/bucket.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,20 @@ const service = {
164164
}));
165165
},
166166

167+
/**
168+
* @function searchChildBuckets
169+
* Get db records for each bucket that acts as a sub-folder of the provided bucket
170+
* @param {object} parentBucket a bucket model (record) from the COMS db
171+
* @returns {Promise<object[]>} An array of bucket records
172+
* @throws If there are no records found
173+
*/
174+
searchChildBuckets: async (parentBucket) => {
175+
return Bucket.query()
176+
.modify('filterKeyIsChild', parentBucket.key)
177+
.modify('filterEndpoint', parentBucket.endpoint)
178+
.where('bucket', parentBucket.bucket);
179+
},
180+
167181
/**
168182
* @function read
169183
* Get a bucket db record based on bucketId

app/src/services/objectQueue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const service = {
3737
* Adds a set of jobs to the object queue. If the same job already exists, the existing
3838
* job will take precedence and will not be reinserted.
3939
* @param {object[]} options.jobs An array of job objects typically containing `path` and `bucketId` attributes
40-
* @param {boolean} [options.full=false] Optional boolean indicating whether to execute full recursive run
40+
* @param {boolean} [options.full=false] Optional boolean whether to sync related versions, tags, metadata
4141
* @param {integer} [options.retries=0] Optional integer indicating how many previous retries this job has had
4242
* @param {string} [options.createdBy=SYSTEM_USER] Optional uuid attributing which user added the job
4343
* @param {object} [etrx=undefined] Optional Objection Transaction object

app/src/validators/bucket.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ const schema = {
5959
syncBucket: {
6060
params: Joi.object({
6161
bucketId: type.uuidv4.required()
62+
}),
63+
query: Joi.object({
64+
recursive: type.truthy,
6265
})
6366
},
6467

@@ -84,7 +87,7 @@ const validator = {
8487
deleteBucket: validate(schema.deleteBucket),
8588
headBucket: validate(schema.headBucket),
8689
readBucket: validate(schema.readBucket),
87-
syncBucket: validate(schema.readBucket),
90+
syncBucket: validate(schema.syncBucket),
8891
searchBuckets: validate(schema.searchBuckets),
8992
updateBucket: validate(schema.updateBucket)
9093
};

app/tests/unit/controllers/sync.spec.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ afterEach(() => {
3030
jest.resetAllMocks();
3131
});
3232

33-
describe('syncBucket', () => {
33+
describe('syncBucketSingle', () => {
3434
const enqueueSpy = jest.spyOn(objectQueueService, 'enqueue');
3535
const getCurrentIdentitySpy = jest.spyOn(utils, 'getCurrentIdentity');
3636
const getCurrentUserIdSpy = jest.spyOn(userService, 'getCurrentUserId');
@@ -40,7 +40,7 @@ describe('syncBucket', () => {
4040
const updateSpy = jest.spyOn(bucketService, 'update');
4141
const next = jest.fn();
4242

43-
it('should enqueue all objects in a bucket', async () => {
43+
it('should enqueue all objects in in current \'directory\' in bucket', async () => {
4444
const USR_IDENTITY = 'xxxy';
4545
const USR_ID = 'abc-123';
4646
const req = {
@@ -58,7 +58,7 @@ describe('syncBucket', () => {
5858
trxWrapperSpy.mockImplementation(callback => callback({}));
5959
updateSpy.mockResolvedValue({});
6060

61-
await controller.syncBucket(req, res, next);
61+
await controller.syncBucketSingle(req, res, next);
6262

6363
expect(enqueueSpy).toHaveBeenCalledTimes(1);
6464
expect(listAllObjectVersionsSpy).toHaveBeenCalledTimes(1);
@@ -76,7 +76,7 @@ describe('syncBucket', () => {
7676
listAllObjectVersionsSpy.mockImplementation(() => { throw new Error('error'); });
7777
searchObjectsSpy.mockResolvedValue([{ path: path }]);
7878

79-
await controller.syncBucket(req, res, next);
79+
await controller.syncBucketSingle(req, res, next);
8080

8181
expect(enqueueSpy).toHaveBeenCalledTimes(0);
8282
expect(listAllObjectVersionsSpy).toHaveBeenCalledTimes(1);

0 commit comments

Comments
 (0)