Skip to content

Commit f543335

Browse files
author
Kerkesni
committed
crrExistingObjects: support triggering only current versions
This is needed when using multiple-backend replication, as triggering all versions may result in an ordering mismatch between versions on both sites. Because PUTs to the destination are executed in parallel, there is no guarantee that the order of versions in the source will be respected: the latest version PUT in the destination becomes the master. Issue: S3UTILS-183
1 parent 075dcff commit f543335

File tree

4 files changed

+213
-7
lines changed

4 files changed

+213
-7
lines changed

CRR/ReplicationStatusUpdater.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
const {
22
doWhilst, eachSeries, eachLimit, waterfall, series,
33
} = require('async');
4-
const werelogs = require('werelogs');
54
const { ObjectMD } = require('arsenal').models;
65

76
const { setupClients } = require('./clients');
@@ -44,6 +43,7 @@ class ReplicationStatusUpdater {
4443
maxScanned,
4544
keyMarker,
4645
versionIdMarker,
46+
currentVersionOnly,
4747
} = params;
4848

4949
// inputs
@@ -61,6 +61,7 @@ class ReplicationStatusUpdater {
6161
this.maxScanned = maxScanned;
6262
this.inputKeyMarker = keyMarker;
6363
this.inputVersionIdMarker = versionIdMarker;
64+
this.currentVersionOnly = currentVersionOnly;
6465
this.log = log;
6566

6667
this._setupClients();
@@ -305,7 +306,12 @@ class ReplicationStatusUpdater {
305306
this.log.warn(`missing SITE_NAME environment variable, triggering replication to the ${storageClass} storage class`);
306307
}
307308
return eachLimit(versions, this.workers, (i, apply) => {
308-
const { Key, VersionId } = i;
309+
const { Key, VersionId, IsLatest } = i;
310+
if (this.currentVersionOnly && !IsLatest) {
311+
++this._nSkipped;
312+
apply();
313+
return;
314+
}
309315
this._markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply);
310316
}, next);
311317
},

crrExistingObjects.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const MAX_SCANNED = (process.env.MAX_SCANNED
2323
&& Number.parseInt(process.env.MAX_SCANNED, 10));
2424
const { KEY_MARKER } = process.env;
2525
const { VERSION_ID_MARKER } = process.env;
26+
const CURRENT_VERSION_ONLY = process.env.CURRENT_VERSION_ONLY === 'true' || process.env.CURRENT_VERSION_ONLY === '1' || false;
2627

