Skip to content

Commit a2e7ba8

Browse files
committed
Recursive Bucket Sync option
1 parent cd5e836 commit a2e7ba8

File tree

6 files changed

+214
-12
lines changed

6 files changed

+214
-12
lines changed

app/src/controllers/sync.js

Lines changed: 181 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,197 @@
11
const { NIL: SYSTEM_USER } = require('uuid');
22

33
const errorToProblem = require('../components/errorToProblem');
4-
const { addDashesToUuid, getCurrentIdentity } = require('../components/utils');
4+
const { addDashesToUuid, getCurrentIdentity, isAtPath, isTruthy } = require('../components/utils');
55
const utils = require('../db/models/utils');
6-
const { bucketService, objectService, storageService, objectQueueService, userService } = require('../services');
6+
const {
7+
bucketPermissionService,
8+
bucketService,
9+
objectService,
10+
storageService,
11+
objectQueueService,
12+
userService
13+
} = require('../services');
714

815
const SERVICE = 'ObjectQueueService';
916

1017
/**
1118
* The Sync Controller
1219
*/
1320
const controller = {
21+
1422
/**
1523
* @function syncBucket
16-
* Synchronizes a bucket
24+
* Synchronizes COMS db with bucket and objects in S3
25+
* calls subsequent controller functions depending on whether doing a 'resursive' sync
1726
* @param {object} req Express request object
1827
* @param {object} res Express response object
1928
* @param {function} next The next callback function
2029
* @returns {function} Express middleware function
2130
*/
2231
async syncBucket(req, res, next) {
2332
try {
24-
// TODO: Consider adding an "all" mode for checking through all known objects and buckets for job enumeration
25-
// const allMode = isTruthy(req.query.all);
33+
if (!isTruthy(req.query.recursive)) {
34+
this.syncBucketRecursive(req, res, next);
35+
}
36+
else {
37+
this.syncBucketSingle(req, res, next);
38+
}
39+
} catch (e) {
40+
next(errorToProblem(SERVICE, e));
41+
}
42+
},
43+
44+
/**
45+
* @function syncBucketRecursive
46+
* Synchronizes all objects and subfolders found at the Key and below for the given parent folder (bucket)
47+
* NOTE: OIDC users reuire MANAGE permission to do a recursive sync on a folder
48+
* All their permissions will be copied to any new sub-folders created
49+
* @param {object} req Express request object
50+
* @param {object} res Express response object
51+
* @param {function} next The next callback function
52+
* @returns {function} Express middleware function
53+
*/
54+
async syncBucketRecursive(req, res, next) {
55+
try {
56+
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
57+
const bucketId = addDashesToUuid(req.params.bucketId);
58+
const parentBucket = await bucketService.read(bucketId);
59+
// current user's permissions on parent folder
60+
const currentUserParentBucketPerms = userId !== SYSTEM_USER ? (await bucketPermissionService.searchPermissions({
61+
bucketId: parentBucket.bucketId,
62+
userId: userId
63+
})).map(p => p.permCode) : [];
64+
65+
/**
66+
* get the two following lists for comparison:
67+
*/
68+
// get parent + child bucket records already in COMS db
69+
const dbChildBuckets = await bucketService.searchChildBuckets(parentBucket);
70+
let dbBuckets = [parentBucket].concat(dbChildBuckets);
71+
72+
// get 'folders' that exist below (and including) the parent 'folder'
73+
const s3Response = await storageService.listAllObjectVersions({ bucketId: bucketId, precisePath: false });
74+
const formatS3KeyForCompare = (k => {
75+
let key = k.substr(0, k.lastIndexOf('/')); // remove trailing slash and file name
76+
return key ? key : '/'; // if parent is root set as '/' to match convention in COMS db
77+
});
78+
const s3Keys = [...new Set([
79+
...s3Response.DeleteMarkers.map(object => formatS3KeyForCompare(object.Key)),
80+
...s3Response.Versions.map(object => formatS3KeyForCompare(object.Key)),
81+
])];
82+
// console.log('s3Keys', s3Keys);
83+
84+
85+
/**
86+
* compare each list and sync (ie create or delete) bucket records in COMS db to match 'folders' that exist in S3
87+
* note: we assign all permissions to all users that created parent bucket
88+
*/
89+
// delete buckets not found in S3 from COMS db
90+
const oldDbBuckets = dbBuckets.filter(b => !s3Keys.includes(b.key));
91+
for (const dbBucket of oldDbBuckets) {
92+
await bucketService.delete(dbBucket.bucketId);
93+
dbBuckets = dbBuckets.filter(b => b.bucketId != dbBucket.bucketId);
94+
}
95+
// Create buckets only found in S3 in COMS db
96+
const newS3Keys = s3Keys.filter(k => !dbBuckets.map(b => b.key).includes(k));
97+
for (const s3Key of newS3Keys) {
98+
const data = {
99+
bucketName: s3Key.substring(s3Key.lastIndexOf('/') + 1),
100+
accessKeyId: parentBucket.accessKeyId,
101+
bucket: parentBucket.bucket,
102+
endpoint: parentBucket.endpoint,
103+
key: s3Key,
104+
secretAccessKey: parentBucket.secretAccessKey,
105+
region: parentBucket.region ?? undefined,
106+
active: parentBucket.active,
107+
userId: parentBucket.createdBy ?? SYSTEM_USER,
108+
// if current user has MANAGE perm on parent folder, copy all their perms to new subfolders
109+
// TODO: consider copy all user's permissions to new sub-folders
110+
permCodes: currentUserParentBucketPerms.includes('MANAGE') ? currentUserParentBucketPerms : []
111+
};
112+
const dbResponse = await bucketService.create(data);
113+
dbBuckets.push(dbResponse);
114+
}
115+
116+
/**
117+
* Sync all the objects found in all the parent and child 'folders'.
118+
* by comparing objects in COMS db with the keys of the object found in S3
119+
*/
120+
// get all objects in existing buckets in all 'buckets' in COMS db
121+
const dbObjects = await objectService.searchObjects({
122+
bucketId: dbBuckets.map(b => b.bucketId)
123+
});
124+
// get all objects below parent 'key' in S3
125+
const s3Objects = s3Response;
126+
127+
/**
128+
* merge arrays of objects from COMS db and S3 to form an array of jobs with format:
129+
* [ { path: '/images/img3.jpg', bucketId: '123' }, { path: '/images/album1/img1.jpg', bucketId: '456' } ]
130+
*/
131+
const objects = [...new Set([
132+
// objects already in database
133+
...dbObjects.data.map(object => {
134+
return {
135+
path: object.path,
136+
bucketId: object.bucketId
137+
};
138+
}),
139+
// DeleteMarkers found in S3
140+
...s3Objects.DeleteMarkers.map(object => {
141+
return {
142+
path: object.Key,
143+
bucketId: dbBuckets
144+
.filter(b => isAtPath(b.key, object.Key))
145+
.map(b => b.bucketId)[0]
146+
};
147+
}),
148+
// Versions found in S3
149+
...s3Objects.Versions
150+
.filter(v => v.Size > 0) // is an file (not a 'directory')
151+
.map(object => {
152+
return {
153+
path: object.Key,
154+
bucketId: dbBuckets
155+
.filter(b => isAtPath(b.key, object.Key))
156+
.map(b => b.bucketId)[0],
157+
// adding current userId will give ALL perms on new objects
158+
// and set createdBy on all downstream resources (versions, tags, meta)
159+
// userId: userId
160+
};
161+
})
162+
])];
163+
// merge and remove duplicates
164+
const jobs = [...new Map(objects.map(o => [o.path, o])).values()];
165+
166+
// create jobs in COMS db object_queue for each object
167+
const response = await utils.trxWrapper(async (trx) => {
168+
// update 'lastSyncRequestedDate' value in COMS db for each bucket
169+
for (const bucket of dbBuckets) {
170+
await bucketService.update({
171+
bucketId: bucket.bucketId,
172+
userId: userId,
173+
lastSyncRequestedDate: new Date().toISOString()
174+
}, trx);
175+
}
176+
return await objectQueueService.enqueue({ jobs: jobs }, trx);
177+
});
178+
// return number of jobs inserted
179+
res.status(202).json(response);
180+
} catch (e) {
181+
next(errorToProblem(SERVICE, e));
182+
}
183+
},
184+
185+
/**
186+
* @function syncBucketSingle
187+
* Synchronizes objects found at the Key of the given bucket, ignoring subfolders and files after the next delimiter
188+
* @param {object} req Express request object
189+
* @param {object} res Express response object
190+
* @param {function} next The next callback function
191+
* @returns {function} Express middleware function
192+
*/
193+
async syncBucketSingle(req, res, next) {
194+
try {
26195
const bucketId = addDashesToUuid(req.params.bucketId);
27196
const userId = await userService.getCurrentUserId(getCurrentIdentity(req.currentUser, SYSTEM_USER), SYSTEM_USER);
28197

@@ -36,7 +205,13 @@ const controller = {
36205
...dbResponse.data.map(object => object.path),
37206
...s3Response.DeleteMarkers.map(object => object.Key),
38207
...s3Response.Versions.map(object => object.Key)
39-
])].map(path => ({ path: path, bucketId: bucketId }));
208+
])].map(path => ({
209+
path: path,
210+
bucketId: bucketId
211+
// adding current userId will give ALL perms on new objects
212+
// and set createdBy on all downstream resources (versions, tags, meta)
213+
// userId: userId
214+
}));
40215

