Skip to content

Commit 0947a1b

Browse files
Migrated cleanupBuckets to sdk v3
Issue: S3UTILS-203
1 parent c156b94 commit 0947a1b

File tree

1 file changed

+94
-101
lines changed

1 file changed

+94
-101
lines changed

cleanupBuckets.js

Lines changed: 94 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1-
const { http } = require('httpagent');
2-
1+
const { http, https } = require('httpagent');
32
const async = require('async');
4-
const AWS = require('aws-sdk');
3+
const crypto = require('crypto');
4+
const {
5+
S3Client,
6+
ListObjectVersionsCommand,
7+
DeleteObjectsCommand,
8+
ListMultipartUploadsCommand,
9+
AbortMultipartUploadCommand,
10+
} = require('@aws-sdk/client-s3');
11+
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
512
const { Logger } = require('werelogs');
613

714
const log = new Logger('s3utils::emptyBucket');
@@ -29,36 +36,23 @@ if (!SECRET_KEY) {
2936
}
3037
const LISTING_LIMIT = 1000;
3138

32-
AWS.config.update({
33-
accessKeyId: ACCESS_KEY,
34-
secretAccessKey: SECRET_KEY,
39+
const s3 = new S3Client({
40+
credentials: {
41+
accessKeyId: ACCESS_KEY,
42+
secretAccessKey: SECRET_KEY,
43+
},
3544
endpoint: ENDPOINT,
3645
region: 'us-east-1',
37-
sslEnabled: false,
38-
s3ForcePathStyle: true,
39-
apiVersions: { s3: '2006-03-01' },
40-
signatureVersion: 'v4',
41-
signatureCache: false,
46+
forcePathStyle: true,
47+
requestHandler: new NodeHttpHandler({
48+
httpAgent: new http.Agent({ keepAlive: true }),
49+
httpsAgent: new https.Agent({
50+
keepAlive: true,
51+
rejectUnauthorized: false
52+
}),
53+
}),
4254
});
4355

44-
const s3 = new AWS.S3({
45-
httpOptions: {
46-
maxRetries: 0,
47-
timeout: 0,
48-
agent: new http.Agent({ keepAlive: true }),
49-
},
50-
});
51-
52-
// list object versions
53-
function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
54-
return s3.listObjectVersions({
55-
Bucket: bucket,
56-
MaxKeys: LISTING_LIMIT,
57-
VersionIdMarker,
58-
KeyMarker,
59-
}, cb);
60-
}
61-
6256
// return object with key and version_id
6357
function _getKeys(keys) {
6458
return keys.map(v => ({
@@ -68,97 +62,96 @@ function _getKeys(keys) {
6862
}
6963

7064
// delete all versions of an object
// Delete a batch of object versions / delete markers from `bucket`.
// NOTE: multi-object delete accepts at most 1000 keys per call; callers
// pass at most LISTING_LIMIT (1000) entries per invocation.
// Throws if the request fails or if any individual key fails to delete.
async function _deleteVersions(bucket, objectsToDelete) {
    if (!objectsToDelete || objectsToDelete.length === 0) {
        log.info(`No objects to delete for ${bucket}, skipping deleteObjects call`);
        return;
    }

    const params = {
        Bucket: bucket,
        Delete: { Objects: objectsToDelete },
    };
    const command = new DeleteObjectsCommand(params);
    try {
        const res = await s3.send(command);
        // DeleteObjects can return HTTP 200 while individual keys fail:
        // per-key failures are reported in res.Errors, not as a thrown
        // error. Surface them instead of logging every key as deleted.
        if (res.Errors && res.Errors.length > 0) {
            res.Errors.forEach(e => log.error('failed to delete object', {
                key: e.Key,
                versionId: e.VersionId,
                code: e.Code,
                message: e.Message,
            }));
            throw new Error(`failed to delete ${res.Errors.length} object(s) in ${bucket}`);
        }
        objectsToDelete.forEach(v => log.info(`deleted key: ${v.Key}`));
    } catch (err) {
        log.error('batch delete err', err);
        throw err;
    }
}
8684

// Remove every object version and delete marker from `bucket`,
// paging through the version listing LISTING_LIMIT entries at a time.
// Throws on listing or deletion failure.
async function cleanupVersions(bucket) {
    let nextVersionIdMarker = null;
    let nextKeyMarker = null;
    let morePages = true;

    do {
        const page = await s3.send(new ListObjectVersionsCommand({
            Bucket: bucket,
            MaxKeys: LISTING_LIMIT,
            VersionIdMarker: nextVersionIdMarker,
            KeyMarker: nextKeyMarker,
        }));

        // advance pagination state before deleting, so a throw in the
        // delete path never leaves stale markers behind
        nextVersionIdMarker = page.NextVersionIdMarker;
        nextKeyMarker = page.NextKeyMarker;
        morePages = page.IsTruncated;

        // both real versions and delete markers must go for the bucket
        // to be considered empty
        const doomed = [
            ..._getKeys(page.Versions || []),
            ..._getKeys(page.DeleteMarkers || []),
        ];

        if (doomed.length === 0) {
            log.info(`No objects to delete for bucket ${bucket}`);
        } else {
            await _deleteVersions(bucket, doomed);
        }
    } while (morePages);
}
119112

// Abort every in-progress multipart upload in `bucket`.
// ListMultipartUploads returns at most 1000 uploads per page, so the
// listing is paginated via KeyMarker/UploadIdMarker (the unpaginated
// version silently ignored uploads past the first page). Aborts are
// issued CONCURRENCY at a time.
async function abortAllMultipartUploads(bucket) {
    const CONCURRENCY = 10;
    let KeyMarker;
    let UploadIdMarker;
    let IsTruncated = true;

    while (IsTruncated) {
        const res = await s3.send(new ListMultipartUploadsCommand({
            Bucket: bucket,
            KeyMarker,
            UploadIdMarker,
        }));
        log.info(`Found ${res.Uploads ? res.Uploads.length : 0} multipart uploads to abort`);

        IsTruncated = res.IsTruncated;
        KeyMarker = res.NextKeyMarker;
        UploadIdMarker = res.NextUploadIdMarker;

        if (!res.Uploads || res.Uploads.length === 0) {
            continue;
        }

        for (let i = 0; i < res.Uploads.length; i += CONCURRENCY) {
            const batch = res.Uploads.slice(i, i + CONCURRENCY);
            await Promise.all(batch.map(({ Key, UploadId }) => s3.send(
                new AbortMultipartUploadCommand({ Bucket: bucket, Key, UploadId }),
            )));
        }
    }
}
140132

// Fully clean a single bucket: delete all versions/delete markers and
// abort all in-progress multipart uploads, in parallel.
// Rethrows after logging so callers can stop on failure.
async function _cleanupBucket(bucket) {
    try {
        await Promise.all([
            cleanupVersions(bucket),
            // reinstated: the SDK v3 migration left this call commented
            // out, so multipart uploads were never aborted and the bucket
            // could never be fully emptied (the v2 version aborted them)
            abortAllMultipartUploads(bucket),
        ]);
        log.info(`completed cleaning up of bucket: ${bucket}`);
    } catch (err) {
        log.error('error occured deleting objects', err);
        throw err;
    }
}
154145

// Clean the given buckets one at a time (sequential to bound the load
// on the endpoint). On failure the error is logged and the process exit
// code is set to 1 so scripts/CI can detect it — previously failures
// were logged but the process still exited 0.
async function cleanupBuckets(buckets) {
    try {
        for (const bucket of buckets) {
            await _cleanupBucket(bucket);
        }
        log.info('completed cleaning all buckets');
    } catch (err) {
        log.error('error occured deleting objects', err);
        process.exitCode = 1;
    }
}
163156

164157
cleanupBuckets(BUCKETS);

0 commit comments

Comments
 (0)