Skip to content

Commit a701052

Browse files
migrate verifyReplication to aws sdk v3
Issue : S3UTILS-201
1 parent 18ffb74 commit a701052

File tree

2 files changed

+87
-100
lines changed

2 files changed

+87
-100
lines changed

VerifyReplication/storage/s3.js

Lines changed: 77 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
const AWS = require('aws-sdk');
1+
const { S3Client, HeadObjectCommand, ListObjectsV2Command } = require('@aws-sdk/client-s3');
2+
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
3+
const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
24
const fs = require('fs');
35
const http = require('http');
46
const https = require('https');
@@ -17,75 +19,61 @@ function getClient(params) {
1719
showClientLogsIfAvailable,
1820
log,
1921
} = params;
20-
const s3EndpointIsHttps = (endpoint && endpoint.startsWith('https:')) || false;
21-
let agent;
22-
let clientLogger;
2322

24-
if (s3EndpointIsHttps) {
25-
agent = new https.Agent({
26-
keepAlive: true,
27-
ca: httpsCaPath ? fs.readFileSync(httpsCaPath) : undefined,
28-
rejectUnauthorized: httpsNoVerify !== '1',
29-
});
30-
} else {
31-
agent = new http.Agent({ keepAlive: true });
32-
}
23+
const httpAgent = new http.Agent({ keepAlive: true });
24+
const httpsAgent = new https.Agent({
25+
keepAlive: true,
26+
ca: httpsCaPath ? fs.readFileSync(httpsCaPath) : undefined,
27+
rejectUnauthorized: httpsNoVerify !== '1',
28+
});
3329

34-
// enable/disable sdk logs
35-
if (showClientLogsIfAvailable) {
36-
// TODO: may be use werelogs
37-
clientLogger = console;
38-
}
30+
// Options specific to s3 requests - maxRetries & customBackoff
31+
// Default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
32+
const retryStrategy = new ConfiguredRetryStrategy(
33+
defaults.AWS_SDK_REQUEST_RETRIES,
34+
retryCount => {
35+
// retry with exponential backoff delay capped at 60s max
36+
// between retries, and a little added jitter
37+
const backoff = Math.min(defaults.AWS_SDK_REQUEST_INITIAL_DELAY_MS
38+
* 2 ** retryCount, defaults.AWS_SDK_REQUEST_MAX_BACKOFF_LIMIT_MS)
39+
* (0.9 + Math.random() * 0.2);
40+
// show retry errors only if client logs are enabled as this may
41+
// increase log size!
42+
if (showClientLogsIfAvailable) {
43+
log.error('awssdk request error', {
44+
retryCount,
45+
backoff,
46+
});
47+
}
48+
return backoff;
49+
}
50+
);
3951

40-
const options = {
41-
accessKeyId: accessKey,
42-
secretAccessKey: secretKey,
43-
endpoint,
52+
const clientConfig = {
4453
region,
45-
sslEnabled: s3EndpointIsHttps,
46-
s3ForcePathStyle: true,
47-
apiVersions: { s3: '2006-03-01' },
48-
signatureVersion: 'v4',
49-
signatureCache: false,
50-
httpOptions: {
51-
timeout: httpTimeout,
52-
agent,
54+
credentials: {
55+
accessKeyId: accessKey,
56+
secretAccessKey: secretKey,
5357
},
54-
logger: clientLogger,
58+
endpoint,
59+
forcePathStyle: true,
60+
retryStrategy,
61+
requestHandler: new NodeHttpHandler({
62+
httpAgent,
63+
httpsAgent,
64+
requestTimeout: httpTimeout || 300000,
65+
}),
5566
};
5667

57-
/**
58-
* Options specific to s3 requests
59-
* `maxRetries` & `customBackoff` are set only to s3 requests
60-
* default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
61-
*/
62-
const s3Options = {
63-
maxRetries: defaults.AWS_SDK_REQUEST_RETRIES,
64-
retryDelayOptions: {
65-
customBackoff: (retryCount, error) => {
66-
// retry with exponential backoff delay capped at 60s max
67-
// between retries, and a little added jitter
68-
const backoff = Math.min(defaults.AWS_SDK_REQUEST_INITIAL_DELAY_MS
69-
* 2 ** retryCount, defaults.AWS_SDK_REQUEST_MAX_BACKOFF_LIMIT_MS)
70-
* (0.9 + Math.random() * 0.2);
71-
// show retry errors only if client logs are enabled as this may
72-
// increase log size!
73-
if (showClientLogsIfAvailable) {
74-
log.error('awssdk request error', {
75-
error,
76-
retryCount,
77-
backoff,
78-
});
79-
}
80-
return backoff;
81-
},
82-
},
83-
};
68+
if (showClientLogsIfAvailable) {
69+
// TODO: consider using werelogs
70+
clientConfig.logger = console;
71+
}
8472

85-
return new AWS.S3({ ...options, ...s3Options });
73+
return new S3Client(clientConfig);
8674
}
8775

88-
function getObjMd(params, cb) {
76+
async function getObjMd(params, cb) {
8977
const {
9078
client,
9179
bucket,
@@ -98,31 +86,27 @@ function getObjMd(params, cb) {
9886
return cb(new Error(errMsg));
9987
}
10088

101-
return client.headObject({
102-
Bucket: bucket,
103-
Key: key,
104-
VersionId: versionId,
105-
}, (err, data) => {
106-
if (err) {
107-
return cb(err);
108-
}
109-
const {
110-
ContentLength,
111-
LastModified,
112-
VersionId,
113-
Metadata,
114-
} = data;
89+
try {
90+
const commandParams = {
91+
Bucket: bucket,
92+
Key: key,
93+
VersionId: versionId
94+
};
95+
96+
const data = await client.send(new HeadObjectCommand(commandParams));
11597
const resp = {
116-
size: ContentLength,
117-
lastModified: LastModified,
118-
versionId: VersionId,
119-
md: Metadata,
98+
size: data.ContentLength,
99+
lastModified: data.LastModified,
100+
versionId: data.VersionId,
101+
md: data.Metadata,
120102
};
121103
return cb(null, resp);
122-
});
104+
} catch (err) {
105+
return cb(err);
106+
}
123107
}
124108

125-
function listObjects(params, cb) {
109+
async function listObjects(params, cb) {
126110
const {
127111
client,
128112
bucket,
@@ -140,14 +124,19 @@ function listObjects(params, cb) {
140124
return cb(new Error(errMsg));
141125
}
142126

143-
// TODO: support listing all versions
144-
return client.listObjectsV2({
145-
Bucket: bucket,
146-
MaxKeys: listingLimit,
147-
Prefix: prefix,
148-
Delimiter: delimiter,
149-
ContinuationToken: nextContinuationToken,
150-
}, cb);
127+
try {
128+
// TODO: support listing all versions
129+
const data = await client.send(new ListObjectsV2Command({
130+
Bucket: bucket,
131+
MaxKeys: listingLimit,
132+
Prefix: prefix,
133+
Delimiter: delimiter,
134+
ContinuationToken: nextContinuationToken,
135+
}));
136+
return cb(null, data);
137+
} catch (err) {
138+
return cb(err);
139+
}
151140
}
152141

153142
module.exports = {

VerifyReplication/verifyReplication.js

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const async = require('async');
22

33
const storage = require('./storage');
4+
const { NotFound } = require('@aws-sdk/client-s3');
45

56
let bucketMatch = false;
67
let compareSize = false;
@@ -41,26 +42,20 @@ function verifyObjects(objectList, cb) {
4142

4243
return destinationStorage.getObjMd(params, (err, dstMd) => {
4344
++statusObj.dstProcessedCount;
44-
if (err && err.code !== 'NotFound') {
45+
if (err) {
4546
++statusObj.dstFailedMdRetrievalsCount;
4647
logger.error('error getting metadata', {
4748
error: err,
4849
bucket: statusObj.dstBucket,
4950
key: dstKey,
5051
srcLastModified,
5152
});
53+
if (err instanceof NotFound) {
54+
++statusObj.missingInDstCount;
55+
}
5256
// log the error and continue processing objects
5357
return done();
5458
}
55-
if (err && err.code === 'NotFound') {
56-
++statusObj.missingInDstCount;
57-
logger.info('object missing in destination', {
58-
key,
59-
size,
60-
srcLastModified,
61-
});
62-
return done();
63-
}
6459
const srcSize = Number.parseInt(size, 10);
6560
const dstSize = Number.parseInt(dstMd.size, 10);
6661
if (compareSize && (srcSize !== dstSize)) {
@@ -81,6 +76,9 @@ function verifyObjects(objectList, cb) {
8176
}
8277

8378
function handlePrefixes(prefixList, cb) {
79+
if (!prefixList || prefixList.length === 0) {
80+
return process.nextTick(cb);
81+
}
8482
const prefixes = prefixList.map(p => p.Prefix);
8583
return async.eachLimit(prefixes, listingWorkers, (prefix, done) => {
8684
const params = {
@@ -106,7 +104,7 @@ function listAndCompare(params, cb) {
106104
}
107105
const {
108106
IsTruncated,
109-
NextContinuationToken: nextContinuationToken,
107+
NextContinuationToken,
110108
Contents,
111109
CommonPrefixes,
112110
} = data;
@@ -118,7 +116,7 @@ function listAndCompare(params, cb) {
118116
return cb(error);
119117
}
120118
if (IsTruncated) {
121-
const listingParams = { ...params, nextContinuationToken };
119+
const listingParams = { ...params, nextContinuationToken: NextContinuationToken };
122120
return listAndCompare(listingParams, cb);
123121
}
124122
logger.info('completed listing and compare', {

0 commit comments

Comments
 (0)