@@ -5,149 +5,206 @@ const http = require('http');
55
66const { Logger } = require ( 'werelogs' ) ;
77
8- const log = new Logger ( 's3utils:listObjectsByReplicationStatus' ) ;
9-
10- // configurable params
11- const BUCKETS = process . argv [ 2 ] ? process . argv [ 2 ] . split ( ',' ) : null ;
12- const { ACCESS_KEY } = process . env ;
13- const { SECRET_KEY } = process . env ;
14- const { ENDPOINT } = process . env ;
158const LISTING_LIMIT = 1000 ;
16- let { REPLICATION_STATUS } = process . env ;
179
18- if ( ! BUCKETS || BUCKETS . length === 0 ) {
19- log . error ( 'No buckets given as input! Please provide '
20- + 'a comma-separated list of buckets' ) ;
21- process . exit ( 1 ) ;
22- }
23- if ( ! ENDPOINT ) {
24- log . error ( 'ENDPOINT not defined!' ) ;
25- process . exit ( 1 ) ;
26- }
27- if ( ! ACCESS_KEY ) {
28- log . error ( 'ACCESS_KEY not defined' ) ;
29- process . exit ( 1 ) ;
30- }
31- if ( ! SECRET_KEY ) {
32- log . error ( 'SECRET_KEY not defined' ) ;
33- process . exit ( 1 ) ;
34- }
35- if ( ! REPLICATION_STATUS ) {
36- REPLICATION_STATUS = 'FAILED' ;
37- }
/**
 * List the object versions whose replication status matches one of the
 * requested statuses, across one or more buckets.
 *
 * Buckets are processed one after another. Within a bucket, versions are
 * listed page by page (LISTING_LIMIT keys per page) and a HeadObject request
 * is sent for each listed version, at most 10 at a time. Every version whose
 * ReplicationStatus is in the requested set is written to the log.
 *
 * @param {Object} options - Configuration options
 * @param {string} options.buckets - Comma-separated list of bucket names;
 *   surrounding whitespace and empty entries are ignored
 * @param {string} options.accessKey - Access key used for the S3 credentials
 * @param {string} options.secretKey - Secret key used for the S3 credentials
 * @param {string} options.endpoint - S3 endpoint
 * @param {string} [options.replicationStatus='FAILED'] - Comma-separated
 *   replication statuses among NEW, PENDING, COMPLETED, FAILED, REPLICA;
 *   a missing or empty value falls back to 'FAILED'
 * @param {Object} [options.logger] - Logger exposing info/error/debug; a
 *   werelogs Logger is created when omitted
 * @returns {Promise<void>} Resolves once every bucket has been listed.
 *   Rejects with an Error when an option is missing or invalid, or with the
 *   first error reported while listing or heading objects.
 */
