From 4b650e91f868c01d583faa5e3d35e9a7a54466f3 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 26 Nov 2025 12:41:02 -0600 Subject: [PATCH] feat(oss): add native Alibaba Cloud OSS storage backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add native support for Alibaba Cloud Object Storage Service (OSS) as a replication backend. This addresses issue #861 where Litestream 0.5+ uses AWS SDK Go v2 with chunked encoding, which OSS doesn't support via S3-compatible mode. Features: - Native OSS SDK integration (alibabacloud-oss-go-sdk-v2) - Multipart upload support for large files (>5GB) with configurable part-size and concurrency settings - Timestamp preservation via OSS metadata for point-in-time recovery - Support for both URL and explicit configuration - Integration test support with LITESTREAM_OSS_* environment variables Configuration examples: replicas: - type: oss bucket: my-bucket region: cn-hangzhou replicas: - url: oss://my-bucket.oss-cn-hangzhou.aliyuncs.com/backups Closes #861 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- AGENTS.md | 4 +- CLAUDE.md | 4 +- cmd/litestream/main.go | 68 +++ cmd/litestream/replicate.go | 3 + docs/ARCHITECTURE.md | 3 + go.mod | 2 + go.sum | 2 + internal/testingutil/testingutil.go | 27 ++ oss/replica_client.go | 618 ++++++++++++++++++++++++++++ oss/replica_client_test.go | 372 +++++++++++++++++ 10 files changed, 1100 insertions(+), 3 deletions(-) create mode 100644 oss/replica_client.go create mode 100644 oss/replica_client_test.go diff --git a/AGENTS.md b/AGENTS.md index 4eae325b..6408dd8f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -23,7 +23,7 @@ Litestream is a **disaster recovery tool for SQLite** that runs as a background - **Multi-level Compaction**: Hierarchical compaction keeps storage efficient (30s → 5m → 1h → snapshots) - **Single Replica Constraint**: Each database is replicated to exactly one remote destination - **Pure Go Build**: Uses `modernc.org/sqlite`, so no CGO dependency for the main binary -- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/File/SFTP +- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/OSS/File/SFTP - **Snapshot Compatibility**: Only LTX-based backups are supported—keep legacy v0.3.x binaries to restore old WAL snapshots **Key Design Principles:** @@ -728,7 +728,7 @@ if offset < 0 { ```bash # Test eventual consistency -go test -v ./replica_client_test.go -integration [s3|gcs|abs|sftp] +go test -v ./replica_client_test.go -integration [s3|gcs|abs|oss|sftp] # Test partial reads # (Example) add targeted partial-read tests in your backend package diff --git a/CLAUDE.md b/CLAUDE.md index 23fb48d6..11b4191b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -86,6 +86,7 @@ CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs # T go test -v ./replica_client_test.go -integration s3 go test -v ./replica_client_test.go -integration gcs go test -v ./replica_client_test.go -integration abs +go test -v ./replica_client_test.go -integration oss go test -v ./replica_client_test.go -integration sftp ``` @@ -112,7 +113,7 @@ pre-commit run --all-files **Replica (`replica.go`)**: Connects a database to replication destinations via ReplicaClient interface. Manages periodic synchronization and maintains replication position. -**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration. +**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, OSS, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration. **WAL Processing**: The system monitors SQLite WAL files for changes, segments them into LTX format files, and replicates these segments to configured destinations. Uses SQLite checksums for integrity verification. @@ -121,6 +122,7 @@ pre-commit run --all-files - **S3** (`s3/replica_client.go`): AWS S3 and compatible storage - **GCS** (`gs/replica_client.go`): Google Cloud Storage - **ABS** (`abs/replica_client.go`): Azure Blob Storage +- **OSS** (`oss/replica_client.go`): Alibaba Cloud Object Storage Service - **SFTP** (`sftp/replica_client.go`): SSH File Transfer Protocol - **File** (`file/replica_client.go`): Local file system replication - **NATS** (`nats/replica_client.go`): NATS JetStream object storage diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 0e8890e4..aa749ce1 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -29,6 +29,7 @@ import ( "github.com/benbjohnson/litestream/gs" "github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/nats" + "github.com/benbjohnson/litestream/oss" "github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/sftp" "github.com/benbjohnson/litestream/webdav" @@ -1059,6 +1060,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re if r.Client, err = newNATSReplicaClientFromConfig(c, r); err != nil { return nil, err } + case "oss": + if r.Client, err = newOSSReplicaClientFromConfig(c, r); err != nil { + return nil, err + } default: return nil, fmt.Errorf("unknown replica type in config: %q", c.Type) } @@ -1518,6 +1523,69 @@ func newNATSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ return client, nil } +// newOSSReplicaClientFromConfig returns a new instance of oss.ReplicaClient built from config. +func newOSSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *oss.ReplicaClient, err error) { + // Ensure URL & constituent parts are not both specified. + if c.URL != "" && c.Path != "" { + return nil, fmt.Errorf("cannot specify url & path for oss replica") + } else if c.URL != "" && c.Bucket != "" { + return nil, fmt.Errorf("cannot specify url & bucket for oss replica") + } + + bucket, configPath := c.Bucket, c.Path + region, endpoint := c.Region, c.Endpoint + + // Apply settings from URL, if specified. + if c.URL != "" { + _, host, upath, err := ParseReplicaURL(c.URL) + if err != nil { + return nil, err + } + + var ( + ubucket string + uregion string + ) + + ubucket, uregion, _ = oss.ParseHost(host) + + // Only apply URL parts to fields that have not been overridden. + if configPath == "" { + configPath = upath + } + if bucket == "" { + bucket = ubucket + } + if region == "" { + region = uregion + } + } + + // Ensure required settings are set. + if bucket == "" { + return nil, fmt.Errorf("bucket required for oss replica") + } + + // Build replica client. + client := oss.NewReplicaClient() + client.AccessKeyID = c.AccessKeyID + client.AccessKeySecret = c.SecretAccessKey + client.Bucket = bucket + client.Path = configPath + client.Region = region + client.Endpoint = endpoint + + // Apply upload configuration if specified. + if c.PartSize != nil { + client.PartSize = int64(*c.PartSize) + } + if c.Concurrency != nil { + client.Concurrency = *c.Concurrency + } + + return client, nil +} + // applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to // their AWS counterparts as the "AWS" prefix can be confusing when using a // non-AWS S3-compatible service. diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index e77f3a6c..fbe68d5f 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -20,6 +20,7 @@ import ( "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gs" "github.com/benbjohnson/litestream/nats" + "github.com/benbjohnson/litestream/oss" "github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/sftp" ) @@ -183,6 +184,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { slogWith.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path) case *nats.ReplicaClient: slogWith.Info("replicating to", "bucket", client.BucketName, "url", client.URL) + case *oss.ReplicaClient: + slogWith.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region) default: slogWith.Info("replicating to") } diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a00d281b..802b7ceb 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -37,6 +37,7 @@ graph TB S3[s3/replica_client.go] GCS[gs/replica_client.go] ABS[abs/replica_client.go] + OSS[oss/replica_client.go] File[file/replica_client.go] SFTP[sftp/replica_client.go] NATS[nats/replica_client.go] @@ -54,6 +55,7 @@ graph TB RC --> S3 RC --> GCS RC --> ABS + RC --> OSS RC --> File RC --> SFTP RC --> NATS @@ -61,6 +63,7 @@ graph TB S3 --> Cloud GCS --> Cloud ABS --> Cloud + OSS --> Cloud ``` ### Layer Responsibilities diff --git a/go.mod b/go.mod index a24cb8ea..5af91562 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,8 @@ require ( modernc.org/memory v1.11.0 // indirect ) +require github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0 + require ( cloud.google.com/go v0.111.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect diff --git a/go.sum b/go.sum index 5ca79b76..a2093709 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/MadAppGang/httplog v1.3.0 h1:1XU54TO8kiqTeO+7oZLKAM3RP/cJ7SadzslRcKsp github.com/MadAppGang/httplog v1.3.0/go.mod h1:gpYEdkjh/Cda6YxtDy4AB7KY+fR7mb3SqBZw74A5hJ4= github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 h1:ZBbLwSJqkHBuFDA6DUhhse0IGJ7T5bemHyNILUjvOq4= github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0 h1:wQlqotpyjYPjJz+Noh5bRu7Snmydk8SKC5Z6u1CR20Y= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY= github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= diff --git a/internal/testingutil/testingutil.go b/internal/testingutil/testingutil.go index af369986..fb78cbcb 100644 --- a/internal/testingutil/testingutil.go +++ b/internal/testingutil/testingutil.go @@ -24,6 +24,7 @@ import ( "github.com/benbjohnson/litestream/gs" "github.com/benbjohnson/litestream/internal" "github.com/benbjohnson/litestream/nats" + "github.com/benbjohnson/litestream/oss" "github.com/benbjohnson/litestream/s3" "github.com/benbjohnson/litestream/sftp" "github.com/benbjohnson/litestream/webdav" @@ -104,6 +105,16 @@ var ( natsPassword = flag.String("nats-password", os.Getenv("LITESTREAM_NATS_PASSWORD"), "") ) +// Alibaba Cloud OSS settings +var ( + ossAccessKeyID = flag.String("oss-access-key-id", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_ID"), "") + ossAccessKeySecret = flag.String("oss-access-key-secret", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_SECRET"), "") + ossRegion = flag.String("oss-region", os.Getenv("LITESTREAM_OSS_REGION"), "") + ossBucket = flag.String("oss-bucket", os.Getenv("LITESTREAM_OSS_BUCKET"), "") + ossPath = flag.String("oss-path", os.Getenv("LITESTREAM_OSS_PATH"), "") + ossEndpoint = flag.String("oss-endpoint", os.Getenv("LITESTREAM_OSS_ENDPOINT"), "") +) + func Integration() bool { return *integration } @@ -215,6 +226,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient { return NewWebDAVReplicaClient(tb) case nats.ReplicaClientType: return NewNATSReplicaClient(tb) + case oss.ReplicaClientType: + return NewOSSReplicaClient(tb) case "tigris": return NewTigrisReplicaClient(tb) default: @@ -347,6 +360,20 @@ func NewNATSReplicaClient(tb testing.TB) *nats.ReplicaClient { return c } +// NewOSSReplicaClient returns a new client for integration testing. +func NewOSSReplicaClient(tb testing.TB) *oss.ReplicaClient { + tb.Helper() + + c := oss.NewReplicaClient() + c.AccessKeyID = *ossAccessKeyID + c.AccessKeySecret = *ossAccessKeySecret + c.Region = *ossRegion + c.Bucket = *ossBucket + c.Path = path.Join(*ossPath, fmt.Sprintf("%016x", rand.Uint64())) + c.Endpoint = *ossEndpoint + return c +} + // MustDeleteAll deletes all objects under the client's path. func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) { tb.Helper() diff --git a/oss/replica_client.go b/oss/replica_client.go new file mode 100644 index 00000000..4881a0f3 --- /dev/null +++ b/oss/replica_client.go @@ -0,0 +1,618 @@ +package oss + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/url" + "os" + "path" + "regexp" + "strings" + "sync" + "time" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" + "github.com/superfly/ltx" + + "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/internal" +) + +// ReplicaClientType is the client type for this package. +const ReplicaClientType = "oss" + +// MetadataKeyTimestamp is the metadata key for storing LTX file timestamps in OSS. +// Note: OSS SDK automatically adds "x-oss-meta-" prefix when setting metadata. +const MetadataKeyTimestamp = "litestream-timestamp" + +// MaxKeys is the number of keys OSS can operate on per batch. +const MaxKeys = 1000 + +// DefaultRegion is the region used if one is not specified. +const DefaultRegion = "cn-hangzhou" + +var _ litestream.ReplicaClient = (*ReplicaClient)(nil) + +// ReplicaClient is a client for writing LTX files to Alibaba Cloud OSS. +type ReplicaClient struct { + mu sync.Mutex + client *oss.Client + uploader *oss.Uploader + logger *slog.Logger + + // Alibaba Cloud authentication keys. + AccessKeyID string + AccessKeySecret string + + // OSS bucket information + Region string + Bucket string + Path string + Endpoint string + + // Upload configuration + PartSize int64 // Part size for multipart uploads (default: 5MB) + Concurrency int // Number of concurrent parts to upload (default: 3) +} + +// NewReplicaClient returns a new instance of ReplicaClient. +func NewReplicaClient() *ReplicaClient { + return &ReplicaClient{ + logger: slog.Default().WithGroup(ReplicaClientType), + } +} + +// Type returns "oss" as the client type. +func (c *ReplicaClient) Type() string { + return ReplicaClientType +} + +// Init initializes the connection to OSS. No-op if already initialized. +func (c *ReplicaClient) Init(ctx context.Context) (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.client != nil { + return nil + } + + // Validate required configuration + if c.Bucket == "" { + return fmt.Errorf("oss: bucket name is required") + } + + // Use default region if not specified + region := c.Region + if region == "" { + region = DefaultRegion + } + + // Build configuration + cfg := oss.LoadDefaultConfig() + + // Configure credentials + if c.AccessKeyID != "" && c.AccessKeySecret != "" { + cfg = cfg.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(c.AccessKeyID, c.AccessKeySecret), + ) + } else { + // Use environment variable credentials provider + cfg = cfg.WithCredentialsProvider( + credentials.NewEnvironmentVariableCredentialsProvider(), + ) + } + + // Configure region + cfg = cfg.WithRegion(region) + + // Configure custom endpoint if specified + if c.Endpoint != "" { + endpoint := c.Endpoint + // Add scheme if not present + if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") { + endpoint = "https://" + endpoint + } + cfg = cfg.WithEndpoint(endpoint) + } + + // Create OSS client + c.client = oss.NewClient(cfg) + + // Create uploader with configurable part size and concurrency + uploaderOpts := []func(*oss.UploaderOptions){} + if c.PartSize > 0 { + uploaderOpts = append(uploaderOpts, func(o *oss.UploaderOptions) { + o.PartSize = c.PartSize + }) + } + if c.Concurrency > 0 { + uploaderOpts = append(uploaderOpts, func(o *oss.UploaderOptions) { + o.ParallelNum = c.Concurrency + }) + } + c.uploader = c.client.NewUploader(uploaderOpts...) + + return nil +} + +// LTXFiles returns an iterator over all LTX files on the replica for the given level. +// When useMetadata is true, fetches accurate timestamps from OSS metadata via HeadObject. +// When false, uses fast LastModified timestamps from LIST operation. +func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + return newFileIterator(ctx, c, level, seek, useMetadata), nil +} + +// OpenLTXFile returns a reader for an LTX file. +// Returns os.ErrNotExist if no matching file is found. +func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + + // Build the key from the file info + filename := ltx.FormatFilename(minTXID, maxTXID) + key := c.ltxPath(level, filename) + + request := &oss.GetObjectRequest{ + Bucket: oss.Ptr(c.Bucket), + Key: oss.Ptr(key), + } + + // Set range header if offset is specified + if size > 0 { + request.RangeBehavior = oss.Ptr("standard") + request.Range = oss.Ptr(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)) + } else if offset > 0 { + request.RangeBehavior = oss.Ptr("standard") + request.Range = oss.Ptr(fmt.Sprintf("bytes=%d-", offset)) + } + + result, err := c.client.GetObject(ctx, request) + if err != nil { + if isNotExists(err) { + return nil, os.ErrNotExist + } + return nil, fmt.Errorf("oss: get object %s: %w", key, err) + } + + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "GET").Inc() + + return result.Body, nil +} + +// WriteLTXFile writes an LTX file to the replica. +// Extracts timestamp from LTX header and stores it in OSS metadata to preserve original creation time. +// Uses multipart upload for large files via the uploader. +func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error) { + if err := c.Init(ctx); err != nil { + return nil, err + } + + // Use TeeReader to peek at LTX header while preserving data for upload + var buf bytes.Buffer + teeReader := io.TeeReader(r, &buf) + + // Extract timestamp from LTX header + hdr, _, err := ltx.PeekHeader(teeReader) + if err != nil { + return nil, fmt.Errorf("extract timestamp from LTX header: %w", err) + } + timestamp := time.UnixMilli(hdr.Timestamp).UTC() + + // Combine buffered data with rest of reader + rc := internal.NewReadCounter(io.MultiReader(&buf, r)) + + filename := ltx.FormatFilename(minTXID, maxTXID) + key := c.ltxPath(level, filename) + + // Store timestamp in OSS metadata for accurate timestamp retrieval + metadata := map[string]string{ + MetadataKeyTimestamp: timestamp.Format(time.RFC3339Nano), + } + + // Use uploader for automatic multipart handling (files >5GB) + result, err := c.uploader.UploadFrom(ctx, &oss.PutObjectRequest{ + Bucket: oss.Ptr(c.Bucket), + Key: oss.Ptr(key), + Metadata: metadata, + }, rc) + if err != nil { + return nil, fmt.Errorf("oss: upload to %s: %w", key, err) + } + + // Build file info from the uploaded file + info := <x.FileInfo{ + Level: level, + MinTXID: minTXID, + MaxTXID: maxTXID, + Size: rc.N(), + CreatedAt: timestamp, + } + + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() + internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) + + // ETag indicates successful upload + if result.ETag == nil || *result.ETag == "" { + return nil, fmt.Errorf("oss: upload failed: no ETag returned") + } + + return info, nil +} + +// DeleteLTXFiles deletes one or more LTX files. +func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error { + if err := c.Init(ctx); err != nil { + return err + } + + if len(a) == 0 { + return nil + } + + // Convert file infos to object identifiers + objects := make([]oss.DeleteObject, 0, len(a)) + for _, info := range a { + filename := ltx.FormatFilename(info.MinTXID, info.MaxTXID) + key := c.ltxPath(info.Level, filename) + objects = append(objects, oss.DeleteObject{Key: oss.Ptr(key)}) + + c.logger.Debug("deleting ltx file", "level", info.Level, "minTXID", info.MinTXID, "maxTXID", info.MaxTXID, "key", key) + } + + // Delete in batches + for len(objects) > 0 { + n := min(len(objects), MaxKeys) + batch := objects[:n] + + request := &oss.DeleteMultipleObjectsRequest{ + Bucket: oss.Ptr(c.Bucket), + Objects: batch, + } + + out, err := c.client.DeleteMultipleObjects(ctx, request) + if err != nil { + return fmt.Errorf("oss: delete batch of %d objects: %w", n, err) + } else if err := deleteResultError(batch, out); err != nil { + return err + } + + internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc() + + objects = objects[n:] + } + + return nil +} + +// DeleteAll deletes all files. +func (c *ReplicaClient) DeleteAll(ctx context.Context) error { + if err := c.Init(ctx); err != nil { + return err + } + + var objects []oss.DeleteObject + + // Create paginator for listing objects + prefix := c.Path + "/" + paginator := c.client.NewListObjectsV2Paginator(&oss.ListObjectsV2Request{ + Bucket: oss.Ptr(c.Bucket), + Prefix: oss.Ptr(prefix), + }) + + // Iterate through all pages + for paginator.HasNext() { + page, err := paginator.NextPage(ctx) + if err != nil { + return fmt.Errorf("oss: list objects page: %w", err) + } + + // Collect object identifiers + for _, obj := range page.Contents { + if obj.Key != nil { + objects = append(objects, oss.DeleteObject{Key: obj.Key}) + } + } + } + + // Delete all collected objects in batches + for len(objects) > 0 { + n := min(len(objects), MaxKeys) + batch := objects[:n] + + request := &oss.DeleteMultipleObjectsRequest{ + Bucket: oss.Ptr(c.Bucket), + Objects: batch, + } + + out, err := c.client.DeleteMultipleObjects(ctx, request) + if err != nil { + return fmt.Errorf("oss: delete all batch of %d objects: %w", n, err) + } else if err := deleteResultError(batch, out); err != nil { + return err + } + + objects = objects[n:] + } + + return nil +} + +// ltxPath returns the full path to an LTX file. +func (c *ReplicaClient) ltxPath(level int, filename string) string { + return c.Path + "/" + fmt.Sprintf("%04x/%s", level, filename) +} + +// fileIterator represents an iterator over LTX files in OSS. +type fileIterator struct { + ctx context.Context + cancel context.CancelFunc + client *ReplicaClient + level int + seek ltx.TXID + useMetadata bool // When true, fetch accurate timestamps from metadata + + paginator *oss.ListObjectsV2Paginator + page *oss.ListObjectsV2Result + pageIndex int + initialized bool + + closed bool + err error + info *ltx.FileInfo +} + +func newFileIterator(ctx context.Context, client *ReplicaClient, level int, seek ltx.TXID, useMetadata bool) *fileIterator { + ctx, cancel := context.WithCancel(ctx) + + itr := &fileIterator{ + ctx: ctx, + cancel: cancel, + client: client, + level: level, + seek: seek, + useMetadata: useMetadata, + } + + return itr +} + +// initPaginator initializes the paginator lazily. +func (itr *fileIterator) initPaginator() { + if itr.initialized { + return + } + itr.initialized = true + + // Create paginator for listing objects with level prefix + prefix := itr.client.ltxPath(itr.level, "") + itr.paginator = itr.client.client.NewListObjectsV2Paginator(&oss.ListObjectsV2Request{ + Bucket: oss.Ptr(itr.client.Bucket), + Prefix: oss.Ptr(prefix), + }) +} + +// Close stops iteration. +func (itr *fileIterator) Close() (err error) { + itr.closed = true + itr.cancel() + return nil +} + +// Next returns the next file. Returns false when no more files are available. +func (itr *fileIterator) Next() bool { + if itr.closed || itr.err != nil { + return false + } + + // Initialize paginator on first call + itr.initPaginator() + + // Process objects until we find a valid LTX file + for { + // Load next page if needed + if itr.page == nil || itr.pageIndex >= len(itr.page.Contents) { + if !itr.paginator.HasNext() { + return false + } + + var err error + itr.page, err = itr.paginator.NextPage(itr.ctx) + if err != nil { + itr.err = err + return false + } + itr.pageIndex = 0 + } + + // Process current object + if itr.pageIndex < len(itr.page.Contents) { + obj := itr.page.Contents[itr.pageIndex] + itr.pageIndex++ + + if obj.Key == nil { + continue + } + + // Extract file info from key + key := path.Base(*obj.Key) + minTXID, maxTXID, err := ltx.ParseFilename(key) + if err != nil { + continue // Skip non-LTX files + } + + // Build file info + info := <x.FileInfo{ + Level: itr.level, + MinTXID: minTXID, + MaxTXID: maxTXID, + } + + // Skip if below seek TXID + if info.MinTXID < itr.seek { + continue + } + + // Set file info + info.Size = obj.Size + + // Use fast LastModified timestamp by default + var createdAt time.Time + if obj.LastModified != nil { + createdAt = obj.LastModified.UTC() + } else { + createdAt = time.Now().UTC() + } + + // Only fetch accurate timestamp from metadata when requested (timestamp-based restore) + if itr.useMetadata { + head, err := itr.client.client.HeadObject(itr.ctx, &oss.HeadObjectRequest{ + Bucket: oss.Ptr(itr.client.Bucket), + Key: obj.Key, + }) + if err != nil { + itr.err = fmt.Errorf("fetch object metadata: %w", err) + return false + } + + if head.Metadata != nil { + if ts, ok := head.Metadata[MetadataKeyTimestamp]; ok { + if parsed, err := time.Parse(time.RFC3339Nano, ts); err == nil { + createdAt = parsed + } else { + itr.err = fmt.Errorf("parse timestamp from metadata: %w", err) + return false + } + } + } + } + + info.CreatedAt = createdAt + itr.info = info + return true + } + } +} + +// Item returns the metadata for the current file. +func (itr *fileIterator) Item() *ltx.FileInfo { + return itr.info +} + +// Err returns any error that occurred during iteration. +func (itr *fileIterator) Err() error { + return itr.err +} + +// ParseURL parses an OSS URL into its host and path parts. +func ParseURL(s string) (bucket, region, key string, err error) { + u, err := url.Parse(s) + if err != nil { + return "", "", "", err + } + + if u.Scheme != "oss" { + return "", "", "", fmt.Errorf("oss: invalid url scheme") + } + + // Parse host to extract bucket and region + bucket, region, _ = ParseHost(u.Host) + if bucket == "" { + bucket = u.Host + } + + key = strings.TrimPrefix(u.Path, "/") + return bucket, region, key, nil +} + +// ParseHost parses the host/endpoint for an OSS storage system. +// Supports formats like: +// - bucket.oss-cn-hangzhou.aliyuncs.com +// - bucket.oss-cn-hangzhou-internal.aliyuncs.com +// - bucket (just bucket name) +func ParseHost(host string) (bucket, region, endpoint string) { + // Check for internal OSS URL format first (more specific pattern) + if a := ossInternalRegex.FindStringSubmatch(host); len(a) > 1 { + bucket = a[1] + if len(a) > 2 && a[2] != "" { + region = a[2] + } + return bucket, region, "" + } + + // Check for standard OSS URL format + if a := ossRegex.FindStringSubmatch(host); len(a) > 1 { + bucket = a[1] + if len(a) > 2 && a[2] != "" { + region = a[2] + } + return bucket, region, "" + } + + // For other hosts, assume it's just the bucket name + return host, "", "" +} + +var ( + // oss-cn-hangzhou.aliyuncs.com or bucket.oss-cn-hangzhou.aliyuncs.com + ossRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-([^.]+)\.aliyuncs\.com$`) + // oss-cn-hangzhou-internal.aliyuncs.com or bucket.oss-cn-hangzhou-internal.aliyuncs.com + // Uses non-greedy .+? to correctly extract region without -internal suffix + ossInternalRegex = regexp.MustCompile(`^(?:([^.]+)\.)?oss-(.+?)-internal\.aliyuncs\.com$`) +) + +func isNotExists(err error) bool { + var serviceErr *oss.ServiceError + if errors.As(err, &serviceErr) { + return serviceErr.Code == "NoSuchKey" + } + return false +} + +// deleteResultError checks if all requested objects were deleted. +// OSS SDK doesn't have explicit per-object error reporting like S3, so we verify +// all requested keys appear in the deleted list. +func deleteResultError(requested []oss.DeleteObject, out *oss.DeleteMultipleObjectsResult) error { + if out == nil { + return nil + } + + // Build set of deleted keys for quick lookup + deleted := make(map[string]struct{}, len(out.DeletedObjects)) + for _, obj := range out.DeletedObjects { + if obj.Key != nil { + deleted[*obj.Key] = struct{}{} + } + } + + // Check that all requested keys were deleted + var failed []string + for _, obj := range requested { + if obj.Key == nil { + continue + } + if _, ok := deleted[*obj.Key]; !ok { + failed = append(failed, *obj.Key) + } + } + + if len(failed) == 0 { + return nil + } + + // Build error message listing failed keys + var b strings.Builder + b.WriteString("oss: failed to delete files:") + for _, key := range failed { + fmt.Fprintf(&b, "\n%s", key) + } + return errors.New(b.String()) +} diff --git a/oss/replica_client_test.go b/oss/replica_client_test.go new file mode 100644 index 00000000..df7518d9 --- /dev/null +++ b/oss/replica_client_test.go @@ -0,0 +1,372 @@ +package oss + +import ( + "errors" + "testing" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" +) + +func TestReplicaClient_Type(t *testing.T) { + c := NewReplicaClient() + if got := c.Type(); got != ReplicaClientType { + t.Errorf("Type() = %q, want %q", got, ReplicaClientType) + } + if got := c.Type(); got != "oss" { + t.Errorf("Type() = %q, want %q", got, "oss") + } +} + +func TestReplicaClient_Init_BucketValidation(t *testing.T) { + t.Run("EmptyBucket", func(t *testing.T) { + c := NewReplicaClient() + c.Bucket = "" // Empty bucket name + c.Region = "cn-hangzhou" + + err := c.Init(t.Context()) + if err == nil { + t.Fatal("expected error for empty bucket name") + } + if got := err.Error(); got != "oss: bucket name is required" { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("ValidBucketWithRegion", func(t *testing.T) { + c := NewReplicaClient() + c.Bucket = "test-bucket" + c.Region = "cn-hangzhou" + c.AccessKeyID = "test-key" + c.AccessKeySecret = "test-secret" + + // Init should succeed (client will be created even without real credentials) + err := c.Init(t.Context()) + if err != nil { + t.Errorf("Init() should succeed with valid bucket: %v", err) + } + }) + + t.Run("ValidBucketDefaultRegion", func(t *testing.T) { + c := NewReplicaClient() + c.Bucket = "test-bucket" + // Region is empty, should use DefaultRegion + c.AccessKeyID = "test-key" + c.AccessKeySecret = "test-secret" + + err := c.Init(t.Context()) + if err != nil { + t.Errorf("Init() should succeed with default region: %v", err) + } + }) +} + +func TestReplicaClient_Init_Idempotent(t *testing.T) { + c := NewReplicaClient() + c.Bucket = "test-bucket" + c.AccessKeyID = "test-key" + c.AccessKeySecret = "test-secret" + + // First init + if err := c.Init(t.Context()); err != nil { + t.Fatalf("first Init() failed: %v", err) + } + + // Second init should be a no-op + if err := c.Init(t.Context()); err != nil { + t.Fatalf("second Init() failed: %v", err) + } +} + +func TestParseURL(t *testing.T) { + tests := []struct { + name string + url string + wantBucket string + wantRegion string + wantKey string + wantErr bool + }{ + { + name: "SimpleOSSURL", + url: "oss://my-bucket/path/to/file", + wantBucket: "my-bucket", + wantRegion: "", + wantKey: "path/to/file", + wantErr: false, + }, + { + name: "OSSURLWithRegion", + url: "oss://my-bucket.oss-cn-hangzhou.aliyuncs.com/backup", + wantBucket: "my-bucket", + wantRegion: "cn-hangzhou", + wantKey: "backup", + wantErr: false, + }, + { + name: "OSSURLNoPath", + url: "oss://my-bucket", + wantBucket: "my-bucket", + wantRegion: "", + wantKey: "", + wantErr: false, + }, + { + name: "InvalidScheme", + url: "s3://my-bucket/path", + wantBucket: "", + wantRegion: "", + wantKey: "", + wantErr: true, + }, + { + name: "HTTPScheme", + url: "http://my-bucket/path", + wantBucket: "", + wantRegion: "", + wantKey: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bucket, region, key, err := ParseURL(tt.url) + + if (err != nil) != tt.wantErr { + t.Errorf("ParseURL() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err != nil { + return + } + + if bucket != tt.wantBucket { + t.Errorf("bucket = %q, want %q", bucket, tt.wantBucket) + } + if region != tt.wantRegion { + t.Errorf("region = %q, want %q", region, tt.wantRegion) + } + if key != tt.wantKey { + t.Errorf("key = %q, want %q", key, tt.wantKey) + } + }) + } +} + +func TestParseHost(t *testing.T) { + tests := []struct { + name string + host string + wantBucket string + wantRegion string + }{ + { + name: "StandardOSSURL", + host: "my-bucket.oss-cn-hangzhou.aliyuncs.com", + wantBucket: "my-bucket", + wantRegion: "cn-hangzhou", + }, + { + name: "OSSURLBeijingRegion", + host: "test-bucket.oss-cn-beijing.aliyuncs.com", + wantBucket: "test-bucket", + wantRegion: "cn-beijing", + }, + { + name: "OSSURLShanghaiRegion", + host: "data-bucket.oss-cn-shanghai.aliyuncs.com", + wantBucket: "data-bucket", + wantRegion: "cn-shanghai", + }, + { + name: "InternalOSSURL", + host: "my-bucket.oss-cn-hangzhou-internal.aliyuncs.com", + wantBucket: "my-bucket", + wantRegion: "cn-hangzhou", + }, + { + name: "InternalOSSURLBeijing", + host: "test-bucket.oss-cn-beijing-internal.aliyuncs.com", + wantBucket: "test-bucket", + wantRegion: "cn-beijing", + }, + { + name: "SimpleBucketName", + host: "my-bucket", + wantBucket: "my-bucket", + wantRegion: "", + }, + { + name: "BucketWithHyphens", + host: "my-test-bucket-2024.oss-cn-shenzhen.aliyuncs.com", + wantBucket: "my-test-bucket-2024", + wantRegion: "cn-shenzhen", + }, + { + name: "OSSURLWithNumbers", + host: "bucket123.oss-cn-hangzhou.aliyuncs.com", + wantBucket: "bucket123", + wantRegion: "cn-hangzhou", + }, + { + name: "OSSURLHongKong", + host: "hk-bucket.oss-cn-hongkong.aliyuncs.com", + wantBucket: "hk-bucket", + wantRegion: "cn-hongkong", + }, + { + name: "OSSURLSingapore", + host: "sg-bucket.oss-ap-southeast-1.aliyuncs.com", + wantBucket: "sg-bucket", + wantRegion: "ap-southeast-1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bucket, region, _ := ParseHost(tt.host) + + if bucket != tt.wantBucket { + t.Errorf("bucket = %q, want %q", bucket, tt.wantBucket) + } + if region != tt.wantRegion { + t.Errorf("region = %q, want %q", region, tt.wantRegion) + } + }) + } +} + +func TestIsNotExists(t *testing.T) { + t.Run("NilError", func(t *testing.T) { + if isNotExists(nil) { + t.Error("isNotExists should return false for nil error") + } + }) + + t.Run("RegularError", func(t *testing.T) { + regularErr := errors.New("regular error") + if isNotExists(regularErr) { + t.Error("isNotExists should return false for regular error") + } + }) + + t.Run("WrappedError", func(t *testing.T) { + wrappedErr := errors.New("wrapped: something went wrong") + if isNotExists(wrappedErr) { + t.Error("isNotExists should return false for wrapped non-ServiceError") + } + }) +} + +func TestLtxPath(t *testing.T) { + c := NewReplicaClient() + c.Path = "backups" + + tests := []struct { + level int + filename string + want string + }{ + {0, "00000001-00000001.ltx", "backups/0000/00000001-00000001.ltx"}, + {1, "00000001-00000010.ltx", "backups/0001/00000001-00000010.ltx"}, + {15, "00000001-000000ff.ltx", "backups/000f/00000001-000000ff.ltx"}, + } + + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + got := c.ltxPath(tt.level, tt.filename) + if got != tt.want { + t.Errorf("ltxPath(%d, %q) = %q, want %q", tt.level, tt.filename, got, tt.want) + } + }) + } +} + +func TestDeleteResultError(t *testing.T) { + ptr := func(s string) *string { return &s } + + t.Run("NilResult", func(t *testing.T) { + requested := []oss.DeleteObject{{Key: ptr("key1")}} + if err := deleteResultError(requested, nil); err != nil { + t.Errorf("expected nil error for nil result, got %v", err) + } + }) + + t.Run("AllDeleted", func(t *testing.T) { + requested := []oss.DeleteObject{ + {Key: ptr("key1")}, + {Key: ptr("key2")}, + } + result := &oss.DeleteMultipleObjectsResult{ + DeletedObjects: []oss.DeletedInfo{ + {Key: ptr("key1")}, + {Key: ptr("key2")}, + }, + } + if err := deleteResultError(requested, result); err != nil { + t.Errorf("expected nil error when all deleted, got %v", err) + } + }) + + t.Run("SomeNotDeleted", func(t *testing.T) { + requested := []oss.DeleteObject{ + {Key: ptr("key1")}, + {Key: ptr("key2")}, + {Key: ptr("key3")}, + } + result := &oss.DeleteMultipleObjectsResult{ + DeletedObjects: []oss.DeletedInfo{ + {Key: ptr("key1")}, + // key2 and key3 not deleted + }, + } + err := deleteResultError(requested, result) + if err == nil { + t.Fatal("expected error when some keys not deleted") + } + errStr := err.Error() + if !contains(errStr, "key2") { + t.Errorf("error should mention key2: %s", errStr) + } + if !contains(errStr, "key3") { + t.Errorf("error should mention key3: %s", errStr) + } + }) + + t.Run("EmptyRequested", func(t *testing.T) { + requested := []oss.DeleteObject{} + result := &oss.DeleteMultipleObjectsResult{} + if err := deleteResultError(requested, result); err != nil { + t.Errorf("expected nil error for empty requested, got %v", err) + } + }) + + t.Run("NilKeyInRequested", func(t *testing.T) { + requested := []oss.DeleteObject{ + {Key: nil}, // nil key should be skipped + {Key: ptr("key1")}, + } + result := &oss.DeleteMultipleObjectsResult{ + DeletedObjects: []oss.DeletedInfo{ + {Key: ptr("key1")}, + }, + } + if err := deleteResultError(requested, result); err != nil { + t.Errorf("expected nil error, got %v", err) + } + }) +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +}