Skip to content

Commit e2b3285

Browse files
migrate to sdk v3 for bucketVersionsStats
Issue: S3UTILS-200
1 parent 0ffbe21 commit e2b3285

File tree

1 file changed

+98
-97
lines changed

1 file changed

+98
-97
lines changed

bucketVersionsStats.js

Lines changed: 98 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
const fs = require('fs');
22
const { http, https } = require('httpagent');
33

4-
const AWS = require('aws-sdk');
5-
const { doWhilst } = require('async');
4+
const { S3Client, ListObjectVersionsCommand } = require('@aws-sdk/client-s3');
5+
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
6+
const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
67

78
const { Logger } = require('werelogs');
89

910
const parseOlderThan = require('./utils/parseOlderThan');
10-
const { safeListObjectVersions } = require('./utils/safeList');
1111

1212
const log = new Logger('s3utils::bucketVersionsStats');
1313
const { ENDPOINT } = process.env;
@@ -97,38 +97,30 @@ if (s3EndpointIsHttps) {
9797
agent = new http.Agent({ keepAlive: true });
9898
}
9999

100-
const options = {
101-
accessKeyId: ACCESS_KEY,
102-
secretAccessKey: SECRET_KEY,
100+
const s3 = new S3Client({
101+
credentials: {
102+
accessKeyId: ACCESS_KEY,
103+
secretAccessKey: SECRET_KEY,
104+
},
103105
endpoint: ENDPOINT,
104106
region: 'us-east-1',
105-
sslEnabled: s3EndpointIsHttps,
106-
s3ForcePathStyle: true,
107-
apiVersions: { s3: '2006-03-01' },
108-
signatureVersion: 'v4',
109-
signatureCache: false,
110-
httpOptions: {
111-
timeout: 0,
112-
agent,
113-
},
114-
};
115-
/**
116-
* Options specific to s3 requests
117-
* `maxRetries` & `customBackoff` are set only to s3 requests
118-
* default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
119-
*/
120-
const s3Options = {
121-
maxRetries: AWS_SDK_REQUEST_RETRIES,
122-
customBackoff: (retryCount, error) => {
123-
log.error('aws sdk request error', { error, retryCount });
124-
// retry with exponential backoff delay capped at 1mn max
107+
forcePathStyle: true,
108+
tls: s3EndpointIsHttps,
109+
requestHandler: new NodeHttpHandler({
110+
httpAgent: agent,
111+
httpsAgent: agent,
112+
requestTimeout: 60000,
113+
}),
114+
retryStrategy: new ConfiguredRetryStrategy(
115+
AWS_SDK_REQUEST_RETRIES,
116+
// Custom backoff with exponential delay capped at 1mn max
125117
// between retries, and a little added jitter
126-
return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
127-
* 2 ** retryCount, 60000)
128-
* (0.9 + Math.random() * 0.2);
129-
},
130-
};
131-
const s3 = new AWS.S3(Object.assign(options, s3Options));
118+
attempt => Math.min(
119+
AWS_SDK_REQUEST_INITIAL_DELAY_MS * 2 ** attempt,
120+
60000
121+
) * (0.9 + Math.random() * 0.2)
122+
),
123+
});
132124