function listObjectsByReplicationStatus(options) {
    return new Promise((resolve, reject) => {
        const {
            buckets,
            accessKey,
            secretKey,
            endpoint,
            replicationStatus,
            logger,
        } = options;

        const log = logger || new Logger('s3utils:listObjectsByReplicationStatus');

        // Log and reject in a single step so that every validation failure
        // is reported the same way.
        const fail = message => {
            const error = new Error(message);
            log.error(error.message);
            reject(error);
        };

        // Normalize bucket names up front: blank entries (e.g. "a,,b" or a
        // trailing comma) would otherwise be sent to S3 as an empty bucket
        // name and abort the whole run.
        const bucketList = typeof buckets === 'string'
            ? buckets.split(',').map(name => name.trim()).filter(name => name.length > 0)
            : [];
        if (bucketList.length === 0) {
            fail('No buckets given as input! Please provide a comma-separated list of buckets');
            return;
        }
        if (!endpoint) {
            fail('ENDPOINT not defined!');
            return;
        }
        if (!accessKey) {
            fail('ACCESS_KEY not defined');
            return;
        }
        if (!secretKey) {
            fail('SECRET_KEY not defined');
            return;
        }

        // `||` rather than a destructuring default: an empty string (for
        // instance an exported-but-empty environment variable) must fall
        // back to FAILED as well, not only `undefined`.
        const allowedStatuses = ['NEW', 'PENDING', 'COMPLETED', 'FAILED', 'REPLICA'];
        const replicationStatusToProcess = (replicationStatus || 'FAILED')
            .split(',')
            .map(state => state.trim());
        if (!replicationStatusToProcess.every(state => allowedStatuses.includes(state))) {
            fail('invalid REPLICATION_STATUS: must be a comma-separated list of replication statuses: NEW,PENDING,COMPLETED,FAILED,REPLICA.');
            return;
        }

        log.info(`Objects with replication status ${replicationStatusToProcess.join(' or ')} will be listed`);

        const s3 = new S3Client({
            region: 'us-east-1',
            credentials: {
                accessKeyId: accessKey,
                secretAccessKey: secretKey,
            },
            endpoint,
            forcePathStyle: true,
            tls: false,
            requestHandler: new NodeHttpHandler({
                httpAgent: new http.Agent({ keepAlive: true }),
                requestTimeout: 60000,
            }),
        });

        // Fetch one page of object versions, resuming from the given markers.
        function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
            s3.send(new ListObjectVersionsCommand({
                Bucket: bucket,
                MaxKeys: LISTING_LIMIT,
                VersionIdMarker,
                KeyMarker,
            })).then(data => cb(null, data)).catch(cb);
        }

        // Keep only what HeadObject needs from each listed version.
        function _getKeys(list) {
            return list.map(v => ({
                Key: v.Key,
                VersionId: v.VersionId,
            }));
        }

        // Walk every page of one bucket (name already trimmed) and log the
        // versions whose replication status matches.
        function listBucket(bucket, cb) {
            const bucketName = bucket;
            let VersionIdMarker = null;
            let KeyMarker = null;
            log.info('listing objects by replication status from bucket', {
                bucket,
                replicationStatus: replicationStatusToProcess.join(','),
            });
            async.doWhilst(
                done => _listObjectVersions(
                    bucketName,
                    VersionIdMarker,
                    KeyMarker,
                    (err, data) => {
                        if (err) {
                            log.error('error occured while listing', { error: err, bucketName });
                            return done(err);
                        }
                        const keys = _getKeys(data.Versions || []);
                        return async.mapLimit(keys, 10, (k, next) => {
                            const { Key, VersionId } = k;
                            s3.send(new HeadObjectCommand({
                                Bucket: bucketName,
                                Key,
                                VersionId,
                            })).then(res => {
                                if (replicationStatusToProcess.includes(res.ReplicationStatus)) {
                                    log.info('object with matching replication status found', {
                                        Key,
                                        ReplicationStatus: res.ReplicationStatus,
                                        ...res,
                                    });
                                }
                                return next();
                            }).catch(next);
                        }, mapErr => {
                            if (mapErr) {
                                return done(mapErr);
                            }
                            // Markers for the next page; the loop test
                            // below stops once either one is missing.
                            VersionIdMarker = data.NextVersionIdMarker;
                            KeyMarker = data.NextKeyMarker;
                            return done();
                        });
                    },
                ),
                async () => {
                    if (!VersionIdMarker || !KeyMarker) {
                        log.debug(
                            'completed listing objects by replication status for bucket',
                            { bucket },
                        );
                        return false;
                    }
                    return true;
                },
                cb,
            );
        }

        async.mapSeries(
            bucketList,
            (bucket, done) => listBucket(bucket, done),
            err => {
                // Release the client on both outcomes so that a failed run
                // does not leave it (and its keep-alive agent) behind.
                if (typeof s3.destroy === 'function') {
                    s3.destroy();
                }
                if (err) {
                    log.error('error occured while listing objects by replication status', {
                        error: err,
                    });
                    return reject(err);
                }
                return resolve();
            },
        );
    });
}
142187
143- async . mapSeries (
144- BUCKETS ,
145- ( bucket , done ) => listBucket ( bucket , done ) ,
146- err => {
147- if ( err ) {
148- log . error ( 'error occured while listing objects by replication status' , {
149- error : err ,
150- } ) ;
151- }
152- } ,
153- ) ;
188+ module . exports = { listObjectsByReplicationStatus } ;
189+
// Command-line entry point: only runs when this file is executed directly,
// not when it is required as a module. The bucket list comes from the first
// CLI argument and everything else from the environment; the process exits
// with 0 on success and 1 on failure.
if (require.main === module) {
    const log = new Logger('s3utils:listObjectsByReplicationStatus');

    const [, , bucketsArg] = process.argv;
    const {
        ACCESS_KEY: accessKey,
        SECRET_KEY: secretKey,
        ENDPOINT: endpoint,
        REPLICATION_STATUS: replicationStatus,
    } = process.env;

    const onSuccess = () => {
        log.info('Completed successfully');
        process.exit(0);
    };
    const onFailure = err => {
        log.error('Failed with error', { error: err });
        process.exit(1);
    };

    listObjectsByReplicationStatus({
        buckets: bucketsArg || null,
        accessKey,
        secretKey,
        endpoint,
        replicationStatus,
        logger: log,
    }).then(onSuccess).catch(onFailure);
}
0 commit comments