@@ -5,149 +5,204 @@ const http = require('http');
55
66const { Logger } = require ( 'werelogs' ) ;
77
8- const log = new Logger ( 's3utils:listObjectsByReplicationStatus' ) ;
9-
10- // configurable params
11- const BUCKETS = process . argv [ 2 ] ? process . argv [ 2 ] . split ( ',' ) : null ;
12- const { ACCESS_KEY } = process . env ;
13- const { SECRET_KEY } = process . env ;
14- const { ENDPOINT } = process . env ;
158const LISTING_LIMIT = 1000 ;
16- let { REPLICATION_STATUS } = process . env ;
179
18- if ( ! BUCKETS || BUCKETS . length === 0 ) {
19- log . error ( 'No buckets given as input! Please provide '
20- + 'a comma-separated list of buckets' ) ;
21- process . exit ( 1 ) ;
22- }
23- if ( ! ENDPOINT ) {
24- log . error ( 'ENDPOINT not defined!' ) ;
25- process . exit ( 1 ) ;
26- }
27- if ( ! ACCESS_KEY ) {
28- log . error ( 'ACCESS_KEY not defined' ) ;
29- process . exit ( 1 ) ;
30- }
31- if ( ! SECRET_KEY ) {
32- log . error ( 'SECRET_KEY not defined' ) ;
33- process . exit ( 1 ) ;
34- }
35- if ( ! REPLICATION_STATUS ) {
36- REPLICATION_STATUS = 'FAILED' ;
37- }
10+ /**
11+ * Main function to list objects by replication status
12+ * @param {Object } options - Configuration options
13+ * @param {string } options.buckets - Comma-separated list of buckets
14+ * @param {string } options.accessKey - AWS access key
15+ * @param {string } options.secretKey - AWS secret key
16+ * @param {string } options.endpoint - S3 endpoint
17+ * @param {string } [options.replicationStatus='FAILED'] - Comma-separated replication statuses
18+ * @param {Object } [options.logger] - Logger instance
19+ * @returns {Promise<void> }
20+ */
21+ function listObjectsByReplicationStatus ( options ) {
22+ return new Promise ( ( resolve , reject ) => {
23+ const {
24+ buckets,
25+ accessKey,
26+ secretKey,
27+ endpoint,
28+ replicationStatus = 'FAILED' ,
29+ logger,
30+ } = options ;
3831
39- const replicationStatusToProcess = REPLICATION_STATUS . split ( ',' ) ;
40- replicationStatusToProcess . forEach ( state => {
41- if ( ! [ 'NEW' , 'PENDING' , 'COMPLETED' , 'FAILED' , 'REPLICA' ] . includes ( state ) ) {
42- log . error ( 'invalid REPLICATION_STATUS environment: must be a '
43- + 'comma-separated list of replication statuses to list, '
44- + 'as NEW,PENDING,COMPLETED,FAILED,REPLICA.' ) ;
45- process . exit ( 1 ) ;
46- }
47- } ) ;
48- log . info ( 'Objects with replication status '
49- + `${ replicationStatusToProcess . join ( ' or ' ) } will be listed` ) ;
50-
51- const s3 = new S3Client ( {
52- region : 'us-east-1' ,
53- credentials : {
54- accessKeyId : ACCESS_KEY ,
55- secretAccessKey : SECRET_KEY ,
56- } ,
57- endpoint : ENDPOINT ,
58- forcePathStyle : true ,
59- tls : false ,
60- requestHandler : new NodeHttpHandler ( {
61- httpAgent : new http . Agent ( { keepAlive : true } ) ,
62- requestTimeout : 60000 ,
63- } ) ,
64- } ) ;
65-
66- // list object versions
67- function _listObjectVersions ( bucket , VersionIdMarker , KeyMarker , cb ) {
68- s3 . send ( new ListObjectVersionsCommand ( {
69- Bucket : bucket ,
70- MaxKeys : LISTING_LIMIT ,
71- VersionIdMarker,
72- KeyMarker,
73- } ) ) . then ( data => cb ( null , data ) ) . catch ( cb ) ;
74- }
32+ const log = logger || new Logger ( 's3utils:listObjectsByReplicationStatus' ) ;
7533
76- // return object with key and version_id
77- function _getKeys ( list ) {
78- return list . map ( v => ( {
79- Key : v . Key ,
80- VersionId : v . VersionId ,
81- } ) ) ;
82- }
34+ // Validate inputs
35+ if ( ! buckets || buckets . trim ( ) . length === 0 ) {
36+ const error = new Error ( 'No buckets given as input! Please provide a comma-separated list of buckets' ) ;
37+ log . error ( error . message ) ;
38+ return reject ( error ) ;
39+ }
40+ if ( ! endpoint ) {
41+ const error = new Error ( 'ENDPOINT not defined!' ) ;
42+ log . error ( error . message ) ;
43+ return reject ( error ) ;
44+ }
45+ if ( ! accessKey ) {
46+ const error = new Error ( 'ACCESS_KEY not defined' ) ;
47+ log . error ( error . message ) ;
48+ return reject ( error ) ;
49+ }
50+ if ( ! secretKey ) {
51+ const error = new Error ( 'SECRET_KEY not defined' ) ;
52+ log . error ( error . message ) ;
53+ return reject ( error ) ;
54+ }
8355
84- function listBucket ( bucket , cb ) {
85- const bucketName = bucket . trim ( ) ;
86- let VersionIdMarker = null ;
87- let KeyMarker = null ;
88- log . info ( 'listing objects by replication status from bucket' , {
89- bucket,
90- replicationStatus : replicationStatusToProcess . join ( ',' )
91- } ) ;
92- async . doWhilst (
93- done => _listObjectVersions (
94- bucketName ,
95- VersionIdMarker ,
96- KeyMarker ,
97- ( err , data ) => {
98- if ( err ) {
99- log . error ( 'error occured while listing' , { error : err , bucketName } ) ;
100- return done ( err ) ;
101- }
102- const keys = _getKeys ( data . Versions || [ ] ) ;
103- return async . mapLimit ( keys , 10 , ( k , next ) => {
104- const { Key, VersionId } = k ;
105- s3 . send ( new HeadObjectCommand ( {
106- Bucket : bucketName ,
107- Key,
108- VersionId,
109- } ) ) . then ( res => {
110- if ( replicationStatusToProcess . includes ( res . ReplicationStatus ) ) {
111- log . info ( 'object with matching replication status found' , {
112- Key,
113- ReplicationStatus : res . ReplicationStatus ,
114- ...res
115- } ) ;
56+ const bucketList = buckets . split ( ',' ) ;
57+ const replicationStatusToProcess = replicationStatus . split ( ',' ) ;
58+
59+ // Validate replication statuses
60+ for ( const state of replicationStatusToProcess ) {
61+ if ( ! [ 'NEW' , 'PENDING' , 'COMPLETED' , 'FAILED' , 'REPLICA' ] . includes ( state ) ) {
62+ const error = new Error ( 'invalid REPLICATION_STATUS: must be a comma-separated list of replication statuses: NEW,PENDING,COMPLETED,FAILED,REPLICA.' ) ;
63+ log . error ( error . message ) ;
64+ return reject ( error ) ;
65+ }
66+ }
67+
68+ log . info ( 'Objects with replication status '
69+ + `${ replicationStatusToProcess . join ( ' or ' ) } will be listed` ) ;
70+
71+ const s3 = new S3Client ( {
72+ region : 'us-east-1' ,
73+ credentials : {
74+ accessKeyId : accessKey ,
75+ secretAccessKey : secretKey ,
76+ } ,
77+ endpoint,
78+ forcePathStyle : true ,
79+ tls : false ,
80+ requestHandler : new NodeHttpHandler ( {
81+ httpAgent : new http . Agent ( { keepAlive : true } ) ,
82+ requestTimeout : 60000 ,
83+ } ) ,
84+ } ) ;
85+
86+ // list object versions
87+ function _listObjectVersions ( bucket , VersionIdMarker , KeyMarker , cb ) {
88+ s3 . send ( new ListObjectVersionsCommand ( {
89+ Bucket : bucket ,
90+ MaxKeys : LISTING_LIMIT ,
91+ VersionIdMarker,
92+ KeyMarker,
93+ } ) ) . then ( data => cb ( null , data ) ) . catch ( cb ) ;
94+ }
95+
96+ // return object with key and version_id
97+ function _getKeys ( list ) {
98+ return list . map ( v => ( {
99+ Key : v . Key ,
100+ VersionId : v . VersionId ,
101+ } ) ) ;
102+ }
103+
104+ function listBucket ( bucket , cb ) {
105+ const bucketName = bucket . trim ( ) ;
106+ let VersionIdMarker = null ;
107+ let KeyMarker = null ;
108+ log . info ( 'listing objects by replication status from bucket' , {
109+ bucket,
110+ replicationStatus : replicationStatusToProcess . join ( ',' )
111+ } ) ;
112+ async . doWhilst (
113+ done => _listObjectVersions (
114+ bucketName ,
115+ VersionIdMarker ,
116+ KeyMarker ,
117+ ( err , data ) => {
118+ if ( err ) {
119+ log . error ( 'error occured while listing' , { error : err , bucketName } ) ;
120+ return done ( err ) ;
116121 }
117- return next ( ) ;
118- } ) . catch ( next ) ;
119- } , err => {
120- if ( err ) {
121- return done ( err ) ;
122+ const keys = _getKeys ( data . Versions || [ ] ) ;
123+ return async . mapLimit ( keys , 10 , ( k , next ) => {
124+ const { Key, VersionId } = k ;
125+ s3 . send ( new HeadObjectCommand ( {
126+ Bucket : bucketName ,
127+ Key,
128+ VersionId,
129+ } ) ) . then ( res => {
130+ if ( replicationStatusToProcess . includes ( res . ReplicationStatus ) ) {
131+ log . info ( 'object with matching replication status found' , {
132+ Key,
133+ ReplicationStatus : res . ReplicationStatus ,
134+ ...res
135+ } ) ;
136+ }
137+ return next ( ) ;
138+ } ) . catch ( next ) ;
139+ } , err => {
140+ if ( err ) {
141+ return done ( err ) ;
142+ }
143+ VersionIdMarker = data . NextVersionIdMarker ;
144+ KeyMarker = data . NextKeyMarker ;
145+ return done ( ) ;
146+ } ) ;
147+ } ,
148+ ) ,
149+ async ( ) => {
150+ if ( ! VersionIdMarker || ! KeyMarker ) {
151+ log . debug (
152+ 'completed listing objects by replication status for bucket' ,
153+ { bucket } ,
154+ ) ;
155+ return false ;
122156 }
123- VersionIdMarker = data . NextVersionIdMarker ;
124- KeyMarker = data . NextKeyMarker ;
125- return done ( ) ;
126- } ) ;
157+ return true ;
158+ } ,
159+ cb ,
160+ ) ;
161+ }
162+
163+ async . mapSeries (
164+ bucketList ,
165+ ( bucket , done ) => listBucket ( bucket , done ) ,
166+ err => {
167+ if ( err ) {
168+ log . error ( 'error occured while listing objects by replication status' , {
169+ error : err ,
170+ } ) ;
171+ return reject ( err ) ;
172+ }
173+ // Cleanup S3 client
174+ if ( s3 && typeof s3 . destroy === 'function' ) {
175+ s3 . destroy ( ) ;
176+ }
177+ return resolve ( ) ;
127178 } ,
128- ) ,
129- async ( ) => {
130- if ( ! VersionIdMarker || ! KeyMarker ) {
131- log . debug (
132- 'completed listing objects by replication status for bucket' ,
133- { bucket } ,
134- ) ;
135- return false ;
136- }
137- return true ;
138- } ,
139- cb ,
140- ) ;
179+ ) ;
180+ } ) ;
141181}
142182
143- async . mapSeries (
144- BUCKETS ,
145- ( bucket , done ) => listBucket ( bucket , done ) ,
146- err => {
147- if ( err ) {
148- log . error ( 'error occured while listing objects by replication status' , {
149- error : err ,
150- } ) ;
151- }
152- } ,
153- ) ;
// Export for testing
module.exports = { listObjectsByReplicationStatus };

// Command-line entry point: runs only when this file is executed directly,
// not when it is required by another module.
if (require.main === module) {
    const log = new Logger('s3utils:listObjectsByReplicationStatus');

    // Buckets come from the first CLI argument (comma-separated); credentials,
    // endpoint and the statuses to list come from the environment.
    const BUCKETS = process.argv[2] || null;
    const {
        ACCESS_KEY,
        SECRET_KEY,
        ENDPOINT,
        REPLICATION_STATUS,
    } = process.env;

    listObjectsByReplicationStatus({
        buckets: BUCKETS,
        accessKey: ACCESS_KEY,
        secretKey: SECRET_KEY,
        endpoint: ENDPOINT,
        // An empty REPLICATION_STATUS must behave like an unset one and fall
        // back to the default status; passing '' through would bypass the
        // callee's default and be rejected as an invalid status.
        replicationStatus: REPLICATION_STATUS || undefined,
        logger: log,
    }).then(() => {
        log.info('Completed successfully');
        process.exit(0);
    }).catch(err => {
        log.error('Failed with error', { error: err });
        process.exit(1);
    });
}
0 commit comments