41216
const response = await utils.trxWrapper(async (trx) => {
42217
await bucketService.update({

app/src/db/models/tables/bucket.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,19 @@ class Bucket extends mixin(Model, [
4848
filterBucketName(query, value) {
4949
filterILike(query, value, 'bucket.bucketName');
5050
},
51+
filterEndpoint(query, value) {
52+
filterILike(query, value, 'bucket.endpoint');
53+
},
5154
filterKey(query, value) {
5255
filterILike(query, value, 'bucket.key');
5356
},
57+
filterKeyIsChild(query, value) {
58+
if (value && value !== '/') {
59+
query.where('bucket.key', 'like', `${value}%`);
60+
}
61+
query
62+
.where('bucket.key', '!=', value);
63+
},
5464
filterActive(query, value) {
5565
if (value !== undefined) query.where('bucket.active', value);
5666
},

app/src/services/bucket.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,20 @@ const service = {
164164
}));
165165
},
166166

167+
/**
168+
* @function searchChildBuckets
169+
* Get db records for each bucket that acts as a sub-folder of the provided bucket
170+
* @param {object} parentBucket a bucket model (record) from the COMS db
171+
* @returns {Promise<object[]>} An array of bucket records
172+
* @throws If there are no records found
173+
*/
174+
searchChildBuckets: async (parentBucket) => {
175+
return Bucket.query()
176+
.modify('filterKeyIsChild', parentBucket.key)
177+
.modify('filterEndpoint', parentBucket.endpoint)
178+
.where('bucket', parentBucket.bucket);
179+
},
180+
167181
/**
168182
* @function read
169183
* Get a bucket db record based on bucketId

app/src/services/objectQueue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const service = {
3737
* Adds a set of jobs to the object queue. If the same job already exists, the existing
3838
* job will take precedence and will not be reinserted.
3939
* @param {object[]} options.jobs An array of job objects typically containing `path` and `bucketId` attributes
40-
* @param {boolean} [options.full=false] Optional boolean indicating whether to execute full recursive run
40+
* @param {boolean} [options.full=false] Optional boolean whether to sync related versions, tags, metadata
4141
* @param {integer} [options.retries=0] Optional integer indicating how many previous retries this job has had
4242
* @param {string} [options.createdBy=SYSTEM_USER] Optional uuid attributing which user added the job
4343
* @param {object} [etrx=undefined] Optional Objection Transaction object

app/src/validators/bucket.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ const schema = {
5959
syncBucket: {
6060
params: Joi.object({
6161
bucketId: type.uuidv4.required()
62+
}),
63+
query: Joi.object({
64+
recursive: type.truthy,
6265
})
6366
},
6467

@@ -84,7 +87,7 @@ const validator = {
8487
deleteBucket: validate(schema.deleteBucket),
8588
headBucket: validate(schema.headBucket),
8689
readBucket: validate(schema.readBucket),
87-
syncBucket: validate(schema.readBucket),
90+
syncBucket: validate(schema.syncBucket),
8891
searchBuckets: validate(schema.searchBuckets),
8992
updateBucket: validate(schema.updateBucket)
9093
};

app/tests/unit/controllers/sync.spec.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ afterEach(() => {
3030
jest.resetAllMocks();
3131
});
3232

33-
describe('syncBucket', () => {
33+
describe('syncBucketSingle', () => {
3434
const enqueueSpy = jest.spyOn(objectQueueService, 'enqueue');
3535
const getCurrentIdentitySpy = jest.spyOn(utils, 'getCurrentIdentity');
3636
const getCurrentUserIdSpy = jest.spyOn(userService, 'getCurrentUserId');
@@ -40,7 +40,7 @@ describe('syncBucket', () => {
4040
const updateSpy = jest.spyOn(bucketService, 'update');
4141
const next = jest.fn();
4242

43-
it('should enqueue all objects in a bucket', async () => {
43+
it('should enqueue all objects in in current \'directory\' in bucket', async () => {
4444
const USR_IDENTITY = 'xxxy';
4545
const USR_ID = 'abc-123';
4646
const req = {
@@ -58,7 +58,7 @@ describe('syncBucket', () => {
5858
trxWrapperSpy.mockImplementation(callback => callback({}));
5959
updateSpy.mockResolvedValue({});
6060

61-
await controller.syncBucket(req, res, next);
61+
await controller.syncBucketSingle(req, res, next);
6262

6363
expect(enqueueSpy).toHaveBeenCalledTimes(1);
6464
expect(listAllObjectVersionsSpy).toHaveBeenCalledTimes(1);
@@ -76,7 +76,7 @@ describe('syncBucket', () => {
7676
listAllObjectVersionsSpy.mockImplementation(() => { throw new Error('error'); });
7777
searchObjectsSpy.mockResolvedValue([{ path: path }]);
7878

79-
await controller.syncBucket(req, res, next);
79+
await controller.syncBucketSingle(req, res, next);
8080

8181
expect(enqueueSpy).toHaveBeenCalledTimes(0);
8282
expect(listAllObjectVersionsSpy).toHaveBeenCalledTimes(1);

0 commit comments

Comments
 (0)