diff --git a/extensions/replication/tasks/ReplicateObject.js b/extensions/replication/tasks/ReplicateObject.js index 92ef38c325..dc8cf99884 100644 --- a/extensions/replication/tasks/ReplicateObject.js +++ b/extensions/replication/tasks/ReplicateObject.js @@ -13,9 +13,12 @@ const AccountCredentials = const RoleCredentials = require('../../../lib/credentials/RoleCredentials'); const { metricsExtension, metricsTypeProcessed } = require('../constants'); +const ObjectQueueEntry = require('../utils/ObjectQueueEntry'); const MPU_CONC_LIMIT = 10; +const errorAlreadyCompleted = {}; + function _extractAccountIdFromRole(role) { return role.split(':')[4]; } @@ -289,22 +292,61 @@ class ReplicateObject extends BackbeatTask { }); } + _refreshSourceEntry(sourceEntry, log, cb) { + const params = { + Bucket: sourceEntry.getBucket(), + Key: sourceEntry.getObjectKey(), + VersionId: sourceEntry.getEncodedVersionId(), + }; + return this.backbeatSource.getMetadata(params, (err, blob) => { + if (err) { + err.origin = 'source'; + log.error('error getting metadata blob from S3', { + method: 'ReplicateObject._refreshSourceEntry', + error: err, + }); + return cb(err); + } + const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body); + if (parsedEntry.error) { + log.error('error parsing metadata blob', { + error: parsedEntry.error, + method: 'ReplicateObject._refreshSourceEntry', + }); + return cb(errors.InternalError. + customizeDescription('error parsing metadata blob')); + } + const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(), + sourceEntry.getObjectVersionedKey(), parsedEntry.result); + return cb(null, refreshedEntry); + }); + } + _getAndPutData(sourceEntry, destEntry, log, cb) { - log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); - if (sourceEntry.getLocation().some(part => { - const partObj = new ObjectMDLocation(part); - return partObj.getDataStoreETag() === undefined; - })) { - log.error('cannot replicate object without dataStoreETag ' + - 'property', - { method: 'ReplicateObject._getAndPutData', - entry: sourceEntry.getLogInfo() }); - return cb(errors.InvalidObjectState); - } - const locations = sourceEntry.getReducedLocations(); - return async.mapLimit(locations, MPU_CONC_LIMIT, (part, done) => { - this._getAndPutPart(sourceEntry, destEntry, part, log, done); - }, cb); + this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => { + if (err) { + return cb(err); + } + const status = refreshedEntry.getReplicationSiteStatus(this.site); + if (status === 'COMPLETED') { + return cb(errorAlreadyCompleted); + } + log.debug('replicating data', { entry: sourceEntry.getLogInfo() }); + if (sourceEntry.getLocation().some(part => { + const partObj = new ObjectMDLocation(part); + return partObj.getDataStoreETag() === undefined; + })) { + log.error('cannot replicate object without dataStoreETag ' + + 'property', + { method: 'ReplicateObject._getAndPutData', + entry: sourceEntry.getLogInfo() }); + return cb(errors.InvalidObjectState); + } + const locations = sourceEntry.getReducedLocations(); + return async.mapLimit(locations, MPU_CONC_LIMIT, (part, done) => { + this._getAndPutPart(sourceEntry, destEntry, part, log, done); + }, cb); + }); } _getAndPutPartOnce(sourceEntry, destEntry, part, log, done) { @@ -578,6 +620,12 @@ class ReplicateObject extends BackbeatTask { error: err.description }); return done(); } + if (err === errorAlreadyCompleted) { + log.warn('replication skipped: ' + + 'source object version already COMPLETED', + { entry: sourceEntry.getLogInfo() }); + return done(); + } if (err.ObjNotFound || err.code === 'ObjNotFound') { if (err.origin === 'source') { log.info('replication skipped: ' +