@@ -12,9 +12,11 @@ const nb_native = require('../util/nb_native');
1212const NsfsObjectSDK = require ( '../sdk/nsfs_object_sdk' ) ;
1313const native_fs_utils = require ( '../util/native_fs_utils' ) ;
1414const { NoobaaEvent } = require ( './manage_nsfs_events_utils' ) ;
15+ const notifications_util = require ( '../util/notifications_util' ) ;
1516const ManageCLIError = require ( './manage_nsfs_cli_errors' ) . ManageCLIError ;
1617const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME ,
1718 is_desired_time, record_current_time } = require ( './manage_nsfs_cli_utils' ) ;
19+ const SensitiveString = require ( '../util/sensitive_string' ) ;
1820
1921// TODO:
2022// implement
@@ -155,8 +157,9 @@ async function process_bucket(config_fs, bucket_name, system_json) {
155157 const account = { email : '' , nsfs_account_config : config_fs . fs_context , access_keys : [ ] } ;
156158 const object_sdk = new NsfsObjectSDK ( '' , config_fs , account , bucket_json . versioning , config_fs . config_root , system_json ) ;
157159 await object_sdk . _simple_load_requesting_account ( ) ;
160+ const should_notify = notifications_util . should_notify_on_event ( bucket_json , notifications_util . OP_TO_EVENT . lifecycle_delete . name ) ;
158161 if ( ! bucket_json . lifecycle_configuration_rules ) return { } ;
159- await process_rules ( config_fs , bucket_json , object_sdk ) ;
162+ await process_rules ( config_fs , bucket_json , object_sdk , should_notify ) ;
160163}
161164
162165/**
@@ -165,15 +168,15 @@ async function process_bucket(config_fs, bucket_name, system_json) {
165168 * @param {Object } bucket_json
166169 * @param {nb.ObjectSDK } object_sdk
167170 */
168- async function process_rules ( config_fs , bucket_json , object_sdk ) {
171+ async function process_rules ( config_fs , bucket_json , object_sdk , should_notify ) {
169172 try {
170173 await P . all ( _ . map ( bucket_json . lifecycle_configuration_rules ,
171174 async ( lifecycle_rule , index ) =>
172175 await _call_op_and_update_status ( {
173176 bucket_name : bucket_json . name ,
174177 rule_id : lifecycle_rule . id ,
175178 op_name : TIMED_OPS . PROCESS_RULE ,
176- op_func : async ( ) => process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk )
179+ op_func : async ( ) => process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk , should_notify )
177180 } )
178181 )
179182 ) ;
@@ -182,6 +185,40 @@ async function process_rules(config_fs, bucket_json, object_sdk) {
182185 }
183186}
184187
188+ async function send_lifecycle_notifications ( delete_res , bucket_json , object_sdk ) {
189+ const writes = [ ] ;
190+ for ( const deleted_obj of delete_res ) {
191+ if ( delete_res . err_code ) continue ;
192+ for ( const notif of bucket_json . notifications ) {
193+ if ( notifications_util . check_notif_relevant ( notif , {
194+ op_name : 'lifecycle_delete' ,
195+ s3_event_method : deleted_obj . delete_marker_created ? 'DeleteMarkerCreated' : 'Delete' ,
196+ } ) ) {
197+ //remember that this deletion needs a notif for this specific notification conf
198+ writes . push ( { notif, deleted_obj} ) ;
199+ }
200+ }
201+ }
202+
203+ //required format by compose_notification_lifecycle
204+ bucket_json . bucket_owner = new SensitiveString ( bucket_json . bucket_owner ) ;
205+
206+ //if any notifications are needed, write them in notification log file
207+ //(otherwise don't do any unnecessary filesystem actions)
208+ if ( writes . length > 0 ) {
209+ let logger ;
210+ try {
211+ logger = notifications_util . get_notification_logger ( 'SHARED' ) ;
212+ await P . map_with_concurrency ( 100 , writes , async write => {
213+ const notif = notifications_util . compose_notification_lifecycle ( write . deleted_obj , write . notif , bucket_json , object_sdk ) ;
214+ logger . append ( JSON . stringify ( notif ) ) ;
215+ } ) ;
216+ } finally {
217+ if ( logger ) logger . close ( ) ;
218+ }
219+ }
220+ }
221+
185222/**
186223 * process_rule processes the lifecycle rule for a bucket
187224 * TODO - implement notifications for the deleted objects (check if needed for abort mpus as well)
@@ -192,7 +229,7 @@ async function process_rules(config_fs, bucket_json, object_sdk) {
192229 * @param {nb.ObjectSDK } object_sdk
193230 * @returns {Promise<Void> }
194231 */
195- async function process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk ) {
232+ async function process_rule ( config_fs , lifecycle_rule , index , bucket_json , object_sdk , should_notify ) {
196233 dbg . log0 ( 'nc_lifecycle.process_rule: start bucket name:' , bucket_json . name , 'rule' , lifecycle_rule , 'index' , index ) ;
197234 const bucket_name = bucket_json . name ;
198235 const rule_id = lifecycle_rule . id ;
@@ -209,7 +246,7 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
209246 } ) ;
210247
211248 if ( candidates . delete_candidates ?. length > 0 ) {
212- await _call_op_and_update_status ( {
249+ const delete_res = await _call_op_and_update_status ( {
213250 bucket_name,
214251 rule_id,
215252 op_name : TIMED_OPS . DELETE_MULTIPLE_OBJECTS ,
@@ -218,6 +255,9 @@ async function process_rule(config_fs, lifecycle_rule, index, bucket_json, objec
218255 objects : candidates . delete_candidates
219256 } )
220257 } ) ;
258+ if ( should_notify ) {
259+ await send_lifecycle_notifications ( delete_res , bucket_json , object_sdk ) ;
260+ }
221261 }
222262
223263 if ( candidates . abort_mpu_candidates ?. length > 0 ) {
0 commit comments