Skip to content

Commit 82a2a8d

Browse files
committed
allow backups of empty databases
1 parent 90afa3a commit 82a2a8d

File tree

7 files changed

+207
-78
lines changed

7 files changed

+207
-78
lines changed

internal/backup_operations/make_backup.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,67 @@ func ErrToStatus(err error) error {
181181
return err
182182
}
183183

184+
func CreateS3DestinationPrefix(databaseName string, s3 config.S3Config, clock clockwork.Clock) string {
185+
dbNamePath := strings.Replace(databaseName, "/", "_", -1) // TODO: checking user input
186+
dbNamePath = strings.Trim(dbNamePath, "_")
187+
return path.Join(
188+
s3.PathPrefix,
189+
dbNamePath,
190+
clock.Now().Format(types.BackupTimestampFormat),
191+
)
192+
}
193+
194+
func CreateEmptyBackup(
195+
req MakeBackupInternalRequest,
196+
clock clockwork.Clock,
197+
) (*types.Backup, *types.TakeBackupOperation) {
198+
var expireAt *time.Time
199+
if req.Ttl != nil {
200+
expireAt = new(time.Time)
201+
*expireAt = clock.Now().Add(*req.Ttl)
202+
}
203+
204+
now := timestamppb.New(clock.Now())
205+
backup := &types.Backup{
206+
ID: types.GenerateObjectID(),
207+
ContainerID: req.ContainerID,
208+
DatabaseName: req.DatabaseName,
209+
DatabaseEndpoint: req.DatabaseEndpoint,
210+
Status: types.BackupStateAvailable,
211+
AuditInfo: &pb.AuditInfo{
212+
CreatedAt: now,
213+
CompletedAt: now,
214+
Creator: types.OperationCreatorName,
215+
},
216+
ScheduleID: req.ScheduleID,
217+
ExpireAt: expireAt,
218+
SourcePaths: []string{},
219+
}
220+
221+
op := &types.TakeBackupOperation{
222+
ID: types.GenerateObjectID(),
223+
BackupID: backup.ID,
224+
ContainerID: req.ContainerID,
225+
State: types.OperationStateDone,
226+
YdbConnectionParams: types.YdbConnectionParams{
227+
Endpoint: req.DatabaseEndpoint,
228+
DatabaseName: req.DatabaseName,
229+
},
230+
SourcePaths: req.SourcePaths,
231+
SourcePathsToExclude: req.SourcePathsToExclude,
232+
Audit: &pb.AuditInfo{
233+
CreatedAt: now,
234+
CompletedAt: now,
235+
Creator: types.OperationCreatorName,
236+
},
237+
YdbOperationId: "",
238+
UpdatedAt: now,
239+
ParentOperationID: req.ParentOperationID,
240+
}
241+
242+
return backup, op
243+
}
244+
184245
func MakeBackup(
185246
ctx context.Context,
186247
clientConn client.ClientConnector,
@@ -234,14 +295,7 @@ func MakeBackup(
234295
return nil, nil, status.Error(codes.Internal, "can't get S3SecretKey")
235296
}
236297

237-
dbNamePath := strings.Replace(req.DatabaseName, "/", "_", -1) // TODO: checking user input
238-
dbNamePath = strings.Trim(dbNamePath, "_")
239-
240-
destinationPrefix := path.Join(
241-
s3.PathPrefix,
242-
dbNamePath,
243-
clock.Now().Format(types.BackupTimestampFormat),
244-
)
298+
destinationPrefix := CreateS3DestinationPrefix(req.DatabaseName, s3, clock)
245299
ctx = xlog.With(ctx, zap.String("S3DestinationPrefix", destinationPrefix))
246300

247301
pathsForExport, err := ValidateSourcePaths(ctx, req, clientConn, client, dsn)

internal/connectors/client/mock.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"path"
7+
"strings"
78

89
"ydbcp/internal/types"
910

@@ -20,16 +21,18 @@ type ObjectPath struct {
2021
}
2122

2223
type MockClientConnector struct {
23-
storage map[ObjectPath]bool
24-
operations map[string]*Ydb_Operations.Operation
24+
storage map[ObjectPath]bool
25+
emptyDatabases map[string]bool
26+
operations map[string]*Ydb_Operations.Operation
2527
}
2628

2729
type Option func(*MockClientConnector)
2830

2931
func NewMockClientConnector(options ...Option) *MockClientConnector {
3032
connector := &MockClientConnector{
31-
storage: make(map[ObjectPath]bool),
32-
operations: make(map[string]*Ydb_Operations.Operation),
33+
storage: make(map[ObjectPath]bool),
34+
emptyDatabases: make(map[string]bool),
35+
operations: make(map[string]*Ydb_Operations.Operation),
3336
}
3437
for _, opt := range options {
3538
opt(connector)
@@ -44,6 +47,14 @@ func WithOperations(operations map[string]*Ydb_Operations.Operation) Option {
4447
}
4548
}
4649

50+
func WithEmptyDatabases(databases ...string) Option {
51+
return func(c *MockClientConnector) {
52+
for _, database := range databases {
53+
c.emptyDatabases[database] = true
54+
}
55+
}
56+
}
57+
4758
func (m *MockClientConnector) Open(_ context.Context, _ string) (*ydb.Driver, error) {
4859
return nil, nil
4960
}
@@ -55,6 +66,14 @@ func (m *MockClientConnector) Close(_ context.Context, _ *ydb.Driver) error {
5566
func (m *MockClientConnector) PreparePathsForExport(
5667
_ context.Context, _ *ydb.Driver, sourcePaths []string, _ []string,
5768
) ([]string, error) {
69+
if sourcePaths == nil || len(sourcePaths) == 0 {
70+
return []string{}, nil
71+
}
72+
for database := range m.emptyDatabases {
73+
if strings.Contains(sourcePaths[0], database) {
74+
return []string{}, nil
75+
}
76+
}
5877
return sourcePaths, nil
5978
}
6079

internal/handlers/take_backup_retry.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ func setErrorToRetryOperation(
159159
tbwr *types.TakeBackupWithRetryOperation,
160160
ops []types.Operation,
161161
clock clockwork.Clock,
162-
isEmptyDb bool,
163162
) {
164163
operationIDs := strings.Join(func() []string {
165164
var ids []string
@@ -176,29 +175,26 @@ func setErrorToRetryOperation(
176175
zap.Int("RetriesCount", len(ops)),
177176
}
178177

179-
if isEmptyDb {
180-
tbwr.Message = "empty database"
181-
} else {
182-
if tbwr.RetryConfig != nil {
183-
switch tbwr.RetryConfig.Retries.(type) {
184-
case *pb.RetryConfig_Count:
185-
{
186-
tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", tbwr.Retries)
187-
}
188-
case *pb.RetryConfig_MaxBackoff:
189-
{
190-
tbwr.Message = fmt.Sprintf("retry attempts exceeded backoff duration.")
191-
}
178+
if tbwr.RetryConfig != nil {
179+
switch tbwr.RetryConfig.Retries.(type) {
180+
case *pb.RetryConfig_Count:
181+
{
182+
tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", tbwr.Retries)
183+
}
184+
case *pb.RetryConfig_MaxBackoff:
185+
{
186+
tbwr.Message = fmt.Sprintf("retry attempts exceeded backoff duration.")
192187
}
193-
} else {
194-
tbwr.Message = fmt.Sprint("retry attempts exceeded limit: 1.")
195188
}
189+
} else {
190+
tbwr.Message = fmt.Sprint("retry attempts exceeded limit: 1.")
191+
}
196192

197-
if len(ops) > 0 {
198-
tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs)
199-
fields = append(fields, zap.String("OperationIDs", operationIDs))
200-
}
193+
if len(ops) > 0 {
194+
tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs)
195+
fields = append(fields, zap.String("OperationIDs", operationIDs))
201196
}
197+
202198
xlog.Error(ctx, tbwr.Message, fields...)
203199
}
204200

@@ -287,7 +283,7 @@ func TBWROperationHandler(
287283
return nil
288284
case Error:
289285
{
290-
setErrorToRetryOperation(ctx, tbwr, ops, clock, false)
286+
setErrorToRetryOperation(ctx, tbwr, ops, clock)
291287
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
292288
}
293289
case RunNewTb:
@@ -302,22 +298,35 @@ func TBWROperationHandler(
302298
types.OperationCreatorName,
303299
clock,
304300
)
301+
302+
tbwr.IncRetries()
303+
305304
if err != nil {
306305
var empty *backup_operations.EmptyDatabaseError
307306

308307
if errors.As(err, &empty) {
309-
setErrorToRetryOperation(ctx, tbwr, ops, clock, true)
310-
metrics.GlobalMetricsRegistry.ReportEmptyDatabase(tbwr)
311-
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
308+
backup, tb = backup_operations.CreateEmptyBackup(
309+
backup_operations.FromTBWROperation(tbwr),
310+
clock,
311+
)
312+
xlog.Debug(
313+
ctx,
314+
"Created empty backup instance for empty db",
315+
zap.String("BackupID", backup.ID),
316+
zap.String("TBOperationID", tb.ID),
317+
)
318+
tbwr.State = types.OperationStateDone
319+
tbwr.Message = "Success"
320+
now := clock.Now()
321+
tbwr.UpdatedAt = timestamppb.New(now)
322+
tbwr.Audit.CompletedAt = timestamppb.New(now)
323+
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithCreateBackup(*backup).WithCreateOperation(tb).WithUpdateOperation(tbwr))
312324
} else {
313-
tbwr.IncRetries()
314-
metrics.GlobalMetricsRegistry.ResetEmptyDatabase(tbwr)
325+
//increment retries
315326
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr))
316327
}
317328
} else {
318-
metrics.GlobalMetricsRegistry.ResetEmptyDatabase(tbwr)
319329
xlog.Debug(ctx, "running new TB", zap.String("TBOperationID", tb.ID))
320-
tbwr.IncRetries()
321330
return db.ExecuteUpsert(ctx, queryBuilderFactory().WithCreateBackup(*backup).WithCreateOperation(tb).WithUpdateOperation(tbwr))
322331
}
323332
}

internal/handlers/take_backup_retry_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,9 +628,84 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
628628
assert.NotNil(t, tb)
629629
assert.Equal(t, types.OperationStateRunning, tb.State)
630630
assert.Equal(t, t1, tb.Audit.CreatedAt)
631-
val, ok := metrics.GetMetrics()["empty_database"]
632-
assert.True(t, ok) // to show that it has been reset
633-
assert.Equal(t, float64(0), val) //to show it has been reset to 0
631+
}
632+
633+
func TestTBWRHandlerEmptyDatabase(t *testing.T) {
634+
clock := clockwork.NewFakeClockAt(t1.AsTime())
635+
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
636+
ctx := context.Background()
637+
tbwrID := types.GenerateObjectID()
638+
tbwr := types.TakeBackupWithRetryOperation{
639+
TakeBackupOperation: types.TakeBackupOperation{
640+
ID: tbwrID,
641+
ContainerID: "abcde",
642+
State: types.OperationStateRunning,
643+
Message: "",
644+
SourcePaths: []string{"path"},
645+
YdbConnectionParams: types.YdbConnectionParams{
646+
Endpoint: "i.valid.com",
647+
DatabaseName: "/mydb",
648+
},
649+
Audit: &pb.AuditInfo{
650+
CreatedAt: t1,
651+
},
652+
},
653+
RetryConfig: nil,
654+
}
655+
656+
ops := []types.Operation{
657+
&tbwr,
658+
}
659+
660+
dbConnector := db.NewMockDBConnector(
661+
db.WithOperations(toMap(ops...)),
662+
)
663+
dbConnector.SetOperationsIDSelector([]string{})
664+
665+
clientConnector := client.NewMockClientConnector(
666+
client.WithEmptyDatabases("/mydb"),
667+
)
668+
669+
handler := NewTBWROperationHandler(
670+
dbConnector,
671+
clientConnector,
672+
config.S3Config{
673+
IsMock: true,
674+
},
675+
config.ClientConnectionConfig{
676+
AllowedEndpointDomains: []string{".valid.com"},
677+
AllowInsecureEndpoint: true,
678+
},
679+
queries.NewWriteTableQueryMock,
680+
clock,
681+
)
682+
err := handler(ctx, &tbwr)
683+
assert.Empty(t, err)
684+
685+
op, err := dbConnector.GetOperation(ctx, tbwrID)
686+
assert.Empty(t, err)
687+
assert.NotEmpty(t, op)
688+
assert.Equal(t, types.OperationStateDone, op.GetState())
689+
operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
690+
assert.Empty(t, err)
691+
assert.Equal(t, 2, len(operations))
692+
tbwr = *op.(*types.TakeBackupWithRetryOperation)
693+
var tb *types.TakeBackupOperation
694+
for _, op = range operations {
695+
if op.GetType() == types.OperationTypeTB {
696+
tb = op.(*types.TakeBackupOperation)
697+
break
698+
}
699+
}
700+
assert.NotNil(t, tb)
701+
assert.Equal(t, types.OperationStateDone, tb.State)
702+
assert.Equal(t, t1, tb.Audit.CreatedAt)
703+
backups, err := dbConnector.SelectBackups(ctx, queries.NewReadTableQuery())
704+
assert.Empty(t, err)
705+
assert.Equal(t, 1, len(backups))
706+
assert.Equal(t, types.BackupStateAvailable, backups[0].Status)
707+
assert.Equal(t, tbwr.ScheduleID, backups[0].ScheduleID)
708+
assert.Equal(t, tbwr.ID, *tb.ParentOperationID)
634709

635710
}
636711

internal/metrics/metrics.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ type MetricsRegistry interface {
4040
IncSuccessfulHandlerRunsCount(containerId string, operationType string)
4141
IncCompletedBackupsCount(containerId string, database string, scheduleId *string, code Ydb.StatusIds_StatusCode)
4242
IncScheduleCounters(schedule *types.BackupSchedule, err error)
43-
ReportEmptyDatabase(operation *types.TakeBackupWithRetryOperation)
44-
ResetEmptyDatabase(operation *types.TakeBackupWithRetryOperation)
4543
}
4644

4745
type MetricsRegistryImpl struct {
@@ -77,7 +75,6 @@ type MetricsRegistryImpl struct {
7775
backupsSucceededCount *prometheus.GaugeVec
7876

7977
// schedule metrics
80-
emptyDatabaseGauge *prometheus.GaugeVec
8178
scheduleActionFailedCount *prometheus.CounterVec
8279
scheduleActionSucceededCount *prometheus.CounterVec
8380
scheduleLastBackupTimestamp *prometheus.GaugeVec
@@ -228,18 +225,6 @@ func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule
228225
}
229226
}
230227

231-
func (s *MetricsRegistryImpl) ReportEmptyDatabase(operation *types.TakeBackupWithRetryOperation) {
232-
if operation.ScheduleID != nil {
233-
s.emptyDatabaseGauge.WithLabelValues(operation.ContainerID, operation.GetDatabaseName(), *operation.ScheduleID).Set(float64(1))
234-
}
235-
}
236-
237-
func (s *MetricsRegistryImpl) ResetEmptyDatabase(operation *types.TakeBackupWithRetryOperation) {
238-
if operation.ScheduleID != nil {
239-
s.emptyDatabaseGauge.WithLabelValues(operation.ContainerID, operation.GetDatabaseName(), *operation.ScheduleID).Set(float64(0))
240-
}
241-
}
242-
243228
func InitializeMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig, clock clockwork.Clock) {
244229
GlobalMetricsRegistry = newMetricsRegistry(ctx, wg, cfg, clock)
245230
}
@@ -343,12 +328,6 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
343328
Help: "Total count of failed scheduled backup runs",
344329
}, []string{"container_id", "database", "schedule_id"})
345330

346-
s.emptyDatabaseGauge = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{
347-
Subsystem: "schedules",
348-
Name: "empty_database",
349-
Help: "Gauge of whether database is empty",
350-
}, []string{"container_id", "database", "schedule_id"})
351-
352331
s.scheduleActionSucceededCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
353332
Subsystem: "schedules",
354333
Name: "succeeded_count",

0 commit comments

Comments
 (0)