Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Litestream is a **disaster recovery tool for SQLite** that runs as a background
- **Multi-level Compaction**: Hierarchical compaction keeps storage efficient (30s → 5m → 1h → snapshots)
- **Single Replica Constraint**: Each database is replicated to exactly one remote destination
- **Pure Go Build**: Uses `modernc.org/sqlite`, so no CGO dependency for the main binary
- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/File/SFTP
- **Optional NATS JetStream Support**: Additional replica backend alongside S3/GCS/ABS/OSS/File/SFTP
- **Snapshot Compatibility**: Only LTX-based backups are supported—keep legacy v0.3.x binaries to restore old WAL snapshots

**Key Design Principles:**
Expand Down Expand Up @@ -728,7 +728,7 @@ if offset < 0 {

```bash
# Test eventual consistency
go test -v ./replica_client_test.go -integration [s3|gcs|abs|sftp]
go test -v ./replica_client_test.go -integration [s3|gcs|abs|oss|sftp]

# Test partial reads
# (Example) add targeted partial-read tests in your backend package
Expand Down
4 changes: 3 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs # T
go test -v ./replica_client_test.go -integration s3
go test -v ./replica_client_test.go -integration gcs
go test -v ./replica_client_test.go -integration abs
go test -v ./replica_client_test.go -integration oss
go test -v ./replica_client_test.go -integration sftp
```

Expand All @@ -112,7 +113,7 @@ pre-commit run --all-files

**Replica (`replica.go`)**: Connects a database to replication destinations via ReplicaClient interface. Manages periodic synchronization and maintains replication position.

**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration.
**ReplicaClient Interface** (`replica_client.go`): Abstraction for different storage backends (S3, GCS, Azure Blob Storage, OSS, SFTP, file system, NATS). Each implementation handles snapshot/WAL segment upload and restoration. The `LTXFiles` method includes a `useMetadata` parameter: when true, it fetches accurate timestamps from backend metadata (required for point-in-time restores); when false, it uses fast timestamps for normal operations. During compaction, the system preserves the earliest CreatedAt timestamp from source files to maintain temporal granularity for restoration.

**WAL Processing**: The system monitors SQLite WAL files for changes, segments them into LTX format files, and replicates these segments to configured destinations. Uses SQLite checksums for integrity verification.

Expand All @@ -121,6 +122,7 @@ pre-commit run --all-files
- **S3** (`s3/replica_client.go`): AWS S3 and compatible storage
- **GCS** (`gs/replica_client.go`): Google Cloud Storage
- **ABS** (`abs/replica_client.go`): Azure Blob Storage
- **OSS** (`oss/replica_client.go`): Alibaba Cloud Object Storage Service
- **SFTP** (`sftp/replica_client.go`): SSH File Transfer Protocol
- **File** (`file/replica_client.go`): Local file system replication
- **NATS** (`nats/replica_client.go`): NATS JetStream object storage
Expand Down
68 changes: 68 additions & 0 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/internal"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/benbjohnson/litestream/webdav"
Expand Down Expand Up @@ -1059,6 +1060,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
if r.Client, err = newNATSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "oss":
if r.Client, err = newOSSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}
Expand Down Expand Up @@ -1518,6 +1523,69 @@ func newNATSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_
return client, nil
}

// newOSSReplicaClientFromConfig returns a new instance of oss.ReplicaClient built from config.
func newOSSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *oss.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for oss replica")
} else if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for oss replica")
}

bucket, configPath := c.Bucket, c.Path
region, endpoint := c.Region, c.Endpoint

// Apply settings from URL, if specified.
if c.URL != "" {
_, host, upath, err := ParseReplicaURL(c.URL)
if err != nil {
return nil, err
}

var (
ubucket string
uregion string
)

ubucket, uregion, _ = oss.ParseHost(host)

// Only apply URL parts to fields that have not been overridden.
if configPath == "" {
configPath = upath
}
if bucket == "" {
bucket = ubucket
}
if region == "" {
region = uregion
}
}

// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("bucket required for oss replica")
}

// Build replica client.
client := oss.NewReplicaClient()
client.AccessKeyID = c.AccessKeyID
client.AccessKeySecret = c.SecretAccessKey
client.Bucket = bucket
client.Path = configPath
client.Region = region
client.Endpoint = endpoint

// Apply upload configuration if specified.
if c.PartSize != nil {
client.PartSize = int64(*c.PartSize)
}
if c.Concurrency != nil {
client.Concurrency = *c.Concurrency
}

return client, nil
}

// applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to
// their AWS counterparts as the "AWS" prefix can be confusing when using a
// non-AWS S3-compatible service.
Expand Down
3 changes: 3 additions & 0 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
)
Expand Down Expand Up @@ -183,6 +184,8 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) {
slogWith.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
case *nats.ReplicaClient:
slogWith.Info("replicating to", "bucket", client.BucketName, "url", client.URL)
case *oss.ReplicaClient:
slogWith.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region)
default:
slogWith.Info("replicating to")
}
Expand Down
3 changes: 3 additions & 0 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ graph TB
S3[s3/replica_client.go]
GCS[gs/replica_client.go]
ABS[abs/replica_client.go]
OSS[oss/replica_client.go]
File[file/replica_client.go]
SFTP[sftp/replica_client.go]
NATS[nats/replica_client.go]
Expand All @@ -54,13 +55,15 @@ graph TB
RC --> S3
RC --> GCS
RC --> ABS
RC --> OSS
RC --> File
RC --> SFTP
RC --> NATS
DB <--> SQLite
S3 --> Cloud
GCS --> Cloud
ABS --> Cloud
OSS --> Cloud
```

### Layer Responsibilities
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ require (
modernc.org/memory v1.11.0 // indirect
)

require github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0

require (
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/MadAppGang/httplog v1.3.0 h1:1XU54TO8kiqTeO+7oZLKAM3RP/cJ7SadzslRcKsp
github.com/MadAppGang/httplog v1.3.0/go.mod h1:gpYEdkjh/Cda6YxtDy4AB7KY+fR7mb3SqBZw74A5hJ4=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2 h1:ZBbLwSJqkHBuFDA6DUhhse0IGJ7T5bemHyNILUjvOq4=
github.com/TylerBrock/colorjson v0.0.0-20200706003622-8a50f05110d2/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0 h1:wQlqotpyjYPjJz+Noh5bRu7Snmydk8SKC5Z6u1CR20Y=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.3.0/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
github.com/aws/aws-sdk-go-v2 v1.37.1 h1:SMUxeNz3Z6nqGsXv0JuJXc8w5YMtrQMuIBmDx//bBDY=
github.com/aws/aws-sdk-go-v2 v1.37.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg=
Expand Down
27 changes: 27 additions & 0 deletions internal/testingutil/testingutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/benbjohnson/litestream/gs"
"github.com/benbjohnson/litestream/internal"
"github.com/benbjohnson/litestream/nats"
"github.com/benbjohnson/litestream/oss"
"github.com/benbjohnson/litestream/s3"
"github.com/benbjohnson/litestream/sftp"
"github.com/benbjohnson/litestream/webdav"
Expand Down Expand Up @@ -104,6 +105,16 @@ var (
natsPassword = flag.String("nats-password", os.Getenv("LITESTREAM_NATS_PASSWORD"), "")
)

// Alibaba Cloud OSS settings
var (
ossAccessKeyID = flag.String("oss-access-key-id", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_ID"), "")
ossAccessKeySecret = flag.String("oss-access-key-secret", os.Getenv("LITESTREAM_OSS_ACCESS_KEY_SECRET"), "")
ossRegion = flag.String("oss-region", os.Getenv("LITESTREAM_OSS_REGION"), "")
ossBucket = flag.String("oss-bucket", os.Getenv("LITESTREAM_OSS_BUCKET"), "")
ossPath = flag.String("oss-path", os.Getenv("LITESTREAM_OSS_PATH"), "")
ossEndpoint = flag.String("oss-endpoint", os.Getenv("LITESTREAM_OSS_ENDPOINT"), "")
)

func Integration() bool {
return *integration
}
Expand Down Expand Up @@ -215,6 +226,8 @@ func NewReplicaClient(tb testing.TB, typ string) litestream.ReplicaClient {
return NewWebDAVReplicaClient(tb)
case nats.ReplicaClientType:
return NewNATSReplicaClient(tb)
case oss.ReplicaClientType:
return NewOSSReplicaClient(tb)
case "tigris":
return NewTigrisReplicaClient(tb)
default:
Expand Down Expand Up @@ -347,6 +360,20 @@ func NewNATSReplicaClient(tb testing.TB) *nats.ReplicaClient {
return c
}

// NewOSSReplicaClient returns a new client for integration testing.
func NewOSSReplicaClient(tb testing.TB) *oss.ReplicaClient {
tb.Helper()

c := oss.NewReplicaClient()
c.AccessKeyID = *ossAccessKeyID
c.AccessKeySecret = *ossAccessKeySecret
c.Region = *ossRegion
c.Bucket = *ossBucket
c.Path = path.Join(*ossPath, fmt.Sprintf("%016x", rand.Uint64()))
c.Endpoint = *ossEndpoint
return c
}

// MustDeleteAll deletes all objects under the client's path.
func MustDeleteAll(tb testing.TB, c litestream.ReplicaClient) {
tb.Helper()
Expand Down
Loading