diff --git a/go.mod b/go.mod index 94eb927769..75a1b56fe1 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,12 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.36.5 + github.com/aws/aws-sdk-go-v2/config v1.29.17 + github.com/aws/aws-sdk-go-v2/credentials v1.17.70 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.83 + github.com/aws/aws-sdk-go-v2/service/s3 v1.83.0 + github.com/aws/smithy-go v1.22.4 github.com/cenkalti/backoff/v4 v4.3.0 github.com/creachadair/jrpc2 v1.2.0 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da @@ -73,6 +79,19 @@ require ( cloud.google.com/go/longrunning v0.5.7 // indirect cloud.google.com/go/pubsub v1.38.0 // indirect github.com/andybalholm/brotli v1.0.4 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.17 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/log v0.1.0 // indirect diff --git a/go.sum b/go.sum index a55c2ac929..cedc199994 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,44 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W github.com/aws/aws-sdk-go v1.33.2/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.45.27 h1:b+zOTPkAG4i2RvqPdHxkJZafmhhVaVHBp4r41Tu4I6U= github.com/aws/aws-sdk-go v1.45.27/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= +github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 h1:12SpdwU8Djs+YGklkinSSlcrPyj3H4VifVsKf78KbwA= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11/go.mod h1:dd+Lkp6YmMryke+qxW/VnKyhMBDTYP41Q2Bb+6gNZgY= +github.com/aws/aws-sdk-go-v2/config v1.29.17 h1:jSuiQ5jEe4SAMH6lLRMY9OVC+TqJLP5655pBGjmnjr0= +github.com/aws/aws-sdk-go-v2/config v1.29.17/go.mod h1:9P4wwACpbeXs9Pm9w1QTh6BwWwJjwYvJ1iCt5QbCXh8= +github.com/aws/aws-sdk-go-v2/credentials v1.17.70 h1:ONnH5CM16RTXRkS8Z1qg7/s2eDOhHhaXVd72mmyv4/0= +github.com/aws/aws-sdk-go-v2/credentials v1.17.70/go.mod h1:M+lWhhmomVGgtuPOhO85u4pEa3SmssPTdcYpP/5J/xc= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 h1:KAXP9JSHO1vKGCr5f4O6WmlVKLFFXgWYAGoJosorxzU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32/go.mod h1:h4Sg6FQdexC1yYG9RDnOvLbW1a/P986++/Y/a+GyEM8= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.83 h1:08otkOELsIi0toRRGMytlJhOctcN8xfKfKFR2NXz3kE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.83/go.mod h1:dGsGb2wI8JDWeMAhjVPP+z+dqvYjL6k6o+EujcRNk5c= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 h1:SsytQyTMHMDPspp+spo7XwXTP44aJZZAC7fBV2C5+5s= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36/go.mod h1:Q1lnJArKRXkenyog6+Y+zr7WDpk4e6XlR6gs20bbeNo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 h1:i2vNHQiXUvKhs3quBR6aqlgJaiaexz/aNvdCktW/kAM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36/go.mod h1:UdyGa7Q91id/sdyHPwth+043HhmP6yP9MBHgbZM0xo8= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.36 h1:GMYy2EOWfzdP3wfVAGXBNKY5vK4K8vMET4sYOYltmqs= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.36/go.mod h1:gDhdAV6wL3PmPqBhiPbnlS447GoWs8HTTOYef9/9Inw= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 h1:CXV68E2dNqhuynZJPB80bhPQwAKqBWVer887figW6Jc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4/go.mod h1:/xFi9KtvBXP97ppCz1TAEvU1Uf66qvid89rbem3wCzQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.4 h1:nAP2GYbfh8dd2zGZqFRSMlq+/F6cMPBUuCsGAMkN074= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.4/go.mod h1:LT10DsiGjLWh4GbjInf9LQejkYEhBgBCjLG5+lvk4EE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.17 h1:qcLWgdhq45sDM9na4cvXax9dyLitn8EYBRl8Ak4XtG4= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.17/go.mod h1:M+jkjBFZ2J6DJrjMv2+vkBbuht6kxJYtJiwoVgX4p4U= +github.com/aws/aws-sdk-go-v2/service/s3 v1.83.0 h1:5Y75q0RPQoAbieyOuGLhjV9P3txvYgXv2lg0UwJOfmE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.83.0/go.mod h1:kUklwasNoCn5YpyAqC/97r6dzTA1SRKJfKq16SXeoDU= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 h1:AIRJ3lfb2w/1/8wOOSqYb9fUKGwQbtysJ2H1MofRUPg= +github.com/aws/aws-sdk-go-v2/service/sso v1.25.5/go.mod h1:b7SiVprpU+iGazDUqvRSLf5XmCdn+JtT1on7uNL6Ipc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 h1:BpOxT3yhLwSJ77qIY3DoHAQjZsc4HEGfMCE4NGy3uFg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3/go.mod h1:vq/GQR1gOFLquZMSrxUK/cpvKCNVYibNyJ1m7JrU88E= +github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 h1:NFOJ/NXEGV4Rq//71Hs1jC/NvPs1ezajK+yQmkwnPV0= +github.com/aws/aws-sdk-go-v2/service/sts v1.34.0/go.mod h1:7ph2tGpfQvwzgistp2+zga9f+bCjlQJPkPUmMgDSD7w= +github.com/aws/smithy-go v1.22.4 h1:uqXzVZNuNexwc/xrh6Tb56u89WDlJY6HS+KC0S4QSjw= +github.com/aws/smithy-go v1.22.4/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw= diff --git a/services/galexie/CHANGELOG.md b/services/galexie/CHANGELOG.md index 8040d43cb8..0de296ee06 100644 --- a/services/galexie/CHANGELOG.md +++ b/services/galexie/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +- Add implementation for S3 and S3-compatible data storage. + ## [v1.0.0] - 🎉 First release! diff --git a/services/galexie/DEVELOPER_GUIDE.md b/services/galexie/DEVELOPER_GUIDE.md index 6936bdb3b7..90d7cce02e 100644 --- a/services/galexie/DEVELOPER_GUIDE.md +++ b/services/galexie/DEVELOPER_GUIDE.md @@ -1,4 +1,3 @@ - # Galexie Developer Guide Galexie is a tool to export Stellar network transaction data to cloud storage in a way that is easy to access. @@ -15,7 +14,7 @@ The goal of Galexie is to build an easy-to-use tool to export Stellar network le To achieve its goals, Galexie uses the following architecture, which consists of the 3 main components: - Captive-core to extract raw transaction metadata from the Stellar Network. - Export manager to bundles and organizes the ledgers to get them ready for export. -- The cloud storage plugin writes to the cloud storage. This is specific to the type of cloud storage, GCS in this case. +- The cloud storage plugin writes to the cloud storage. This supports multiple storage backends including Google Cloud Storage (GCS), Amazon S3, and S3-compatible storage services. ![Architecture](./architecture.png) @@ -29,8 +28,19 @@ To achieve its goals, Galexie uses the following architecture, which consists of - Objects are compressed before uploading using the [zstd](http://facebook.github.io/zstd/) (zstandard) compression algorithm to optimize network usage and storage needs. ## Data Storage -- An example implementation of `DataStore` for GCS, Google Cloud Storage. This plugin is located in the [support](https://github.com/stellar/go/tree/master/support/datastore) package. -- Galexie currently implements the interface only for Google Cloud Storage (GCS). The [GCS plugin](https://github.com/stellar/go/blob/master/support/datastore/gcs_datastore.go) uses GCS-specific behaviors like conditional puts, automatic retry, metadata, and CRC checksum. +Galexie implements a pluggable data storage architecture through the `DataStore` interface. This plugin is located in the [support](https://github.com/stellar/go/tree/master/support/datastore) package and supports multiple storage backends: + +### Google Cloud Storage (GCS) +- The [GCS plugin](https://github.com/stellar/go/blob/master/support/datastore/gcs_datastore.go) uses GCS-specific behaviors like conditional puts, automatic retry, metadata, and CRC checksum validation. + +### Amazon S3 and S3-Compatible Storage +- The [S3 plugin](https://github.com/stellar/go/blob/master/support/datastore/s3_datastore.go) supports AWS S3 and S3-compatible storage services such as: + - Amazon S3 + - Cloudflare R2 + - MinIO + - DigitalOcean Spaces + - Any other S3-compatible object storage +- Features include conditional puts using `If-None-Match` headers, metadata storage, CRC32C checksum validation, and automatic retry mechanisms. ## Build and Run using Docker The Dockerfile contains all the necessary dependencies (e.g., Stellar-core) required to run Galexie. @@ -59,7 +69,7 @@ $ GALEXIE_INTEGRATION_TESTS_ENABLED=true go test -v -race -run TestGalexieTestSu ``` ## Adding support for a new storage type -Support for different data storage types are encapsulated as 'plugins', which are implementation of `DataStore` interface in a go package. To add a data storage plugin based on a new storage type (e.g. AWS S3), follow these steps: +Support for different data storage types are encapsulated as 'plugins', which are implementation of `DataStore` interface in a go package. To add a data storage plugin based on a new storage type, follow these steps: - A data storage plugin must implement the [DataStore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go) interface. - Add support for new datastore-specific features. Implement any datastore-specific custom logic. Different datastores have different ways of handling diff --git a/services/galexie/config.example.toml b/services/galexie/config.example.toml index e8d6379807..4d2ad84fcf 100644 --- a/services/galexie/config.example.toml +++ b/services/galexie/config.example.toml @@ -7,13 +7,24 @@ admin_port = 6061 # Datastore Configuration [datastore_config] -# Specifies the type of datastore. Currently, only Google Cloud Storage (GCS) is supported. +# Specifies the type of datastore. Currently, Google Cloud Storage (GCS) and s3-compatible storage (S3) are supported. type = "GCS" [datastore_config.params] +# params required for GCS storage # The Google Cloud Storage bucket path for storing data, with optional subpaths for organization. destination_bucket_path = "your-bucket-name///" +# params required for S3-compatible storage +# Check https://docs.aws.amazon.com/cli/latest/topic/config-vars.html#the-shared-credentials-file for more information on how to set up credentials. +# The S3 bucket path for storing data, with optional subpaths for organization. +#destination_bucket_path = "your-bucket-name///" +# The region where the S3 bucket is located. +#region = "us-west-1" # Example region, change as needed. +# The endpoint URL for the S3-compatible storage. If you are using Amazon S3, you can leave this commented out. +# The below example is for Cloudflare R2, but you can replace it with your S3-compatible storage endpoint. +#endpoint_url = "https://00000000000000000000000000000000.cloudflarestorage.com" + [datastore_config.schema] # Configuration for data organization ledgers_per_file = 1 # Number of ledgers stored in each file. diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go index 961ba99545..7b10ceda35 100644 --- a/support/datastore/datastore.go +++ b/support/datastore/datastore.go @@ -29,11 +29,10 @@ type DataStore interface { func NewDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataStore, error) { switch datastoreConfig.Type { case "GCS": - destinationBucketPath, ok := datastoreConfig.Params["destination_bucket_path"] - if !ok { - return nil, errors.Errorf("Invalid GCS config, no destination_bucket_path") - } - return NewGCSDataStore(ctx, destinationBucketPath, datastoreConfig.Schema) + return NewGCSDataStore(ctx, datastoreConfig) + case "S3": + return NewS3DataStore(ctx, datastoreConfig) + default: return nil, errors.Errorf("Invalid datastore type %v, not supported", datastoreConfig.Type) } diff --git a/support/datastore/gcs_datastore.go b/support/datastore/gcs_datastore.go index ab1bc669b5..d4d588bd37 100644 --- a/support/datastore/gcs_datastore.go +++ b/support/datastore/gcs_datastore.go @@ -27,13 +27,18 @@ type GCSDataStore struct { schema DataStoreSchema } -func NewGCSDataStore(ctx context.Context, bucketPath string, schema DataStoreSchema) (DataStore, error) { +func NewGCSDataStore(ctx context.Context, dataStoreConfig DataStoreConfig) (DataStore, error) { + destinationBucketPath, ok := dataStoreConfig.Params["destination_bucket_path"] + if !ok { + return nil, errors.New("invalid GCS config, no destination_bucket_path") + } + client, err := storage.NewClient(ctx) if err != nil { return nil, err } - return FromGCSClient(ctx, client, bucketPath, schema) + return FromGCSClient(ctx, client, destinationBucketPath, dataStoreConfig.Schema) } func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath string, schema DataStoreSchema) (DataStore, error) { diff --git a/support/datastore/gcs_test.go b/support/datastore/gcs_test.go index 618b5d602a..3ea73b7f3c 100644 --- a/support/datastore/gcs_test.go +++ b/support/datastore/gcs_test.go @@ -377,11 +377,3 @@ func TestGCSGetFileValidatesCRC32C(t *testing.T) { _, err = io.Copy(&buf, reader) require.EqualError(t, err, "storage: bad CRC on read: got 985946173, want 2601510353") } - -func requireReaderContentEquals(t *testing.T, reader io.ReadCloser, expected []byte) { - var buf bytes.Buffer - _, err := io.Copy(&buf, reader) - require.NoError(t, err) - require.NoError(t, reader.Close()) - require.Equal(t, expected, buf.Bytes()) -} diff --git a/support/datastore/helpers_test.go b/support/datastore/helpers_test.go new file mode 100644 index 0000000000..f462c811c3 --- /dev/null +++ b/support/datastore/helpers_test.go @@ -0,0 +1,18 @@ +package datastore + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +// requireReaderContentEquals is a helper function to assert reader content. +func requireReaderContentEquals(t *testing.T, reader io.ReadCloser, expected []byte) { + var buf bytes.Buffer + _, err := io.Copy(&buf, reader) + require.NoError(t, err) + require.NoError(t, reader.Close()) + require.Equal(t, expected, buf.Bytes()) +} diff --git a/support/datastore/s3_datastore.go b/support/datastore/s3_datastore.go new file mode 100644 index 0000000000..991251920d --- /dev/null +++ b/support/datastore/s3_datastore.go @@ -0,0 +1,239 @@ +package datastore + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "path" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/url" +) + +// S3DataStore implements DataStore for AWS S3 and S3-compatible services. +type S3DataStore struct { + client *s3.Client + uploader *manager.Uploader + bucket string + prefix string + schema DataStoreSchema +} + +func NewS3DataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataStore, error) { + destinationBucketPath, ok := datastoreConfig.Params["destination_bucket_path"] + if !ok { + return nil, errors.New("invalid S3 config, no destination_bucket_path") + } + region, ok := datastoreConfig.Params["region"] + if !ok { + return nil, errors.New("invalid S3 config, no region") + } + // endpoint_url is optional, if not provided it will use the default AWS S3 endpoint. + endpointUrl := datastoreConfig.Params["endpoint_url"] + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + if endpointUrl != "" { + o.BaseEndpoint = aws.String(endpointUrl) + } + o.Region = region + o.UsePathStyle = true + }) + + return FromS3Client(ctx, client, destinationBucketPath, datastoreConfig.Schema) +} + +func FromS3Client(ctx context.Context, client *s3.Client, bucketPath string, schema DataStoreSchema) (DataStore, error) { + s3BucketURL := fmt.Sprintf("s3://%s", bucketPath) + parsed, err := url.Parse(s3BucketURL) + if err != nil { + return nil, err + } + + prefix := strings.TrimPrefix(parsed.Path, "/") + bucketName := parsed.Host + uploader := manager.NewUploader(client) + + input := &s3.HeadBucketInput{Bucket: aws.String(bucketName)} + _, err = client.HeadBucket(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to head bucket, the bucket may not exist or you may not have access: %w", err) + } + + return S3DataStore{client: client, uploader: uploader, bucket: bucketName, prefix: prefix, schema: schema}, nil +} + +// GetFileMetadata retrieves the metadata for the specified file in the S3-compatible bucket. +func (b S3DataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { + filePath = path.Join(b.prefix, filePath) + input := &s3.HeadObjectInput{ + Bucket: aws.String(b.bucket), + Key: aws.String(filePath), + } + + output, err := b.client.HeadObject(ctx, input) + if err != nil { + if isNotFoundError(err) { + return nil, os.ErrNotExist + } + return nil, err + } + + return output.Metadata, nil +} + +// GetFile retrieves a file from the S3-compatible bucket. +func (b S3DataStore) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { + filePath = path.Join(b.prefix, filePath) + input := &s3.GetObjectInput{ + Bucket: aws.String(b.bucket), + Key: aws.String(filePath), + ChecksumMode: types.ChecksumModeEnabled, // Enable checksum validation + } + + output, err := b.client.GetObject(ctx, input) + if err != nil { + if isNotFoundError(err) { + return nil, os.ErrNotExist + } + return nil, err + } + + return output.Body, nil +} + +// PutFile uploads a file to S3-compatible bucket +func (b S3DataStore) PutFile(ctx context.Context, filePath string, in io.WriterTo, metaData map[string]string) error { + err := b.putFile(ctx, filePath, in, false, metaData) + + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + log.Errorf("S3 error: %s %s %s", apiErr.ErrorCode(), apiErr.ErrorMessage(), apiErr.Error()) + } + return fmt.Errorf("error uploading file %s: %w", filePath, err) + } + + log.Infof("File uploaded successfully: %s", filePath) + return nil +} + +// PutFileIfNotExists uploads a file to S3-compatible bucket only if it doesn't already exist. +func (b S3DataStore) PutFileIfNotExists(ctx context.Context, filePath string, in io.WriterTo, metaData map[string]string) (bool, error) { + err := b.putFile(ctx, filePath, in, true, metaData) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + if apiErr.ErrorCode() == "PreconditionFailed" { + log.Infof("Precondition failed: %s already exists in the bucket", filePath) + return false, nil // Treat as success + } else { + log.Errorf("S3 error: %s %s %s", apiErr.ErrorCode(), apiErr.ErrorMessage(), apiErr.Error()) + } + } + return false, fmt.Errorf("error uploading file %s: %w", filePath, err) + } + + log.Infof("File uploaded successfully: %s", filePath) + return true, nil +} + +// Exists checks if a file exists in the S3-compatible bucket. +func (b S3DataStore) Exists(ctx context.Context, filePath string) (bool, error) { + filePath = path.Join(b.prefix, filePath) + input := &s3.HeadObjectInput{ + Bucket: aws.String(b.bucket), + Key: aws.String(filePath), + } + + _, err := b.client.HeadObject(ctx, input) + if err == nil { + return true, nil + } + + if isNotFoundError(err) { + return false, nil + } + + return false, err +} + +// Size retrieves the size of a file in the S3-compatible bucket. +func (b S3DataStore) Size(ctx context.Context, filePath string) (int64, error) { + filePath = path.Join(b.prefix, filePath) + input := &s3.HeadObjectInput{ + Bucket: aws.String(b.bucket), + Key: aws.String(filePath), + } + + output, err := b.client.HeadObject(ctx, input) + if err != nil { + if isNotFoundError(err) { + return 0, os.ErrNotExist + } + return 0, err + } + + return *output.ContentLength, nil +} + +// GetSchema returns the schema information which defines the structure +// and organization of data in the datastore. +func (b S3DataStore) GetSchema() DataStoreSchema { + return b.schema +} + +// Close does nothing for S3DataStore as it does not maintain a persistent connection. +func (b S3DataStore) Close() error { + return nil +} + +func (b S3DataStore) putFile(ctx context.Context, filePath string, in io.WriterTo, onlyIfFileDoesNotExist bool, metaData map[string]string) error { + filePath = path.Join(b.prefix, filePath) + + buf := &bytes.Buffer{} + // The files here are usually quite small, so there is no problem at the moment, but it would be best to optimize it in the future. + if _, err := in.WriteTo(buf); err != nil { + return fmt.Errorf("failed to write file %s: %w", filePath, err) + } + + // According to the document, the SDK will automatically add a checksum. + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + input := &s3.PutObjectInput{ + Bucket: aws.String(b.bucket), + Key: aws.String(filePath), + Body: buf, + Metadata: metaData, + ChecksumAlgorithm: types.ChecksumAlgorithmCrc32c, + } + + if onlyIfFileDoesNotExist { + input.IfNoneMatch = aws.String("*") + } + + _, err := b.uploader.Upload(ctx, input) + return err +} + +func isNotFoundError(err error) bool { + var noSuchKeyErr *types.NoSuchKey // for getObject + var notFoundErr *types.NotFound // for headObject + if errors.As(err, &noSuchKeyErr) || errors.As(err, ¬FoundErr) { + return true + } + return false +} diff --git a/support/datastore/s3_datastore_test.go b/support/datastore/s3_datastore_test.go new file mode 100644 index 0000000000..b19e558e58 --- /dev/null +++ b/support/datastore/s3_datastore_test.go @@ -0,0 +1,438 @@ +package datastore + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/require" +) + +// mockS3Object stores object data and metadata within the mock server. +type mockS3Object struct { + body []byte + metadata map[string]string + crc32c string +} + +// mockS3Server is our mock S3 server, holding an in-memory "bucket". +type mockS3Server struct { + mu sync.Mutex + objects map[string]mockS3Object +} + +// ServeHTTP is the core logic of the mock server, handling all incoming HTTP requests. +func (s *mockS3Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + defer s.mu.Unlock() + + pathParts := strings.SplitN(strings.TrimPrefix(r.URL.Path, "/"), "/", 2) + + switch r.Method { + case http.MethodHead: + // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html + // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html + s.handleHeadRequest(w, pathParts) + case http.MethodGet: + // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html + s.handleGetRequest(w, pathParts) + case http.MethodPut: + // See https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + s.handlePutRequest(w, r, pathParts) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } +} + +func (s *mockS3Server) handleHeadRequest(w http.ResponseWriter, pathParts []string) { + // Handle HeadBucket: A request with no key part in the path. + // We assume the bucket always exists in the mock. + if len(pathParts) < 2 || pathParts[1] == "" { + w.WriteHeader(http.StatusOK) + return + } + + // Handle HeadObject: A request with a key. + key := pathParts[1] + obj, exists := s.objects[key] + if !exists { + w.WriteHeader(http.StatusNotFound) + return + } + + s.setObjectHeaders(w, obj) + w.WriteHeader(http.StatusOK) +} + +func (s *mockS3Server) handleGetRequest(w http.ResponseWriter, pathParts []string) { + if len(pathParts) < 2 { + http.Error(w, "Invalid path: Key is required for GET", http.StatusBadRequest) + return + } + + key := pathParts[1] + obj, exists := s.objects[key] + if !exists { + s.writeS3NoSuchKeyError(w) + return + } + + s.setObjectHeaders(w, obj) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(obj.body) +} + +func (s *mockS3Server) handlePutRequest(w http.ResponseWriter, r *http.Request, pathParts []string) { + if len(pathParts) < 2 { + http.Error(w, "Invalid path: Key is required for PUT", http.StatusBadRequest) + return + } + + key := pathParts[1] + + // Handle conditional put + if r.Header.Get("If-None-Match") == "*" { + if _, exists := s.objects[key]; exists { + w.WriteHeader(http.StatusPreconditionFailed) + return + } + } + + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + metadata := s.extractMetadata(r.Header) + crc32c := r.Header.Get("x-amz-checksum-crc32c") + s.objects[key] = mockS3Object{body: body, metadata: metadata, crc32c: crc32c} + w.WriteHeader(http.StatusOK) +} + +func (s *mockS3Server) setObjectHeaders(w http.ResponseWriter, obj mockS3Object) { + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(obj.body))) + for k, v := range obj.metadata { + w.Header().Set("x-amz-meta-"+k, v) + } + if obj.crc32c != "" { + w.Header().Set("x-amz-checksum-crc32c", obj.crc32c) + } +} + +func (s *mockS3Server) writeS3NoSuchKeyError(w http.ResponseWriter) { + const s3NoSuchKeyError = `NoSuchKeyThe specified key does not exist.` + w.Header().Set("Content-Type", "application/xml") + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(s3NoSuchKeyError)) +} + +func (s *mockS3Server) extractMetadata(headers http.Header) map[string]string { + metadata := make(map[string]string) + for k, v := range headers { + if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") { + metaKey := strings.TrimPrefix(strings.ToLower(k), "x-amz-meta-") + metadata[metaKey] = v[0] + } + } + return metadata +} + +// setupTestS3DataStore is a helper function that initializes the mock server and an S3DataStore instance. +func setupTestS3DataStore(t *testing.T, ctx context.Context, bucketPath string, initObjects map[string]mockS3Object) (DataStore, func()) { + t.Helper() + mockServer := &mockS3Server{ + objects: make(map[string]mockS3Object), + } + // Initialize the mock server with provided objects. + for key, obj := range initObjects { + mockServer.objects[key] = obj + } + server := httptest.NewServer(mockServer) + + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion("us-west-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("KEY", "SECRET", "")), + ) + require.NoError(t, err) + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(server.URL) + o.UsePathStyle = true + }) + + store, err := FromS3Client(ctx, client, bucketPath, DataStoreSchema{}) + require.NoError(t, err) + + teardown := func() { + server.Close() + } + + return store, teardown +} + +func TestS3Exists(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + content := []byte("inside the file") + err := store.PutFile(ctx, "file.txt", bytes.NewReader(content), nil) + require.NoError(t, err) + + exists, err := store.Exists(ctx, "file.txt") + require.NoError(t, err) + require.True(t, exists) + + exists, err = store.Exists(ctx, "missing-file.txt") + require.NoError(t, err) + require.False(t, exists) +} + +func TestS3Size(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + content := []byte("inside the file") + err := store.PutFile(ctx, "file.txt", bytes.NewReader(content), nil) + require.NoError(t, err) + + size, err := store.Size(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, int64(len(content)), size) + + _, err = store.Size(ctx, "missing-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) +} + +func TestS3PutFile(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + content := []byte("inside the file") + writerTo := &writerToRecorder{ + WriterTo: bytes.NewReader(content), + } + err := store.PutFile(ctx, "file.txt", writerTo, nil) + require.NoError(t, err) + require.Equal(t, int64(len(content)), writerTo.total) + + reader, err := store.GetFile(ctx, "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, content) + + metadata, err := store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, map[string]string(nil), metadata) + + otherContent := []byte("other text") + writerTo = &writerToRecorder{ + WriterTo: bytes.NewReader(otherContent), + } + err = store.PutFile(ctx, "file.txt", writerTo, nil) + require.NoError(t, err) + require.Equal(t, int64(len(otherContent)), writerTo.total) + + reader, err = store.GetFile(ctx, "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, otherContent) + + metadata, err = store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, map[string]string(nil), metadata) +} + +func TestS3PutFileIfNotExists(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + existingContent := []byte("inside the file") + err := store.PutFile(ctx, "file.txt", bytes.NewReader(existingContent), nil) + require.NoError(t, err) + + newContent := []byte("overwrite the file") + writerTo := &writerToRecorder{ + WriterTo: bytes.NewReader(newContent), + } + ok, err := store.PutFileIfNotExists(ctx, "file.txt", writerTo, nil) + require.NoError(t, err) + require.False(t, ok) + require.Equal(t, int64(len(newContent)), writerTo.total) + + reader, err := store.GetFile(ctx, "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, existingContent) + + metadata, err := store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, map[string]string(nil), metadata) + + writerTo = &writerToRecorder{ + WriterTo: bytes.NewReader(newContent), + } + ok, err = store.PutFileIfNotExists(ctx, "other-file.txt", writerTo, nil) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, int64(len(newContent)), writerTo.total) + + reader, err = store.GetFile(ctx, "other-file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, newContent) + + metadata, err = store.GetFileMetadata(ctx, "other-file.txt") + require.NoError(t, err) + require.Equal(t, map[string]string(nil), metadata) +} + +func TestS3PutFileWithMetadata(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + metadataObj := MetaData{ + StartLedger: 1234, + EndLedger: 1234, + StartLedgerCloseTime: 1234, + EndLedgerCloseTime: 1234, + NetworkPassPhrase: "testnet", + CompressionType: "zstd", + ProtocolVersion: 21, + CoreVersion: "v1.2.3", + Version: "1.0.0", + } + + content := []byte("inside the file") + writerTo := &writerToRecorder{ + WriterTo: bytes.NewReader(content), + } + err := store.PutFile(ctx, "file.txt", writerTo, metadataObj.ToMap()) + require.NoError(t, err) + require.Equal(t, int64(len(content)), writerTo.total) + + metadata, err := store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, metadataObj.ToMap(), metadata) + + modifiedMetadataObj := MetaData{ + StartLedger: 5678, + EndLedger: 6789, + StartLedgerCloseTime: 1622547800, + EndLedgerCloseTime: 1622548900, + NetworkPassPhrase: "mainnet", + CompressionType: "gzip", + ProtocolVersion: 23, + CoreVersion: "v1.4.0", + Version: "2.0.0", + } + + otherContent := []byte("other text") + writerTo = &writerToRecorder{ + WriterTo: bytes.NewReader(otherContent), + } + err = store.PutFile(ctx, "file.txt", writerTo, modifiedMetadataObj.ToMap()) + require.NoError(t, err) + require.Equal(t, int64(len(otherContent)), writerTo.total) + + metadata, err = store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, modifiedMetadataObj.ToMap(), metadata) +} + +func TestS3PutFileIfNotExistsWithMetadata(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + metadataObj := MetaData{ + StartLedger: 1234, + EndLedger: 1234, + StartLedgerCloseTime: 1234, + EndLedgerCloseTime: 1234, + NetworkPassPhrase: "testnet", + CompressionType: "zstd", + ProtocolVersion: 21, + CoreVersion: "v1.2.3", + Version: "1.0.0", + } + + newContent := []byte("overwrite the file") + writerTo := &writerToRecorder{ + WriterTo: bytes.NewReader(newContent), + } + ok, err := store.PutFileIfNotExists(ctx, "file.txt", writerTo, metadataObj.ToMap()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, int64(len(newContent)), writerTo.total) + + metadata, err := store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, metadataObj.ToMap(), metadata) + + modifiedMetadataObj := MetaData{ + StartLedger: 5678, + EndLedger: 6789, + StartLedgerCloseTime: 1622547800, + EndLedgerCloseTime: 1622548900, + NetworkPassPhrase: "mainnet", + CompressionType: "gzip", + ProtocolVersion: 23, + CoreVersion: "v1.4.0", + Version: "2.0.0", + } + + writerTo = &writerToRecorder{ + WriterTo: bytes.NewReader(newContent), + } + ok, err = store.PutFileIfNotExists(ctx, "file.txt", writerTo, modifiedMetadataObj.ToMap()) + require.NoError(t, err) + require.False(t, ok) + require.Equal(t, int64(len(newContent)), writerTo.total) + + metadata, err = store.GetFileMetadata(ctx, "file.txt") + require.NoError(t, err) + require.Equal(t, metadataObj.ToMap(), metadata) +} + +func TestS3GetNonExistentFile(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + _, err := store.GetFile(ctx, "other-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) + + metadata, err := store.GetFileMetadata(ctx, "other-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) + require.Equal(t, map[string]string(nil), metadata) +} + +func TestS3GetFileValidatesCRC32C(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{ + "objects/testnet/file.txt": { + body: []byte("hello"), + metadata: map[string]string{}, + crc32c: "VLn+tw==", // invalid CRC32C for the content + }}) + defer teardown() + + reader, err := store.GetFile(ctx, "file.txt") + require.NoError(t, err) + var buf bytes.Buffer + _, err = io.Copy(&buf, reader) + require.EqualError(t, err, "checksum did not match: algorithm CRC32C, expect VLn+tw==, actual mnG7TA==") +}