2728
const {
2829
ACCESS_KEY,
@@ -56,6 +57,11 @@ if (!STORAGE_TYPE) {
5657
if (!TARGET_REPLICATION_STATUS) {
5758
TARGET_REPLICATION_STATUS = 'NEW';
5859
}
60+
if (CURRENT_VERSION_ONLY) {
61+
log.info('CURRENT_VERSION_ONLY is enabled, only latest versions will be processed');
62+
} else {
63+
log.info('CURRENT_VERSION_ONLY is disabled, all versions will be processed');
64+
}
5965

6066
const replicationStatusToProcess = TARGET_REPLICATION_STATUS.split(',');
6167
replicationStatusToProcess.forEach(state => {
@@ -85,6 +91,7 @@ const replicationStatusUpdater = new ReplicationStatusUpdater({
8591
maxScanned: MAX_SCANNED,
8692
keyMarker: KEY_MARKER,
8793
versionIdMarker: VERSION_ID_MARKER,
94+
currentVersionOnly: CURRENT_VERSION_ONLY,
8895
}, log);
8996

9097
replicationStatusUpdater.run(err => {

tests/unit/CRR/ReplicationStatusUpdater.js

Lines changed: 171 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
1-
const AWS = require('aws-sdk');
21
const werelogs = require('werelogs');
32
const assert = require('assert');
43

5-
const BackbeatClient = require('../../../BackbeatClient');
6-
const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater');
74
const {
85
initializeCrrWithMocks,
96
listVersionRes,
107
listVersionsRes,
118
listVersionWithMarkerRes,
12-
getBucketReplicationRes,
139
getMetadataRes,
14-
putMetadataRes,
1510
} = require('../../utils/crr');
1611

1712
const logger = new werelogs.Logger('ReplicationStatusUpdater::tests', 'debug', 'debug');
@@ -534,3 +529,174 @@ describe('ReplicationStatusUpdater with specifics', () => {
534529
});
535530
});
536531
});
532+
533+
describe('ReplicationStatusUpdater with currentVersionOnly', () => {
534+
it('should process only latest versions when currentVersionOnly is true', done => {
535+
const listVersionsWithMixedLatest = {
536+
IsTruncated: false,
537+
Versions: [
538+
{
539+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
540+
ChecksumAlgorithm: [],
541+
Size: 3263,
542+
StorageClass: 'STANDARD',
543+
Key: 'key0',
544+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItW',
545+
IsLatest: true,
546+
LastModified: '2024-01-05T13:11:31.861Z',
547+
Owner: {
548+
DisplayName: 'bart',
549+
ID: '0',
550+
},
551+
},
552+
{
553+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
554+
ChecksumAlgorithm: [],
555+
Size: 3263,
556+
StorageClass: 'STANDARD',
557+
Key: 'key0',
558+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItV',
559+
IsLatest: false,
560+
LastModified: '2024-01-05T13:11:30.861Z',
561+
Owner: {
562+
DisplayName: 'bart',
563+
ID: '0',
564+
},
565+
},
566+
{
567+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
568+
ChecksumAlgorithm: [],
569+
Size: 3263,
570+
StorageClass: 'STANDARD',
571+
Key: 'key1',
572+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItU',
573+
IsLatest: true,
574+
LastModified: '2024-01-05T13:11:32.861Z',
575+
Owner: {
576+
DisplayName: 'bart',
577+
ID: '0',
578+
},
579+
},
580+
{
581+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
582+
ChecksumAlgorithm: [],
583+
Size: 3263,
584+
StorageClass: 'STANDARD',
585+
Key: 'key1',
586+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItT',
587+
IsLatest: false,
588+
LastModified: '2024-01-05T13:11:31.861Z',
589+
Owner: {
590+
DisplayName: 'bart',
591+
ID: '0',
592+
},
593+
},
594+
],
595+
DeleteMarkers: [],
596+
Name: 'bucket0',
597+
MaxKeys: 1000,
598+
CommonPrefixes: [],
599+
};
600+
601+
const crr = initializeCrrWithMocks({
602+
buckets: ['bucket0'],
603+
workers: 10,
604+
replicationStatusToProcess: ['NEW'],
605+
currentVersionOnly: true,
606+
}, logger);
607+
608+
crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionsWithMixedLatest));
609+
610+
crr.run(err => {
611+
assert.ifError(err);
612+
613+
expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1);
614+
expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1);
615+
616+
expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2);
617+
expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(1, {
618+
Bucket: 'bucket0',
619+
Key: 'key0',
620+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItW',
621+
}, expect.any(Function));
622+
expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(2, {
623+
Bucket: 'bucket0',
624+
Key: 'key1',
625+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItU',
626+
}, expect.any(Function));
627+
628+
expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2);
629+
630+
assert.strictEqual(crr._nProcessed, 2);
631+
assert.strictEqual(crr._nSkipped, 2);
632+
assert.strictEqual(crr._nUpdated, 2);
633+
assert.strictEqual(crr._nErrors, 0);
634+
done();
635+
});
636+
});
637+
638+
it('should process all versions when currentVersionOnly is false', done => {
639+
const listVersionsWithMixedLatest = {
640+
IsTruncated: false,
641+
Versions: [
642+
{
643+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
644+
ChecksumAlgorithm: [],
645+
Size: 3263,
646+
StorageClass: 'STANDARD',
647+
Key: 'key0',
648+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItW',
649+
IsLatest: true,
650+
LastModified: '2024-01-05T13:11:31.861Z',
651+
Owner: {
652+
DisplayName: 'bart',
653+
ID: '0',
654+
},
655+
},
656+
{
657+
ETag: '"dabcc341ecab339daf766e1cddd5d1bb"',
658+
ChecksumAlgorithm: [],
659+
Size: 3263,
660+
StorageClass: 'STANDARD',
661+
Key: 'key0',
662+
VersionId: 'aJdO148N3LjN00000000001I4j3QKItV',
663+
IsLatest: false,
664+
LastModified: '2024-01-05T13:11:30.861Z',
665+
Owner: {
666+
DisplayName: 'bart',
667+
ID: '0',
668+
},
669+
},
670+
],
671+
DeleteMarkers: [],
672+
Name: 'bucket0',
673+
MaxKeys: 1000,
674+
CommonPrefixes: [],
675+
};
676+
677+
const crr = initializeCrrWithMocks({
678+
buckets: ['bucket0'],
679+
workers: 10,
680+
replicationStatusToProcess: ['NEW'],
681+
currentVersionOnly: false,
682+
}, logger);
683+
684+
crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionsWithMixedLatest));
685+
686+
crr.run(err => {
687+
assert.ifError(err);
688+
689+
expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1);
690+
expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1);
691+
692+
expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2);
693+
expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2);
694+
695+
assert.strictEqual(crr._nProcessed, 2);
696+
assert.strictEqual(crr._nSkipped, 0);
697+
assert.strictEqual(crr._nUpdated, 2);
698+
assert.strictEqual(crr._nErrors, 0);
699+
done();
700+
});
701+
});
702+
});

