@@ -11,6 +11,7 @@ import (
1111 "math"
1212 "strings"
1313 "time"
14+ "ydbcp/internal/audit"
1415 "ydbcp/internal/backup_operations"
1516 "ydbcp/internal/config"
1617 "ydbcp/internal/connectors/client"
@@ -73,7 +74,9 @@ func exp(p int) time.Duration {
7374 return time .Duration (math .Pow (BACKOFF_EXP , float64 (p )))
7475}
7576
76- func shouldRetry (config * pb.RetryConfig , count int , firstStart time.Time , lastEnd * time.Time , clock clockwork.Clock ) * time.Time {
77+ func shouldRetry (
78+ config * pb.RetryConfig , count int , firstStart time.Time , lastEnd * time.Time , clock clockwork.Clock ,
79+ ) * time.Time {
7780 if config == nil {
7881 if count == 0 {
7982 t := clock .Now ()
@@ -112,7 +115,21 @@ func shouldRetry(config *pb.RetryConfig, count int, firstStart time.Time, lastEn
112115 return & t
113116}
114117
115- func MakeRetryDecision (ctx context.Context , tbwr * types.TakeBackupWithRetryOperation , tbOp * types.TakeBackupOperation , clock clockwork.Clock ) (RetryDecision , error ) {
// withBackupStateAudit forwards the result of an upsert and, only when that
// upsert succeeded, reports a backup state audit event for tbwr.
//
// upsertError is the error value produced by the caller's upsert. Callers in
// this file pass the db.ExecuteUpsert call itself as this argument, so the
// write has already completed by the time this function body runs.
// retry is handed to the audit call unchanged.
//
// Returns upsertError as-is when it is non-nil (no audit event is reported in
// that case); otherwise returns nil after reporting the event.
118+ func withBackupStateAudit (
119+ ctx context.Context , tbwr * types.TakeBackupWithRetryOperation , retry bool ,
120+ upsertError error ,
121+ ) error {
// Propagate the upsert failure and skip the audit report.
122+ if upsertError != nil {
123+ return upsertError
124+ }
// NOTE(review): the meaning of the trailing `false` argument is not visible
// here; confirm against audit.ReportBackupStateAuditEvent.
125+ audit .ReportBackupStateAuditEvent (ctx , tbwr , retry , false )
126+ return nil
127+ }
128+
129+ func MakeRetryDecision (
130+ ctx context.Context , tbwr * types.TakeBackupWithRetryOperation , tbOp * types.TakeBackupOperation ,
131+ clock clockwork.Clock ,
132+ ) (RetryDecision , error ) {
116133 //retrieve last tbOp run time
117134 //if there is a tbOp, check its status
118135 //if success: set success to itself
@@ -160,13 +177,15 @@ func setErrorToRetryOperation(
160177 ops []types.Operation ,
161178 clock clockwork.Clock ,
162179) {
163- operationIDs := strings .Join (func () []string {
164- var ids []string
165- for _ , item := range ops {
166- ids = append (ids , item .GetID ())
167- }
168- return ids
169- }(), ", " )
180+ operationIDs := strings .Join (
181+ func () []string {
182+ var ids []string
183+ for _ , item := range ops {
184+ ids = append (ids , item .GetID ())
185+ }
186+ return ids
187+ }(), ", " ,
188+ )
170189 tbwr .State = types .OperationStateError
171190 now := clock .Now ()
172191 tbwr .UpdatedAt = timestamppb .New (now )
@@ -220,17 +239,23 @@ func TBWROperationHandler(
220239 if tbwr .ScheduleID != nil {
221240 ctx = xlog .With (ctx , zap .String ("ScheduleID" , * tbwr .ScheduleID ))
222241 }
223- ops , err := db .SelectOperations (ctx , queries .NewReadTableQuery (
224- queries .WithTableName ("Operations" ),
225- queries .WithIndex ("idx_p" ),
226- queries .WithQueryFilters (queries.QueryFilter {
227- Field : "parent_operation_id" ,
228- Values : []table_types.Value {table_types .StringValueFromString (tbwr .ID )},
229- }),
230- queries .WithOrderBy (queries.OrderSpec {
231- Field : "created_at" ,
232- }),
233- ))
242+ ops , err := db .SelectOperations (
243+ ctx , queries .NewReadTableQuery (
244+ queries .WithTableName ("Operations" ),
245+ queries .WithIndex ("idx_p" ),
246+ queries .WithQueryFilters (
247+ queries.QueryFilter {
248+ Field : "parent_operation_id" ,
249+ Values : []table_types.Value {table_types .StringValueFromString (tbwr .ID )},
250+ },
251+ ),
252+ queries .WithOrderBy (
253+ queries.OrderSpec {
254+ Field : "created_at" ,
255+ },
256+ ),
257+ ),
258+ )
234259
235260 var lastTbOp * types.TakeBackupOperation
236261 if len (ops ) > 0 {
@@ -245,6 +270,7 @@ func TBWROperationHandler(
245270 case types .OperationStateRunning :
246271 {
247272 do , err := MakeRetryDecision (ctx , tbwr , lastTbOp , clock )
273+ reportRetry := do == RunNewTb
248274 if err != nil {
249275 xlog .Error (ctx , "RetryDecision failed" , zap .Error (err ))
250276 tbwr .State = types .OperationStateError
@@ -253,9 +279,14 @@ func TBWROperationHandler(
253279 tbwr .UpdatedAt = timestamppb .New (now )
254280 tbwr .Audit .CompletedAt = timestamppb .New (now )
255281
256- errup := db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
257- if errup != nil {
258- return errup
282+ upsertError := withBackupStateAudit (
283+ ctx , tbwr , reportRetry , db .ExecuteUpsert (
284+ ctx ,
285+ queryBuilderFactory ().WithUpdateOperation (tbwr ),
286+ ),
287+ )
288+ if upsertError != nil {
289+ return upsertError
259290 }
260291 return err
261292 }
@@ -279,14 +310,18 @@ func TBWROperationHandler(
279310 now := clock .Now ()
280311 tbwr .UpdatedAt = timestamppb .New (now )
281312 tbwr .Audit .CompletedAt = timestamppb .New (now )
282- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
313+ return withBackupStateAudit (
314+ ctx , tbwr , reportRetry , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr )),
315+ )
283316 }
284317 case Skip :
285318 return nil
286319 case Error :
287320 {
288321 setErrorToRetryOperation (ctx , tbwr , ops , clock )
289- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
322+ return withBackupStateAudit (
323+ ctx , tbwr , reportRetry , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr )),
324+ )
290325 }
291326 case RunNewTb :
292327 {
@@ -322,14 +357,29 @@ func TBWROperationHandler(
322357 now := clock .Now ()
323358 tbwr .UpdatedAt = timestamppb .New (now )
324359 tbwr .Audit .CompletedAt = timestamppb .New (now )
325- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithCreateBackup (* backup ).WithCreateOperation (tb ).WithUpdateOperation (tbwr ))
360+ return withBackupStateAudit (
361+ ctx , tbwr , reportRetry , db .ExecuteUpsert (
362+ ctx ,
363+ queryBuilderFactory ().WithCreateBackup (* backup ).WithCreateOperation (tb ).WithUpdateOperation (tbwr ),
364+ ),
365+ )
326366 } else {
327367 //increment retries
328- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
368+ return withBackupStateAudit (
369+ ctx , tbwr , reportRetry , db .ExecuteUpsert (
370+ ctx ,
371+ queryBuilderFactory ().WithUpdateOperation (tbwr ),
372+ ),
373+ )
329374 }
330375 } else {
331376 xlog .Debug (ctx , "running new TB" , zap .String ("TBOperationID" , tb .ID ))
332- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithCreateBackup (* backup ).WithCreateOperation (tb ).WithUpdateOperation (tbwr ))
377+ return withBackupStateAudit (
378+ ctx , tbwr , reportRetry , db .ExecuteUpsert (
379+ ctx ,
380+ queryBuilderFactory ().WithCreateBackup (* backup ).WithCreateOperation (tb ).WithUpdateOperation (tbwr ),
381+ ),
382+ )
333383 }
334384 }
335385 default :
@@ -339,7 +389,12 @@ func TBWROperationHandler(
339389 tbwr .UpdatedAt = timestamppb .New (now )
340390 tbwr .Audit .CompletedAt = timestamppb .New (now )
341391
342- _ = db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
392+ upsertError := withBackupStateAudit (
393+ ctx , tbwr , reportRetry , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr )),
394+ )
395+ if upsertError != nil {
396+ return upsertError
397+ }
343398 return errors .New (tbwr .Message )
344399 }
345400 }
@@ -355,14 +410,18 @@ func TBWROperationHandler(
355410 now := clock .Now ()
356411 tbwr .UpdatedAt = timestamppb .New (now )
357412 tbwr .Audit .CompletedAt = timestamppb .New (now )
358- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
413+ return withBackupStateAudit (
414+ ctx , tbwr , false , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr )),
415+ )
359416 } else {
360417 if lastTbOp .State == types .OperationStatePending || lastTbOp .State == types .OperationStateRunning {
361418 xlog .Info (ctx , "cancelling TB operation" , zap .String ("TBOperationID" , lastTbOp .ID ))
362419 lastTbOp .State = types .OperationStateStartCancelling
363420 lastTbOp .Message = "Cancelling by parent operation"
364421 lastTbOp .UpdatedAt = timestamppb .New (clock .Now ())
365- return db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (lastTbOp ))
422+ return withBackupStateAudit (
423+ ctx , tbwr , false , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (lastTbOp )),
424+ )
366425 }
367426 }
368427 }
@@ -373,7 +432,12 @@ func TBWROperationHandler(
373432 now := clock .Now ()
374433 tbwr .UpdatedAt = timestamppb .New (now )
375434 tbwr .Audit .CompletedAt = timestamppb .New (now )
376- _ = db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr ))
435+ upsertError := withBackupStateAudit (
436+ ctx , tbwr , false , db .ExecuteUpsert (ctx , queryBuilderFactory ().WithUpdateOperation (tbwr )),
437+ )
438+ if upsertError != nil {
439+ return upsertError
440+ }
377441 return errors .New (tbwr .Message )
378442 }
379443 }
0 commit comments