@@ -2,14 +2,15 @@ const fs = require('fs');
 const { http, https } = require('httpagent');
 const { ObjectMD } = require('arsenal').models;

-const AWS = require('aws-sdk');
+const { S3Client, ListObjectVersionsCommand, DeleteObjectsCommand } = require('@aws-sdk/client-s3');
+const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
+const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
 const { doWhilst, eachSeries, filterLimit } = require('async');

 const { Logger } = require('werelogs');

 const BackbeatClient = require('./BackbeatClient');
 const parseOlderThan = require('./utils/parseOlderThan');
-const { safeListObjectVersions } = require('./utils/safeList');

 const log = new Logger('s3utils::cleanupNoncurrentVersions');

@@ -173,6 +174,31 @@ if (s3EndpointIsHttps) {
     agent = new http.Agent({ keepAlive: true });
 }

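+// v3 S3 client, replacing the v2 AWS.S3 instance removed below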
+const s3 = new S3Client({
+    credentials: {
+        accessKeyId: ACCESS_KEY,
+        secretAccessKey: SECRET_KEY,
+    },
+    endpoint: S3_ENDPOINT,
+    region: 'us-east-1',
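+    // v3 renames the v2 options: s3ForcePathStyle is now forcePathStyle, sslEnabled is now tls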
+    forcePathStyle: true,
+    tls: s3EndpointIsHttps,
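+    // reuse the keep-alive agent configured above; requestTimeout is in milliseconds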
+    requestHandler: new NodeHttpHandler({
+        httpAgent: agent,
+        httpsAgent: agent,
+        requestTimeout: 60000,
+    }),
+    retryStrategy: new ConfiguredRetryStrategy(
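+        // passed as maxAttempts: ConfiguredRetryStrategy counts total attempts, not retries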
+        AWS_SDK_REQUEST_RETRIES,
+        // custom backoff: exponential delay capped at 1 minute
+        // between retries, with a little added jitter
+        attempt => Math.min(
+            AWS_SDK_REQUEST_INITIAL_DELAY_MS * 2 ** attempt,
+            60000
+        ) * (0.9 + Math.random() * 0.2)
+    ),
+});
+
 const options = {
     accessKeyId: ACCESS_KEY,
     secretAccessKey: SECRET_KEY,
@@ -207,7 +233,6 @@ const s3Options = {

 const opt = Object.assign(options, s3Options);

-const s3 = new AWS.S3(opt);
 const bb = new BackbeatClient(opt);

 let nListed = 0;
@@ -244,13 +269,17 @@ const logProgressInterval = setInterval(
244269) ;
245270
 function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
-    return safeListObjectVersions(s3, {
+    const command = new ListObjectVersionsCommand({
         Bucket: bucket,
         MaxKeys: LISTING_LIMIT,
         Prefix: TARGET_PREFIX,
         KeyMarker,
         VersionIdMarker,
-    }, cb);
+    });
+
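+    // adapt the v3 promise API to the callback style the async helpers expect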
+    s3.send(command)
+        .then(data => cb(null, data))
+        .catch(cb);
 }

 function _getMetadata(bucket, key, versionId, cb) {
@@ -297,13 +326,21 @@ function _doBatchDelete(bucket) {
     batchDeleteInProgress = true;
     // multi-object delete can remove at most 1000 objects per request
     const batchDeleteObjects = deleteQueue.splice(0, 1000);
-    const params = {
+    const command = new DeleteObjectsCommand({
         Bucket: bucket,
         Delete: { Objects: batchDeleteObjects },
-    };
-    s3.deleteObjects(params, err => {
-        if (err) {
-            log.error('batch delete error', { error: err });
+    });
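+    // a resolved response is logged as a full success; per-object failures, if any, surface in the response's Errors field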
+    s3.send(command)
+        .then(() => {
+            nDeleted += batchDeleteObjects.length;
+            batchDeleteObjects.forEach(v => log.info('object deleted', {
+                bucket,
+                key: v.Key,
+                versionId: v.VersionId,
+            }));
+        })
+        .catch(err => {
+            log.error('batch delete error', { error: err });
             nErrors += 1;
             batchDeleteObjects.forEach(
                 v => log.error('object may not be deleted', {
@@ -312,29 +349,23 @@ function _doBatchDelete(bucket) {
                     versionId: v.VersionId,
                 }),
             );
-        } else {
-            nDeleted += batchDeleteObjects.length;
-            batchDeleteObjects.forEach(v => log.info('object deleted', {
-                bucket,
-                key: v.Key,
-                versionId: v.VersionId,
-            }));
-        }
-        if (batchDeleteOnDrain && deleteQueue.length <= 1000) {
-            process.nextTick(batchDeleteOnDrain);
-            batchDeleteOnDrain = null;
-        }
-        if (batchDeleteOnFullDrain && deleteQueue.length === 0) {
-            process.nextTick(batchDeleteOnFullDrain);
-            batchDeleteOnFullDrain = null;
-        }
-        if (deleteQueue.length > 0) {
-            // there are more objects to delete, keep going
-            _doBatchDelete(bucket);
-        } else {
-            batchDeleteInProgress = false;
-        }
-    });
+        })
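+        // drain notifications and continuation run whether the batch succeeded or failed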
+        .finally(() => {
+            if (batchDeleteOnDrain && deleteQueue.length <= 1000) {
+                process.nextTick(batchDeleteOnDrain);
+                batchDeleteOnDrain = null;
+            }
+            if (batchDeleteOnFullDrain && deleteQueue.length === 0) {
+                process.nextTick(batchDeleteOnFullDrain);
+                batchDeleteOnFullDrain = null;
+            }
+            if (deleteQueue.length > 0) {
+                // there are more objects to delete, keep going
+                _doBatchDelete(bucket);
+            } else {
+                batchDeleteInProgress = false;
+            }
+        });
 }

 function _triggerDeletes(bucket, versionsToDelete, cb) {
@@ -565,11 +596,13 @@ function triggerDeletesOnBucket(bucketName, cb) {
             });
             return done(err);
         }
-        nListed += data.Versions.length + data.DeleteMarkers.length;
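+        // SDK v3 may omit Versions/DeleteMarkers from the response when they are empty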
+        const versions = data.Versions || [];
+        const deleteMarkers = data.DeleteMarkers || [];
+        nListed += versions.length + deleteMarkers.length;
         const ret = _triggerDeletesOnEligibleObjects(
             bucket,
-            data.Versions,
-            data.DeleteMarkers,
+            versions,
+            deleteMarkers,
             !data.IsTruncated,
             err => {
                 if (err) {