133125
const stats = {
134126
current: {
@@ -147,10 +139,17 @@ let VersionIdMarker;
147139
function _logProgress(message) {
148140
const loggedStats = {
149141
total: {
150-
count: BigInt(stats.current.count + stats.noncurrent.count),
151-
size: BigInt(stats.current.size + stats.noncurrent.size),
142+
count: (stats.current.count + stats.noncurrent.count).toString(),
143+
size: (stats.current.size + stats.noncurrent.size).toString(),
144+
},
145+
current: {
146+
count: stats.current.count.toString(),
147+
size: stats.current.size.toString(),
148+
},
149+
noncurrent: {
150+
count: stats.noncurrent.count.toString(),
151+
size: stats.noncurrent.size.toString(),
152152
},
153-
...stats,
154153
};
155154
log.info(message, {
156155
bucket: BUCKET,
@@ -166,67 +165,65 @@ const logProgressInterval = setInterval(
166165
LOG_PROGRESS_INTERVAL_MS,
167166
);
168167

169-
function _listObjectVersions(bucket, KeyMarker, VersionIdMarker, cb) {
170-
return safeListObjectVersions(s3, {
171-
Bucket: bucket,
172-
MaxKeys: LISTING_LIMIT,
173-
Prefix: TARGET_PREFIX,
174-
KeyMarker,
175-
VersionIdMarker,
176-
}, cb);
177-
}
178-
179-
180-
function listBucket(bucket, cb) {
168+
async function listBucket(bucket) {
181169
let NextKeyMarker = KEY_MARKER;
182170
let NextVersionIdMarker = VERSION_ID_MARKER;
183-
return doWhilst(
184-
done => {
185-
KeyMarker = NextKeyMarker;
186-
VersionIdMarker = NextVersionIdMarker;
187-
_listObjectVersions(bucket, KeyMarker, VersionIdMarker, (err, data) => {
188-
if (err) {
189-
log.error('error listing object versions', {
190-
error: err,
191-
});
192-
return done(err);
193-
}
194-
for (const version of data.Versions) {
195-
if (_OLDER_THAN_TIMESTAMP) {
196-
const parsed = new Date(version.LastModified);
197-
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
198-
continue;
199-
}
200-
}
201-
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
202-
statObj.count += 1n;
203-
statObj.size += version.Size || 0n;
204-
if (VERBOSE) {
205-
log.info('version info', {
206-
bucket: BUCKET,
207-
key: version.Key,
208-
versionId: version.VersionId,
209-
isLatest: version.IsLatest,
210-
lastModified: version.LastModified,
211-
size: version.Size,
212-
});
171+
172+
while (true) {
173+
KeyMarker = NextKeyMarker;
174+
VersionIdMarker = NextVersionIdMarker;
175+
176+
const command = new ListObjectVersionsCommand({
177+
Bucket: bucket,
178+
MaxKeys: LISTING_LIMIT,
179+
Prefix: TARGET_PREFIX,
180+
KeyMarker,
181+
VersionIdMarker,
182+
});
183+
184+
try {
185+
const data = await s3.send(command);
186+
const versions = data.Versions || [];
187+
for (const version of versions) {
188+
if (_OLDER_THAN_TIMESTAMP) {
189+
const parsed = new Date(version.LastModified);
190+
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
191+
continue;
213192
}
214193
}
215-
NextKeyMarker = data.NextKeyMarker;
216-
NextVersionIdMarker = data.NextVersionIdMarker;
217-
return done();
218-
});
219-
},
220-
() => {
221-
if (NextKeyMarker || NextVersionIdMarker) {
222-
return true;
194+
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
195+
statObj.count += 1n;
196+
statObj.size += BigInt(version.Size || 0);
197+
if (VERBOSE) {
198+
log.info('version info', {
199+
bucket: BUCKET,
200+
key: version.Key,
201+
versionId: version.VersionId,
202+
isLatest: version.IsLatest,
203+
lastModified: version.LastModified,
204+
size: version.Size,
205+
});
206+
}
223207
}
224-
KeyMarker = undefined;
225-
VersionIdMarker = undefined;
226-
return false;
227-
},
228-
cb,
229-
);
208+
209+
NextKeyMarker = data.NextKeyMarker;
210+
NextVersionIdMarker = data.NextVersionIdMarker;
211+
212+
if (!NextKeyMarker && !NextVersionIdMarker) {
213+
break;
214+
}
215+
} catch (error) {
216+
log.error('error listing object versions', {
217+
bucket,
218+
keyMarker: KeyMarker,
219+
versionIdMarker: VersionIdMarker,
220+
error,
221+
errorName: error.name,
222+
errorMessage: error.message,
223+
});
224+
throw error;
225+
}
226+
}
230227
}
231228

232229
function shutdown(exitCode) {
@@ -235,20 +232,24 @@ function shutdown(exitCode) {
235232
process.exit(exitCode);
236233
}
237234

238-
listBucket(BUCKET, err => {
239-
if (err) {
235+
async function main() {
236+
try {
237+
await listBucket(BUCKET);
238+
_logProgress('final summary');
239+
shutdown(0);
240+
} catch (error) {
240241
log.error('error during execution', {
241242
bucket: BUCKET,
242243
KeyMarker,
243244
VersionIdMarker,
245+
error,
244246
});
245247
_logProgress('summary after error');
246248
shutdown(1);
247-
} else {
248-
_logProgress('final summary');
249-
shutdown(0);
250249
}
251-
});
250+
}
251+
252+
main();
252253

253254
function stop() {
254255
log.warn('stopping execution');

0 commit comments

Comments
 (0)