tests/unit/CRR/crrExistingObjects.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ describe('crrExistingObjects', () => {
7575
process.env.KEY_MARKER = 'testKeyMarker';
7676
process.env.VERSION_ID_MARKER = 'testVersionIdMarker';
7777
process.env.DEBUG = '0';
78+
process.env.CURRENT_VERSION_ONLY = 'true';
7879

7980
require('../../../crrExistingObjects');
8081

@@ -96,6 +97,7 @@ describe('crrExistingObjects', () => {
9697
maxScanned: 1000,
9798
keyMarker: 'testKeyMarker',
9899
versionIdMarker: 'testVersionIdMarker',
100+
currentVersionOnly: true,
99101
}, expect.anything());
100102

101103
expect(mockFatal).not.toHaveBeenCalled();
@@ -131,6 +133,7 @@ describe('crrExistingObjects', () => {
131133
maxScanned: undefined,
132134
keyMarker: undefined,
133135
versionIdMarker: undefined,
136+
currentVersionOnly: false,
134137
}, expect.anything());
135138

136139
expect(mockFatal).not.toHaveBeenCalled();
@@ -183,4 +186,28 @@ describe('crrExistingObjects', () => {
183186
expect(mockFatal).toHaveBeenCalledWith('SECRET_KEY not defined');
184187
expect(process.exit).toHaveBeenCalledWith(1);
185188
});
189+
190+
test('should handle currentVersionOnly with value "1" as true', () => {
191+
process.argv[2] = 'bucket1';
192+
193+
process.env.ACCESS_KEY = 'testAccessKey';
194+
process.env.SECRET_KEY = 'testSecretKey';
195+
process.env.ENDPOINT = 'http://fake.endpoint';
196+
process.env.CURRENT_VERSION_ONLY = '1';
197+
198+
require('../../../crrExistingObjects');
199+
200+
const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater');
201+
202+
expect(ReplicationStatusUpdater).toHaveBeenCalledWith(
203+
expect.objectContaining({
204+
currentVersionOnly: true,
205+
}),
206+
expect.anything(),
207+
);
208+
expect(mockFatal).not.toHaveBeenCalled();
209+
expect(mockError).not.toHaveBeenCalled();
210+
expect(process.exit).not.toHaveBeenCalled();
211+
expect(mockCrrRun).toHaveBeenCalled();
212+
});
186213
});

0 commit comments

Comments
 (0)