@@ -41,7 +41,9 @@ func OpenYdb() *ydb.Driver {
4141 ydb .WithBalancer (balancers .SingleConn ()),
4242 ydb .WithAnonymousCredentials (),
4343 }
44- driver , err := ydb .Open (context .Background (), databaseEndpoint + "/" + databaseName , opts ... )
44+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
45+ driver , err := ydb .Open (ctx , databaseEndpoint + "/" + databaseName , opts ... )
46+ cancel ()
4547 if err != nil {
4648 log .Panicf ("failed to open database: %v" , err )
4749 }
5961("%s", "TBWR", "%s", "%s", "%s", CurrentUTCTimestamp(), "RUNNING", 0, 3)
6062` , opID , containerID , databaseName , invalidDatabaseEndpoint ,
6163 )
64+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
6265 err := driver .Table ().Do (
63- context . Background () , func (ctx context.Context , s table.Session ) error {
66+ ctx , func (ctx context.Context , s table.Session ) error {
6467 _ , res , err := s .Execute (
6568 ctx ,
6669 table .TxControl (
9194 log .Panicf ("failed to initialize YDBCP db: %v" , err )
9295 }
9396 op , err := opClient .GetOperation (
94- context . Background () , & pb.GetOperationRequest {
97+ ctx , & pb.GetOperationRequest {
9598 Id : opID ,
9699 },
97100 )
@@ -101,10 +104,12 @@ VALUES
101104 if op .GetType () != types .OperationTypeTBWR .String () {
102105 log .Panicf ("unexpected operation type: %v" , op .GetType ())
103106 }
107+ cancel ()
104108 time .Sleep (time .Second * 10 ) // to wait for four operation handlers
109+ ctx , cancel = context .WithTimeout (context .Background (), time .Second * 5 )
105110
106111 backups , err := client .ListBackups (
107- context . Background () , & pb.ListBackupsRequest {
112+ ctx , & pb.ListBackupsRequest {
108113 ContainerId : containerID ,
109114 DatabaseNameMask : "%" ,
110115 },
@@ -116,7 +121,7 @@ VALUES
116121 log .Panicf ("expected no backups by this time, got %v" , backups .Backups )
117122 }
118123 ops , err := opClient .ListOperations (
119- context . Background () , & pb.ListOperationsRequest {
124+ ctx , & pb.ListOperationsRequest {
120125 ContainerId : containerID ,
121126 DatabaseNameMask : databaseName ,
122127 OperationTypes : []string {types .OperationTypeTB .String ()},
@@ -142,6 +147,48 @@ VALUES
142147 if tbwr .Message != "retry attempts exceeded limit: 3." {
143148 log .Panicf ("unexpected operation message: %v" , tbwr .Message )
144149 }
150+ cancel ()
151+ }
152+
153+ func ResetNextLaunch (id string ) {
154+ driver := OpenYdb ()
155+ resetNextLaunchQuery := fmt .Sprintf (
156+ `UPDATE BackupSchedules SET next_launch = CurrentUTCTimestamp() WHERE id = '%s'` , id ,
157+ )
158+ log .Println (resetNextLaunchQuery )
159+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
160+ err := driver .Table ().Do (
161+ ctx , func (ctx context.Context , s table.Session ) error {
162+ _ , res , err := s .Execute (
163+ ctx ,
164+ table .TxControl (
165+ table .BeginTx (
166+ table .WithSerializableReadWrite (),
167+ ),
168+ table .CommitTx (),
169+ ),
170+ resetNextLaunchQuery ,
171+ nil ,
172+ )
173+ if err != nil {
174+ return err
175+ }
176+ defer func (res result.Result ) {
177+ err = res .Close ()
178+ if err != nil {
179+ xlog .Error (ctx , "Error closing transaction result" )
180+ }
181+ }(res ) // result must be closed
182+ if res .ResultSetCount () != 0 {
183+ return errors .New ("expected 0 result set" )
184+ }
185+ return res .Err ()
186+ },
187+ )
188+ cancel ()
189+ if err != nil {
190+ log .Panicf ("YDB fail: %v" , err )
191+ }
145192}
146193
147194type RawEvent struct {
@@ -189,22 +236,22 @@ func (e *AuditCaptureEvent) Matches(line string) bool {
189236 if err != nil {
190237 return false
191238 }
192- if p . MethodName != e .event .MethodName {
239+ if e . event . MethodName != "" && e .event . MethodName != p .MethodName {
193240 log .Printf ("MethodName mismatch: expected %s, got %s" , e .event .MethodName , p .MethodName )
194241 return false
195242 }
196243
197- if p . ContainerID != e .event .ContainerID {
244+ if e . event . ContainerID != "" && e .event . ContainerID != p .ContainerID {
198245 log .Printf ("ContainerID mismatch: expected %s, got %s" , e .event .ContainerID , p .ContainerID )
199246 return false
200247 }
201248
202- if p . Component != e .event .Component {
249+ if e . event . Component != "" && e .event . Component != p .Component {
203250 log .Printf ("Component mismatch: expected %s, got %s" , e .event .Component , p .Component )
204251 return false
205252 }
206253
207- if p . Action != e .event .Action {
254+ if e . event . Action != "" && e .event . Action != p .Action {
208255 log .Printf ("Action mismatch: expected %s, got %s" , e .event .Action , p .Action )
209256 return false
210257 }
@@ -214,12 +261,12 @@ func (e *AuditCaptureEvent) Matches(line string) bool {
214261 return false
215262 }
216263
217- if p . Subject != e .event .Subject {
264+ if e . event . Subject != "" && e .event . Subject != p .Subject {
218265 log .Printf ("Subject mismatch: expected %s, got %s" , e .event .Subject , p .Subject )
219266 return false
220267 }
221268
222- if p . Status != e .event .Status {
269+ if e . event . Status != "" && e .event . Status != p .Status {
223270 log .Printf ("Status mismatch: expected %s, got %s" , e .event .Status , p .Status )
224271 return false
225272 }
@@ -362,18 +409,30 @@ func main() {
362409 },
363410 {
364411 event : RawEvent {
365- Action : "ActionUpdate" ,
366- Component : "backup_service" ,
367- Status : "NEW" ,
368- Database : databaseName ,
412+ Action : "ActionUpdate" ,
413+ Component : "backup_service" ,
414+ Status : "NEW" ,
415+ ContainerID : containerID ,
416+ Database : databaseName ,
369417 },
370418 },
371419 {
372420 event : RawEvent {
373- Action : "ActionUpdate" ,
374- Component : "backup_service" ,
375- Status : "DONE" ,
376- Database : databaseName ,
421+ Action : "ActionUpdate" ,
422+ Component : "backup_service" ,
423+ Status : "DONE" ,
424+ ContainerID : containerID ,
425+ Database : databaseName ,
426+ },
427+ },
428+ {
429+ event : RawEvent {
430+ Action : "ActionUpdate" ,
431+ Component : "backup_service" ,
432+ Status : "ERROR" ,
433+ Reason : "Recovery point objective failed for schedule" ,
434+ ContainerID : containerID ,
435+ Database : databaseName ,
377436 },
378437 },
379438 },
@@ -634,14 +693,15 @@ func main() {
634693 ScheduleName : "schedule" ,
635694 ScheduleSettings : & pb.BackupScheduleSettings {
636695 SchedulePattern : & pb.BackupSchedulePattern {Crontab : "* * * * *" },
637- RecoveryPointObjective : durationpb .New (time .Hour ),
696+ RecoveryPointObjective : durationpb .New (time .Second ),
638697 Ttl : durationpb .New (time .Hour ),
639698 },
640699 },
641700 )
642701 if err != nil {
643702 log .Panicf ("failed to create backup schedule: %v" , err )
644703 }
704+ ResetNextLaunch (schedule .Id )
645705
646706 // local config has schedules_limit_per_db = 1, so we should not be able to create another schedule for this db
647707 _ , err = scheduleClient .CreateBackupSchedule (
@@ -684,6 +744,9 @@ func main() {
684744 log .Panicf ("schedule and listed schedule ids does not match: %s, %s" , schedules .Schedules [0 ].Id , schedule .Id )
685745 }
686746
747+ //wait for schedule handler
748+ time .Sleep (time .Second * 3 )
749+
687750 newScheduleName := "schedule-2.0"
688751 newSourcePath := "/kv_test"
689752 newSchedule , err := scheduleClient .UpdateBackupSchedule (
@@ -712,7 +775,7 @@ func main() {
712775 if newSchedule .ScheduleSettings .Ttl .AsDuration () != time .Hour {
713776 log .Panicf ("wrong ttl after update: %v" , newSchedule .ScheduleSettings .Ttl )
714777 }
715- if newSchedule .ScheduleSettings .RecoveryPointObjective .AsDuration () != time .Hour {
778+ if newSchedule .ScheduleSettings .RecoveryPointObjective .AsDuration () != time .Second {
716779 log .Panicf ("wrong rpo after update: %v" , newSchedule .ScheduleSettings .RecoveryPointObjective )
717780 }
718781
0 commit comments