diff --git a/apis/kubedb/constants.go b/apis/kubedb/constants.go index 8d21616122..6a75e561b0 100644 --- a/apis/kubedb/constants.go +++ b/apis/kubedb/constants.go @@ -62,6 +62,7 @@ const ( DistributedDBReplicaENV = "DB_REPLICAS" DistributedMaxVolumeUsed = "max_used" DistributedVolumeCapacity = "capacity" + KubesliceContainerExcludeLabel = "kubeslice.io/exclude" KubeSliceNSMIPKey = "kubeslice.io/nsmIP" KubeSlicePodIPVolumeName = "podip" @@ -398,11 +399,37 @@ const ( DatabasePodMasterComponent = "Master" DatabasePodSlaveComponent = "Slave" - MariaDBDistributedUpgradeCommand = "mariadb-upgrade" - MariaDBDistributedPodMetricGetCommand = "get-pod-metrics" - MariaDBDistributedPodGetCommand = "get-pod" - MariaDBDistributedVolumeUsageGetCommand = "get-volume-usage" - MariaDBDistributedVolumeCapacityGetCommand = "get-volume-capacity" + MariaDBDistributedUpgradeCommand = "mariadb-upgrade" + MariaDBDistributedPodMetricGetCommand = "get-pod-metrics" + MariaDBDistributedPodGetCommand = "get-pod" + MariaDBDistributedVolumeUsageGetCommand = "get-volume-usage" + MariaDBDistributedVolumeCapacityGetCommand = "get-volume-capacity" + MariaDBDistributedBackupCommand = "take-backup" + MariaDBDistributedRestoreCommand = "restore-backup" + MariaDBDistributedRecoveryFileCreateCommand = "create-recovery-done-file" + MariaDBArchiverPVCRestorerSuffix = "pvc-restorer" + MariaDBBinlogRestoreSidekickSuffix = "binlog-restorer" + MariaDBBinlogRestoreServiceSuffix = "binlog-restore" + MariaDBXtraBackupInfoFile = "/var/lib/mysql/mariadb_backup_binlog_info" + MariaDBBackupInfoFile = "/var/lib/mysql/xtrabackup_binlog_info" + MariaDBArchiverRestoreRecoveryFileName = "/tmp/recovery.done" + MariaDBArchiverBackupJobSelector = GroupName + "/archiver-job-name" + MariaDBSidekickNameLabelKey = GroupName + "/sidekick-name" + MariaDBArchiverBaseBackupRestic = "Restic" + MariaDBArchiverBaseBackupVolumeSnapshooter = "VolumeSnapshotter" + + // Distributed Archiver + BackupsessionAnnotation = "kubestash.com/backupsession" + RestoresessionAnnotation = "kubestash.com/restoresession" + BackupconfigurationAnnotation = "kubestash.com/backupconfiguration" + DistributedSnapshotinfoAnnotation = "kubestash.com/distributedsnapshotinfo" + SnapshotsKey = "snapshots" + RestoreSessionKey = "restoresession" + DistributedArchiverBackupCMNameSuffix = "backup" + DistributedArchiverRestoreCMNameSuffix = "restore" + DistributedArchiverSnapshotCMNameSuffix = "snapshots" + MariaDBKubestashBackupContainerName = "physical-backup-1" + MariaDBKubestashRestoreContainerName = "distributed-physical-backup-restore-0" // Maxscale MaxscaleCommonName = "mx" diff --git a/apis/kubedb/v1/mariadb_helpers.go b/apis/kubedb/v1/mariadb_helpers.go index 114999990e..2c0083f32c 100644 --- a/apis/kubedb/v1/mariadb_helpers.go +++ b/apis/kubedb/v1/mariadb_helpers.go @@ -114,6 +114,18 @@ func (m MariaDB) OffshootMaxscaleLabels() map[string]string { return m.offshootLabels(m.OffshootMaxscaleSelectors(), nil) } +func (m MariaDB) GetPVCRestoreSessionName(ordinal int) string { + return fmt.Sprintf("%s-%s-%v-%s", kubedb.DefaultVolumeClaimTemplateName, m.OffshootName(), ordinal, kubedb.MariaDBArchiverPVCRestorerSuffix) +} + +func (m MariaDB) GetBinlogRestoreSidekickName(ordinal int) string { + return fmt.Sprintf("%s-%s-%d", m.OffshootName(), kubedb.MariaDBBinlogRestoreSidekickSuffix, ordinal) +} + +func (m MariaDB) GetBinlogRestoreServiceName(ordinal int) string { + return fmt.Sprintf("%s-%s-%d", m.OffshootName(), kubedb.MariaDBBinlogRestoreServiceSuffix, ordinal) +} + func (m MariaDB) PodLabels() map[string]string { return m.offshootLabels(m.OffshootSelectors(), m.Spec.PodTemplate.Labels) } diff --git a/pkg/utils/grpc/mariadb/client.go b/pkg/utils/grpc/mariadb/client.go index 211835ef49..8d547035f2 100644 --- a/pkg/utils/grpc/mariadb/client.go +++ b/pkg/utils/grpc/mariadb/client.go @@ -104,6 +104,25 @@ func RunCommand(grpcClient pb.CommandServiceClient, cmd string) ([]byte, error) return resp.Output, nil } +func RunCommandWithPayload(grpcClient pb.CommandServiceClient, cmd string, data []byte) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + req := &pb.CommandRequest{ + Command: cmd, + Data: data, + } + resp, err := grpcClient.ExecuteCommand(ctx, req) + if err != nil { + klog.Infof("failed to execute command: %v", err) + return nil, err + } + if resp.Status != "success" { + return nil, fmt.Errorf("failed to execute command: %s, Output: %s, err: %v", cmd, string(resp.Output), resp.Error) + } + return resp.Output, nil +} + func serverAddress(db *dbapi.MariaDB, podName string) string { return fmt.Sprintf("%s.%s.%s.svc:%v", podName, db.GoverningServiceName(), db.Namespace, port) } diff --git a/pkg/utils/grpc/mariadb/proto/api.proto b/pkg/utils/grpc/mariadb/proto/api.proto index c98874e077..d569a3d1b1 100644 --- a/pkg/utils/grpc/mariadb/proto/api.proto +++ b/pkg/utils/grpc/mariadb/proto/api.proto @@ -1,7 +1,7 @@ syntax="proto3"; package proto; -option go_package = "grpc/protogen"; +option go_package = "pkg/utils/grpc/mariadb/protogen"; // CommandService provides a method to execute commands in the pod. service CommandService { @@ -12,6 +12,7 @@ service CommandService { // CommandRequest defines the command to execute and authentication key. message CommandRequest { string command = 1; // The command to run (e.g., "cat /scripts/seqno"). + bytes data = 2; // backup job. } // CommandResponse contains the command output or error. diff --git a/pkg/utils/grpc/mariadb/protogen/api.pb.go b/pkg/utils/grpc/mariadb/protogen/api.pb.go index e85db83e40..8badbdd741 100644 --- a/pkg/utils/grpc/mariadb/protogen/api.pb.go +++ b/pkg/utils/grpc/mariadb/protogen/api.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.7 +// protoc-gen-go v1.36.10 // protoc v3.21.12 // source: api.proto @@ -26,6 +26,7 @@ const ( type CommandRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"` // The command to run (e.g., "cat /scripts/seqno"). + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // backup job. unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -67,6 +68,13 @@ func (x *CommandRequest) GetCommand() string { return "" } +func (x *CommandRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + // CommandResponse contains the command output or error. type CommandResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -132,15 +140,16 @@ var File_api_proto protoreflect.FileDescriptor const file_api_proto_rawDesc = "" + "\n" + - "\tapi.proto\x12\x05proto\"*\n" + + "\tapi.proto\x12\x05proto\">\n" + "\x0eCommandRequest\x12\x18\n" + - "\acommand\x18\x01 \x01(\tR\acommand\"W\n" + + "\acommand\x18\x01 \x01(\tR\acommand\x12\x12\n" + + "\x04data\x18\x02 \x01(\fR\x04data\"W\n" + "\x0fCommandResponse\x12\x16\n" + "\x06status\x18\x01 \x01(\tR\x06status\x12\x16\n" + "\x06output\x18\x02 \x01(\fR\x06output\x12\x14\n" + "\x05error\x18\x03 \x01(\tR\x05error2Q\n" + "\x0eCommandService\x12?\n" + - "\x0eExecuteCommand\x12\x15.proto.CommandRequest\x1a\x16.proto.CommandResponseB\x0fZ\rgrpc/protogenb\x06proto3" + "\x0eExecuteCommand\x12\x15.proto.CommandRequest\x1a\x16.proto.CommandResponseB!Z\x1fpkg/utils/grpc/mariadb/protogenb\x06proto3" var ( file_api_proto_rawDescOnce sync.Once diff --git a/pkg/utils/grpc/mariadb/protogen/api_grpc.pb.go b/pkg/utils/grpc/mariadb/protogen/api_grpc.pb.go index 4ec8085189..fa880d7325 100644 --- a/pkg/utils/grpc/mariadb/protogen/api_grpc.pb.go +++ b/pkg/utils/grpc/mariadb/protogen/api_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc v3.21.12 // source: api.proto @@ -70,7 +70,7 @@ type CommandServiceServer interface { type UnimplementedCommandServiceServer struct{} func (UnimplementedCommandServiceServer) ExecuteCommand(context.Context, *CommandRequest) (*CommandResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ExecuteCommand not implemented") + return nil, status.Error(codes.Unimplemented, "method ExecuteCommand not implemented") } func (UnimplementedCommandServiceServer) mustEmbedUnimplementedCommandServiceServer() {} func (UnimplementedCommandServiceServer) testEmbeddedByValue() {} @@ -83,7 +83,7 @@ type UnsafeCommandServiceServer interface { } func RegisterCommandServiceServer(s grpc.ServiceRegistrar, srv CommandServiceServer) { - // If the following call pancis, it indicates UnimplementedCommandServiceServer was + // If the following call panics, it indicates UnimplementedCommandServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O.