1- const { http } = require ( 'httpagent' ) ;
2-
1+ const { http, https } = require ( 'httpagent' ) ;
32const async = require ( 'async' ) ;
4- const AWS = require ( 'aws-sdk' ) ;
3+ const crypto = require ( 'crypto' ) ;
4+ const {
5+ S3Client,
6+ ListObjectVersionsCommand,
7+ DeleteObjectsCommand,
8+ ListMultipartUploadsCommand,
9+ AbortMultipartUploadCommand,
10+ } = require ( '@aws-sdk/client-s3' ) ;
11+ const { NodeHttpHandler } = require ( '@aws-sdk/node-http-handler' ) ;
512const { Logger } = require ( 'werelogs' ) ;
613
714const log = new Logger ( 's3utils::emptyBucket' ) ;
@@ -29,34 +36,33 @@ if (!SECRET_KEY) {
2936}
3037const LISTING_LIMIT = 1000 ;
3138
32- AWS . config . update ( {
33- accessKeyId : ACCESS_KEY ,
34- secretAccessKey : SECRET_KEY ,
39+ const s3 = new S3Client ( {
40+ credentials : {
41+ accessKeyId : ACCESS_KEY ,
42+ secretAccessKey : SECRET_KEY ,
43+ } ,
3544 endpoint : ENDPOINT ,
3645 region : 'us-east-1' ,
37- sslEnabled : false ,
38- s3ForcePathStyle : true ,
39- apiVersions : { s3 : '2006-03-01' } ,
40- signatureVersion : 'v4' ,
41- signatureCache : false ,
42- } ) ;
43-
44- const s3 = new AWS . S3 ( {
45- httpOptions : {
46- maxRetries : 0 ,
47- timeout : 0 ,
48- agent : new http . Agent ( { keepAlive : true } ) ,
49- } ,
46+ forcePathStyle : true ,
47+ requestHandler : new NodeHttpHandler ( {
48+ httpAgent : new http . Agent ( { keepAlive : true } ) ,
49+ httpsAgent : new https . Agent ( {
50+ keepAlive : true ,
51+ rejectUnauthorized : false
52+ } ) ,
53+ } ) ,
54+ tls : false ,
5055} ) ;
5156
5257// list object versions
53- function _listObjectVersions ( bucket , VersionIdMarker , KeyMarker , cb ) {
54- return s3 . listObjectVersions ( {
58+ async function _listObjectVersions ( bucket , VersionIdMarker , KeyMarker ) {
59+ const params = {
5560 Bucket : bucket ,
5661 MaxKeys : LISTING_LIMIT ,
5762 VersionIdMarker,
5863 KeyMarker,
59- } , cb ) ;
64+ } ;
65+ return await s3 . send ( new ListObjectVersionsCommand ( params ) ) ;
6066}
6167
6268// return object with key and version_id
@@ -68,97 +74,103 @@ function _getKeys(keys) {
6874}
6975
7076// delete all versions of an object
71- function _deleteVersions ( bucket , objectsToDelete , cb ) {
72- // multi object delete can delete max 1000 objects
77+ async function _deleteVersions ( bucket , objectsToDelete ) {
78+ if ( ! objectsToDelete || objectsToDelete . length === 0 ) {
79+ log . info ( `No objects to delete for ${ bucket } , skipping deleteObjects call` ) ;
80+ return ;
81+ }
82+
7383 const params = {
7484 Bucket : bucket ,
7585 Delete : { Objects : objectsToDelete } ,
7686 } ;
77- s3 . deleteObjects ( params , err => {
78- if ( err ) {
79- log . error ( 'batch delete err' , err ) ;
80- return cb ( err ) ;
87+ const command = new DeleteObjectsCommand ( params ) ;
88+ command . middlewareStack . add (
89+ next => async args => {
90+ if ( args . request . body ) {
91+ const bodyContent = Buffer . from ( args . request . body ) ;
92+ const md5Hash = crypto . createHash ( 'md5' ) . update ( bodyContent ) . digest ( 'base64' ) ;
93+ // eslint-disable-next-line no-param-reassign
94+ args . request . headers [ 'Content-MD5' ] = md5Hash ;
95+ }
96+ return await next ( args ) ;
97+ } ,
98+ {
99+ step : 'build' ,
81100 }
101+ ) ;
102+
103+ try {
104+ await s3 . send ( command ) ;
82105 objectsToDelete . forEach ( v => log . info ( `deleted key: ${ v . Key } ` ) ) ;
83- return cb ( ) ;
84- } ) ;
106+ } catch ( err ) {
107+ log . error ( 'batch delete err' , err ) ;
108+ throw err ;
109+ }
85110}
86111
87- function cleanupVersions ( bucket , cb ) {
112+ async function cleanupVersions ( bucket ) {
88113 let VersionIdMarker = null ;
89114 let KeyMarker = null ;
90- async . doWhilst (
91- done => _listObjectVersions (
92- bucket ,
93- VersionIdMarker ,
94- KeyMarker ,
95- ( err , data ) => {
96- if ( err ) {
97- return done ( err ) ;
98- }
99- VersionIdMarker = data . NextVersionIdMarker ;
100- KeyMarker = data . NextKeyMarker ;
101- const keysToDelete = _getKeys ( data . Versions ) ;
102- const markersToDelete = _getKeys ( data . DeleteMarkers ) ;
103- return _deleteVersions (
104- bucket ,
105- keysToDelete . concat ( markersToDelete ) ,
106- done ,
107- ) ;
108- } ,
109- ) ,
110- ( ) => {
111- if ( VersionIdMarker || KeyMarker ) {
112- return true ;
113- }
114- return false ;
115- } ,
116- cb ,
117- ) ;
115+ let IsTruncated = true ;
116+
117+ while ( IsTruncated ) {
118+ const data = await _listObjectVersions ( bucket , VersionIdMarker , KeyMarker ) ;
119+ VersionIdMarker = data . NextVersionIdMarker ;
120+ KeyMarker = data . NextKeyMarker ;
121+ IsTruncated = data . IsTruncated ;
122+
123+ const keysToDelete = _getKeys ( data . Versions || [ ] ) ;
124+ const markersToDelete = _getKeys ( data . DeleteMarkers || [ ] ) ;
125+ const allObjectsToDelete = keysToDelete . concat ( markersToDelete ) ;
126+
127+ if ( allObjectsToDelete . length > 0 ) {
128+ await _deleteVersions ( bucket , allObjectsToDelete ) ;
129+ } else {
130+ log . info ( `No objects to delete for bucket ${ bucket } ` ) ;
131+ }
132+ }
118133}
119134
120- function abortAllMultipartUploads ( bucket , cb ) {
121- s3 . listMultipartUploads ( { Bucket : bucket } , ( err , res ) => {
122- if ( err ) {
123- return cb ( err ) ;
124- }
125- if ( ! res || ! res . Uploads ) {
126- return cb ( ) ;
127- }
128- return async . mapLimit (
129- res . Uploads ,
130- 10 ,
131- ( item , done ) => {
132- const { Key, UploadId } = item ;
133- const params = { Bucket : bucket , Key, UploadId } ;
134- s3 . abortMultipartUpload ( params , done ) ;
135- } ,
136- cb ,
137- ) ;
135+ async function abortAllMultipartUploads ( bucket ) {
136+ const res = await s3 . send ( new ListMultipartUploadsCommand ( { Bucket : bucket } ) ) ;
137+ log . info ( `Found ${ res . Uploads ? res . Uploads . length : 0 } multipart uploads to abort` ) ;
138+
139+ if ( ! res || ! res . Uploads || res . Uploads . length === 0 ) {
140+ return ;
141+ }
142+
143+ const deleteMpuPromises = res . Uploads . map ( async item => {
144+ const { Key, UploadId } = item ;
145+ const params = { Bucket : bucket , Key, UploadId } ;
146+ return await s3 . send ( new AbortMultipartUploadCommand ( params ) ) ;
138147 } ) ;
148+
149+ await Promise . all ( deleteMpuPromises ) ;
139150}
140151
141- function _cleanupBucket ( bucket , cb ) {
142- async . parallel ( [
143- done => cleanupVersions ( bucket , done ) ,
144- done => abortAllMultipartUploads ( bucket , done ) ,
145- ] , err => {
146- if ( err ) {
147- log . error ( 'error occured deleting objects' , err ) ;
148- return cb ( err ) ;
149- }
152+ async function _cleanupBucket ( bucket ) {
153+ try {
154+ await Promise . all ( [
155+ cleanupVersions ( bucket ) ,
156+ abortAllMultipartUploads ( bucket ) ,
157+ ] ) ;
150158 log . info ( `completed cleaning up of bucket: ${ bucket } ` ) ;
151- return cb ( ) ;
152- } ) ;
159+ } catch ( err ) {
160+ log . error ( 'error occured deleting objects' , err ) ;
161+ throw err ;
162+ }
153163}
154164
155- function cleanupBuckets ( buckets ) {
156- async . mapLimit ( buckets , 1 , _cleanupBucket , err => {
157- if ( err ) {
158- return log . error ( 'error occured deleting objects' , err ) ;
165+ async function cleanupBuckets ( buckets ) {
166+ try {
167+ for ( const bucket of buckets ) {
168+ await _cleanupBucket ( bucket ) ;
159169 }
160- return log . info ( 'completed cleaning up the given buckets' ) ;
161- } ) ;
170+ log . info ( 'completed cleaning all buckets' ) ;
171+ } catch ( err ) {
172+ log . error ( 'error occured deleting objects' , err ) ;
173+ }
162174}
163175
164176cleanupBuckets ( BUCKETS ) ;
0 commit comments