diff --git a/.github/workflows/galexie-release.yml b/.github/workflows/galexie-release.yml index c2b8f6f53d..5b49ec3abb 100644 --- a/.github/workflows/galexie-release.yml +++ b/.github/workflows/galexie-release.yml @@ -1,4 +1,4 @@ -name: Galexie Release +name: Galexie Release on: push: @@ -16,6 +16,8 @@ jobs: # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5333ec87069efd7bb61f6654a801dc093bf0aad91f43a5ba84806d3efe4a6322 GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" + GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_TAG: "4.6.0" + GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_PULL: "false" STELLAR_CORE_VERSION: 22.3.0-2485.e643061a4.focal steps: - name: Set VERSION @@ -29,6 +31,11 @@ jobs: shell: bash run: | docker pull "$GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE" + + - name: Pull LocalStack image + shell: bash + run: docker pull localstack/localstack:${GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_TAG} + - name: Install captive core run: | # Workaround for https://github.com/actions/virtual-environments/issues/5245, @@ -42,8 +49,11 @@ jobs: sudo apt-get update && sudo apt-get install -y stellar-core="$STELLAR_CORE_VERSION" echo "Using stellar core version $(stellar-core version)" - - name: Run tests - run: go test -v -race -run TestGalexieTestSuite ./services/galexie/... + - name: Run tests - GCS integration + run: go test -v -race -run TestGalexieGCSTestSuite ./services/galexie/... + + - name: Run tests - S3 integration + run: go test -v -race -run TestGalexieS3TestSuite ./services/galexie/... - name: Build docker run: make -C services/galexie docker-build diff --git a/.github/workflows/galexie.yml b/.github/workflows/galexie.yml index aa17a0fb47..52026f0046 100644 --- a/.github/workflows/galexie.yml +++ b/.github/workflows/galexie.yml @@ -7,8 +7,11 @@ on: jobs: galexie: - name: Test + name: Test ${{ matrix.storage_type }} integration runs-on: ubuntu-22.04 + strategy: + matrix: + storage_type: [ GCS, S3 ] env: CAPTIVE_CORE_DEBIAN_PKG_VERSION: 22.3.0-2485.e643061a4.focal GALEXIE_INTEGRATION_TESTS_ENABLED: "true" @@ -17,6 +20,8 @@ jobs: # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5333ec87069efd7bb61f6654a801dc093bf0aad91f43a5ba84806d3efe4a6322 GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" + GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_TAG: "4.6.0" + GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_PULL: "false" steps: - name: Install captive core run: | @@ -36,10 +41,16 @@ jobs: run: | docker pull "$GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE" + - name: Pull LocalStack image (for S3) + if: ${{ matrix.storage_type == 'S3' }} + shell: bash + run: docker pull localstack/localstack:${GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_TAG} + - uses: actions/checkout@v3 with: # For pull requests, build and test the PR head not a merge of the PR with the destination. - ref: ${{ github.event.pull_request.head.sha || github.ref }} - - - name: Run test - run: go test -v -race -run TestGalexieTestSuite ./services/galexie/... + ref: ${{ github.event.pull_request.head.sha || github.ref }} + + - name: Run tests - ${{ matrix.storage_type }} integration + run: | + go test -v -race -run "TestGalexie${{ matrix.storage_type }}TestSuite" ./services/galexie/... \ No newline at end of file diff --git a/services/galexie/DEVELOPER_GUIDE.md b/services/galexie/DEVELOPER_GUIDE.md index 90d7cce02e..2e13db9cb9 100644 --- a/services/galexie/DEVELOPER_GUIDE.md +++ b/services/galexie/DEVELOPER_GUIDE.md @@ -65,7 +65,8 @@ Optional, can override the version of quickstart used to run standalone stellar Note, the version of stellar core in `GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE` and `GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN` needs to be on the same major rev or the captive core process may not be able to join or parse ledger meta from the `local` network created by `GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE` ``` -$ GALEXIE_INTEGRATION_TESTS_ENABLED=true go test -v -race -run TestGalexieTestSuite ./services/galexie/... +$ GALEXIE_INTEGRATION_TESTS_ENABLED=true go test -v -race -run TestGalexieGCSTestSuite ./services/galexie/... +$ GALEXIE_INTEGRATION_TESTS_ENABLED=true go test -v -race -run TestGalexieS3TestSuite ./services/galexie/... ``` ## Adding support for a new storage type diff --git a/services/galexie/internal/integration_test.go b/services/galexie/internal/integration_test.go index 7c84e13ea0..0ca5c23e11 100644 --- a/services/galexie/internal/integration_test.go +++ b/services/galexie/internal/integration_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "net/http" "os" "os/signal" "path/filepath" @@ -22,14 +23,16 @@ import ( "github.com/stretchr/testify/suite" "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/storage" ) const ( - maxWaitForCoreStartup = (180 * time.Second) - coreStartupPingInterval = time.Second + maxWaitForCoreStartup = 180 * time.Second + maxWaitForLocalStackStartup = 60 * time.Second + coreStartupPingInterval = time.Second // set the max ledger we want the standalone network to emit // tests then refer to ledger sequences only up to this, therefore // don't have to do complex waiting within test for a sequence to exist. @@ -37,25 +40,42 @@ const ( configTemplate = "test/integration_config_template.toml" ) -func TestGalexieTestSuite(t *testing.T) { +// TestGalexieGCSTestSuite runs tests with GCS backend +func TestGalexieGCSTestSuite(t *testing.T) { + if os.Getenv("GALEXIE_INTEGRATION_TESTS_ENABLED") != "true" { + t.Skip("skipping integration test: GALEXIE_INTEGRATION_TESTS_ENABLED not true") + } + + galexieGCSSuite := &GalexieTestSuite{ + storageType: "GCS", + } + suite.Run(t, galexieGCSSuite) +} + +// TestGalexieS3TestSuite runs tests with S3 backend +func TestGalexieS3TestSuite(t *testing.T) { if os.Getenv("GALEXIE_INTEGRATION_TESTS_ENABLED") != "true" { t.Skip("skipping integration test: GALEXIE_INTEGRATION_TESTS_ENABLED not true") } - galexieSuite := &GalexieTestSuite{} - suite.Run(t, galexieSuite) + galexieS3Suite := &GalexieTestSuite{ + storageType: "S3", + } + suite.Run(t, galexieS3Suite) } type GalexieTestSuite struct { suite.Suite - tempConfigFile string - ctx context.Context - ctxStop context.CancelFunc - coreContainerID string - dockerCli *client.Client - gcsServer *fakestorage.Server - finishedSetup bool - config Config + tempConfigFile string + ctx context.Context + ctxStop context.CancelFunc + coreContainerID string + localStackContainerID string + dockerCli *client.Client + gcsServer *fakestorage.Server + finishedSetup bool + config Config + storageType string // "GCS" or "S3" } func (s *GalexieTestSuite) TestScanAndFill() { @@ -81,18 +101,46 @@ func (s *GalexieTestSuite) TestScanAndFill() { _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd") require.NoError(err) + + lastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd") + require.NoError(err) + + // now run an scan-and-fill on an overlapping range, it will skip over existing ledgers + rootCmd.SetArgs([]string{"scan-and-fill", "--start", "4", "--end", "9", "--config-file", s.tempConfigFile}) + errWriter.Reset() + rootCmd.SetErr(&errWriter) + outWriter.Reset() + rootCmd.SetOut(&outWriter) + err = rootCmd.ExecuteContext(s.ctx) + require.NoError(err) + + s.T().Log(outWriter.String()) + s.T().Log(errWriter.String()) + + newLastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd") + require.NoError(err) + require.Equal(lastModified, newLastModified) + + _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd") + require.NoError(err) } func (s *GalexieTestSuite) TestAppend() { require := s.Require() - // first populate ledgers 4-5 + // first populate ledgers 6-7 rootCmd := defineCommands() rootCmd.SetArgs([]string{"scan-and-fill", "--start", "6", "--end", "7", "--config-file", s.tempConfigFile}) err := rootCmd.ExecuteContext(s.ctx) require.NoError(err) - // now run an append of overalapping range, it will resume past existing ledgers + datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig) + require.NoError(err) + + lastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFF9--6.xdr.zstd") + require.NoError(err) + + // now run an append on an overlapping range, it will resume past existing ledgers rootCmd.SetArgs([]string{"append", "--start", "6", "--end", "9", "--config-file", s.tempConfigFile}) var errWriter bytes.Buffer var outWriter bytes.Buffer @@ -106,8 +154,10 @@ func (s *GalexieTestSuite) TestAppend() { s.T().Log(output) s.T().Log(errOutput) - datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig) + // check that the file was not modified + newLastModified, err := datastore.GetFileLastModified(s.ctx, "FFFFFFFF--0-9/FFFFFFF9--6.xdr.zstd") require.NoError(err) + require.Equal(lastModified, newLastModified, "file should not be modified on append of overlapping range") _, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd") require.NoError(err) @@ -171,6 +221,7 @@ func (s *GalexieTestSuite) SetupSuite() { os.Getenv("GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN")) galexieConfigTemplate.Set("stellar_core_config.storage_path", filepath.Join(testTempDir, "captive-core")) + galexieConfigTemplate.Set("datastore_config.type", s.storageType) tomlBytes, err := toml.Marshal(galexieConfigTemplate) if err != nil { @@ -180,17 +231,44 @@ func (s *GalexieTestSuite) SetupSuite() { t.Fatalf("unable to marshal config file toml into struct, %v", err) } - tempSeedDataPath := filepath.Join(testTempDir, "data") - if err = os.MkdirAll(filepath.Join(tempSeedDataPath, "integration-test"), 0777); err != nil { - t.Fatalf("unable to create seed data in temp path, %v", err) - } - s.tempConfigFile = filepath.Join(testTempDir, "config.toml") err = os.WriteFile(s.tempConfigFile, tomlBytes, 0777) if err != nil { t.Fatalf("unable to write temp config file %v, %v", s.tempConfigFile, err) } + s.dockerCli, err = client.NewClientWithOpts(client.WithAPIVersionNegotiation()) + if err != nil { + t.Fatalf("could not create docker client, %v", err) + } + + if s.storageType == "GCS" { + s.setupGCS(t, testTempDir) + } else if s.storageType == "S3" { + s.setupS3(t) + } + + quickstartImage := os.Getenv("GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE") + if quickstartImage == "" { + quickstartImage = "stellar/quickstart:testing" + } + pullQuickStartImage := true + if os.Getenv("GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL") == "false" { + pullQuickStartImage = false + } + + s.mustStartCore(t, quickstartImage, pullQuickStartImage) + s.mustWaitForCore(t, galexieConfigTemplate.GetArray("stellar_core_config.history_archive_urls").([]string), + galexieConfigTemplate.Get("stellar_core_config.network_passphrase").(string)) + s.finishedSetup = true +} + +func (s *GalexieTestSuite) setupGCS(t *testing.T, testTempDir string) { + tempSeedDataPath := filepath.Join(testTempDir, "data") + if err := os.MkdirAll(filepath.Join(tempSeedDataPath, "integration-test"), 0777); err != nil { + t.Fatalf("unable to create seed data in temp path, %v", err) + } + testWriter := &testWriter{test: t} opts := fakestorage.Options{ Scheme: "http", @@ -202,6 +280,7 @@ func (s *GalexieTestSuite) SetupSuite() { PublicHost: "127.0.0.1", } + var err error s.gcsServer, err = fakestorage.NewServerWithOptions(opts) if err != nil { @@ -210,54 +289,72 @@ func (s *GalexieTestSuite) SetupSuite() { t.Logf("fake gcs server started at %v", s.gcsServer.URL()) t.Setenv("STORAGE_EMULATOR_HOST", s.gcsServer.URL()) +} - quickstartImage := os.Getenv("GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE") - if quickstartImage == "" { - quickstartImage = "stellar/quickstart:testing" - } - pullQuickStartImage := true - if os.Getenv("GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL") == "false" { - pullQuickStartImage = false - } +func (s *GalexieTestSuite) setupS3(t *testing.T) { + s.mustStartLocalStack(t) + s.mustWaitForLocalStack(t) + s.createLocalStackBucket(t) - s.mustStartCore(t, quickstartImage, pullQuickStartImage) - s.mustWaitForCore(t, galexieConfigTemplate.GetArray("stellar_core_config.history_archive_urls").([]string), - galexieConfigTemplate.Get("stellar_core_config.network_passphrase").(string)) - s.finishedSetup = true + t.Setenv("AWS_ACCESS_KEY_ID", "KEY_ID") + t.Setenv("AWS_SECRET_ACCESS_KEY", "ACCESS_KEY") } func (s *GalexieTestSuite) TearDownSuite() { + t := s.T() + if s.coreContainerID != "" { - s.T().Logf("Stopping the quickstart container %v", s.coreContainerID) - containerLogs, err := s.dockerCli.ContainerLogs(s.ctx, s.coreContainerID, container.LogsOptions{ShowStdout: true, ShowStderr: true}) - - if err == nil { - var errWriter bytes.Buffer - var outWriter bytes.Buffer - stdcopy.StdCopy(&outWriter, &errWriter, containerLogs) - s.T().Log(outWriter.String()) - s.T().Log(errWriter.String()) - } - if err := s.dockerCli.ContainerStop(context.Background(), s.coreContainerID, container.StopOptions{}); err != nil { - s.T().Logf("unable to stop core container, %v, %v", s.coreContainerID, err) - } + t.Logf("Stopping the quickstart container %v", s.coreContainerID) + s.stopAndLogContainer(s.coreContainerID, "quickstart") + s.coreContainerID = "" + } + + if s.localStackContainerID != "" { + t.Logf("Stopping the localstack container %v", s.localStackContainerID) + s.stopAndLogContainer(s.localStackContainerID, "localstack") + s.localStackContainerID = "" } + if s.dockerCli != nil { s.dockerCli.Close() + s.dockerCli = nil } + if s.gcsServer != nil { s.gcsServer.Stop() + s.gcsServer = nil + } + + if s.ctxStop != nil { + s.ctxStop() } - s.ctxStop() } -func (s *GalexieTestSuite) mustStartCore(t *testing.T, quickstartImage string, pullImage bool) { - var err error - s.dockerCli, err = client.NewClientWithOpts(client.WithAPIVersionNegotiation()) - if err != nil { - t.Fatalf("could not create docker client, %v", err) +func (s *GalexieTestSuite) stopAndLogContainer(containerID, containerType string) { + if s.dockerCli == nil { + return } + containerLogs, err := s.dockerCli.ContainerLogs(s.ctx, containerID, container.LogsOptions{ + ShowStdout: true, + ShowStderr: true, + }) + if err == nil { + var errWriter bytes.Buffer + var outWriter bytes.Buffer + stdcopy.StdCopy(&outWriter, &errWriter, containerLogs) + s.T().Logf("%s container stdout: %s", containerType, outWriter.String()) + s.T().Logf("%s container stderr: %s", containerType, errWriter.String()) + containerLogs.Close() + } + + if err := s.dockerCli.ContainerStop(context.Background(), containerID, container.StopOptions{}); err != nil { + s.T().Logf("unable to stop %s container %v: %v", containerType, containerID, err) + } +} + +func (s *GalexieTestSuite) mustStartCore(t *testing.T, quickstartImage string, pullImage bool) { + var err error if pullImage { imgReader, imgErr := s.dockerCli.ImagePull(s.ctx, quickstartImage, image.PullOptions{}) if imgErr != nil { @@ -344,6 +441,106 @@ func (s *GalexieTestSuite) mustWaitForCore(t *testing.T, archiveUrls []string, p t.Fatalf("core did not progress ledgers within %v seconds", maxWaitForCoreStartup) } +func (s *GalexieTestSuite) mustStartLocalStack(t *testing.T) { + t.Log("Starting LocalStack container...") + imageTag := os.Getenv("GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_TAG") + if imageTag == "" { + imageTag = "latest" + } + imageName := "localstack/localstack:" + imageTag + pullImage := os.Getenv("GALEXIE_INTEGRATION_TESTS_LOCALSTACK_IMAGE_PULL") != "false" + if pullImage { + imgReader, err := s.dockerCli.ImagePull(s.ctx, imageName, image.PullOptions{}) + if err != nil { + t.Fatalf("could not pull docker image %s: %v", imageName, err) + } + defer imgReader.Close() + _, err = io.Copy(io.Discard, imgReader) + if err != nil { + t.Fatalf("could not read docker image pull response %s: %v", imageName, err) + } + } + + resp, err := s.dockerCli.ContainerCreate(s.ctx, + &container.Config{ + Image: imageName, + ExposedPorts: nat.PortSet{ + "4566/tcp": {}, + }, + }, + &container.HostConfig{ + PortBindings: nat.PortMap{ + "4566/tcp": {nat.PortBinding{HostIP: "127.0.0.1", HostPort: "4566"}}, + }, + AutoRemove: true, + }, + nil, nil, "") + if err != nil { + t.Fatalf("could not create localstack container: %v", err) + } + s.localStackContainerID = resp.ID + + if err := s.dockerCli.ContainerStart(s.ctx, resp.ID, container.StartOptions{}); err != nil { + t.Fatalf("could not run localstack container: %v", err) + } + t.Logf("Started LocalStack container %v", s.localStackContainerID) +} + +func (s *GalexieTestSuite) mustWaitForLocalStack(t *testing.T) { + t.Log("Waiting for LocalStack to be up...") + healthURL := "http://localhost:4566/_localstack/health" + startTime := time.Now() + + httpClient := &http.Client{Timeout: 5 * time.Second} + + for time.Since(startTime) < maxWaitForLocalStackStartup { + req, err := http.NewRequestWithContext(s.ctx, "GET", healthURL, nil) + if err != nil { + t.Logf("failed to create http request to localstack: %v", err) + time.Sleep(2 * time.Second) + continue + } + + resp, err := httpClient.Do(req) + if err != nil { + t.Logf("failed to connect to localstack: %v", err) + time.Sleep(2 * time.Second) + continue + } + + if resp.StatusCode == http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + t.Logf("LocalStack is ready. Health check response: %s", string(body)) + return + } + resp.Body.Close() + t.Logf("LocalStack health check failed with status: %s. Retrying...", resp.Status) + time.Sleep(2 * time.Second) + } + + t.Fatalf("LocalStack did not become ready within %v", maxWaitForLocalStackStartup) +} + +func (s *GalexieTestSuite) createLocalStackBucket(t *testing.T) { + bucketName := "integration-test" + url := "http://localhost:4566/" + bucketName + req, err := http.NewRequestWithContext(s.ctx, "PUT", url, nil) + if err != nil { + t.Fatalf("failed to create request to create bucket: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to create bucket %s: %v", bucketName, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("failed to create bucket %s: %s", bucketName, string(body)) + } + t.Logf("Bucket %s created successfully", bucketName) +} + type testWriter struct { test *testing.T } diff --git a/services/galexie/internal/test/integration_config_template.toml b/services/galexie/internal/test/integration_config_template.toml index 410aa0f7e5..83ba8cd390 100644 --- a/services/galexie/internal/test/integration_config_template.toml +++ b/services/galexie/internal/test/integration_config_template.toml @@ -2,7 +2,11 @@ type = "GCS" [datastore_config.params] +# for both GCS and S3 storage destination_bucket_path = "integration-test/standalone" +# for S3 only +region = "us-east-1" +endpoint_url = "http://localhost:4566" [datastore_config.schema] ledgers_per_file = 1 @@ -12,4 +16,4 @@ files_per_partition = 10 captive_core_toml_path = "test/integration_captive_core.cfg" history_archive_urls = ["http://localhost:1570"] network_passphrase = "Standalone Network ; February 2017" -checkpoint_frequency = 8 \ No newline at end of file +checkpoint_frequency = 8 diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go index 7b10ceda35..3383fc331f 100644 --- a/support/datastore/datastore.go +++ b/support/datastore/datastore.go @@ -3,6 +3,7 @@ package datastore import ( "context" "io" + "time" "github.com/stellar/go/support/errors" ) @@ -16,6 +17,7 @@ type DataStoreConfig struct { // DataStore defines an interface for interacting with data storage type DataStore interface { GetFileMetadata(ctx context.Context, path string) (map[string]string, error) + GetFileLastModified(ctx context.Context, filePath string) (time.Time, error) GetFile(ctx context.Context, path string) (io.ReadCloser, error) PutFile(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) error PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error) diff --git a/support/datastore/gcs_datastore.go b/support/datastore/gcs_datastore.go index d4d588bd37..746356097c 100644 --- a/support/datastore/gcs_datastore.go +++ b/support/datastore/gcs_datastore.go @@ -11,6 +11,7 @@ import ( "os" "path" "strings" + "time" "cloud.google.com/go/storage" "google.golang.org/api/googleapi" @@ -65,8 +66,7 @@ func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath strin return &GCSDataStore{client: client, bucket: bucket, prefix: prefix, schema: schema}, nil } -// GetFileMetadata retrieves the metadata for the specified file in the GCS bucket. -func (b GCSDataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { +func (b GCSDataStore) GetFileAttrs(ctx context.Context, filePath string) (*storage.ObjectAttrs, error) { filePath = path.Join(b.prefix, filePath) attrs, err := b.bucket.Object(filePath).Attrs(ctx) if err != nil { @@ -74,9 +74,27 @@ func (b GCSDataStore) GetFileMetadata(ctx context.Context, filePath string) (map return nil, os.ErrNotExist } } + return attrs, nil +} + +// GetFileMetadata retrieves the metadata for the specified file in the GCS bucket. +func (b GCSDataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { + attrs, err := b.GetFileAttrs(ctx, filePath) + if err != nil { + return nil, err + } return attrs.Metadata, nil } +// GetFileLastModified retrieves the last modified time of a file in the GCS bucket. +func (b GCSDataStore) GetFileLastModified(ctx context.Context, filePath string) (time.Time, error) { + attrs, err := b.GetFileAttrs(ctx, filePath) + if err != nil { + return time.Time{}, err + } + return attrs.Updated, nil +} + // GetFile retrieves a file from the GCS bucket. func (b GCSDataStore) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { filePath = path.Join(b.prefix, filePath) diff --git a/support/datastore/gcs_test.go b/support/datastore/gcs_test.go index 3ea73b7f3c..1166cc1ab3 100644 --- a/support/datastore/gcs_test.go +++ b/support/datastore/gcs_test.go @@ -178,6 +178,33 @@ func TestGCSPutFileIfNotExists(t *testing.T) { require.Equal(t, map[string]string(nil), metadata) } +func TestGCSGetFileLastModified(t *testing.T) { + server := fakestorage.NewServer([]fakestorage.Object{}) + defer server.Stop() + server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{ + Name: "test-bucket", + VersioningEnabled: false, + DefaultEventBasedHold: false, + }) + + store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{}) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + content := []byte("inside the file") + writerTo := &writerToRecorder{ + WriterTo: bytes.NewReader(content), + } + err = store.PutFile(context.Background(), "file.txt", writerTo, nil) + require.NoError(t, err) + + lastModified, err := store.GetFileLastModified(context.Background(), "file.txt") + require.NoError(t, err) + require.NotZero(t, lastModified) +} + func TestGCSPutFileWithMetadata(t *testing.T) { server := fakestorage.NewServer([]fakestorage.Object{}) defer server.Stop() diff --git a/support/datastore/mocks.go b/support/datastore/mocks.go index a8c10438ab..2431ecac44 100644 --- a/support/datastore/mocks.go +++ b/support/datastore/mocks.go @@ -3,6 +3,7 @@ package datastore import ( "context" "io" + "time" "github.com/stretchr/testify/mock" ) @@ -27,6 +28,14 @@ func (m *MockDataStore) GetFileMetadata(ctx context.Context, path string) (map[s return args.Get(0).(map[string]string), args.Error(1) } +func (m *MockDataStore) GetFileLastModified(ctx context.Context, filePath string) (time.Time, error) { + args := m.Called(ctx, filePath) + if args.Get(0) != nil { + return args.Get(0).(time.Time), args.Error(1) + } + return time.Time{}, args.Error(1) +} + func (m *MockDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) { args := m.Called(ctx, path) closer := (io.ReadCloser)(nil) diff --git a/support/datastore/s3_datastore.go b/support/datastore/s3_datastore.go index b6c9d38ed8..145dc6e366 100644 --- a/support/datastore/s3_datastore.go +++ b/support/datastore/s3_datastore.go @@ -10,6 +10,7 @@ import ( "os" "path" "strings" + "time" "github.com/aws/aws-sdk-go-v2/aws" awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" @@ -116,8 +117,7 @@ func FromS3Client(ctx context.Context, client *s3.Client, bucketPath string, sch return S3DataStore{client: client, uploader: uploader, bucket: bucketName, prefix: prefix, schema: schema}, nil } -// GetFileMetadata retrieves the metadata for the specified file in the S3-compatible bucket. -func (b S3DataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { +func (b S3DataStore) HeadObject(ctx context.Context, filePath string) (*s3.HeadObjectOutput, error) { filePath = path.Join(b.prefix, filePath) input := &s3.HeadObjectInput{ Bucket: aws.String(b.bucket), @@ -132,7 +132,25 @@ func (b S3DataStore) GetFileMetadata(ctx context.Context, filePath string) (map[ return nil, err } - return output.Metadata, nil + return output, nil +} + +// GetFileMetadata retrieves the metadata for the specified file in the S3-compatible bucket. +func (b S3DataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { + attrs, err := b.HeadObject(ctx, filePath) + if err != nil { + return nil, err + } + return attrs.Metadata, nil +} + +// GetFileLastModified retrieves the last modified time of a file in the S3-compatible bucket. +func (b S3DataStore) GetFileLastModified(ctx context.Context, filePath string) (time.Time, error) { + attrs, err := b.HeadObject(ctx, filePath) + if err != nil { + return time.Time{}, err + } + return *attrs.LastModified, nil } // GetFile retrieves a file from the S3-compatible bucket. diff --git a/support/datastore/s3_datastore_test.go b/support/datastore/s3_datastore_test.go index ab3ad24e25..40b81db645 100644 --- a/support/datastore/s3_datastore_test.go +++ b/support/datastore/s3_datastore_test.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -21,9 +22,10 @@ import ( // mockS3Object stores object data and metadata within the mock server. type mockS3Object struct { - body []byte - metadata map[string]string - crc32c string + body []byte + metadata map[string]string + crc32c string + lastModified time.Time } // mockS3Server is our mock S3 server, holding an in-memory "bucket". @@ -115,7 +117,12 @@ func (s *mockS3Server) handlePutRequest(w http.ResponseWriter, r *http.Request, metadata := s.extractMetadata(r.Header) crc32c := r.Header.Get("x-amz-checksum-crc32c") - s.objects[key] = mockS3Object{body: body, metadata: metadata, crc32c: crc32c} + s.objects[key] = mockS3Object{ + body: body, + metadata: metadata, + crc32c: crc32c, + lastModified: time.Now(), + } w.WriteHeader(http.StatusOK) } @@ -127,6 +134,9 @@ func (s *mockS3Server) setObjectHeaders(w http.ResponseWriter, obj mockS3Object) if obj.crc32c != "" { w.Header().Set("x-amz-checksum-crc32c", obj.crc32c) } + if !obj.lastModified.IsZero() { + w.Header().Set("Last-Modified", obj.lastModified.UTC().Format(http.TimeFormat)) + } } func (s *mockS3Server) writeS3NoSuchKeyError(w http.ResponseWriter) { @@ -155,6 +165,10 @@ func setupTestS3DataStore(t *testing.T, ctx context.Context, bucketPath string, } // Initialize the mock server with provided objects. for key, obj := range initObjects { + // Ensure lastModified is set if not already set + if obj.lastModified.IsZero() { + obj.lastModified = time.Now() + } mockServer.objects[key] = obj } server := httptest.NewServer(mockServer) @@ -253,6 +267,20 @@ func TestS3PutFile(t *testing.T) { require.Equal(t, map[string]string(nil), metadata) } +func TestS3GetFileLastModified(t *testing.T) { + ctx := context.Background() + store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) + defer teardown() + + content := []byte("inside the file") + err := store.PutFile(ctx, "file.txt", bytes.NewReader(content), nil) + require.NoError(t, err) + + lastModified, err := store.GetFileLastModified(context.Background(), "file.txt") + require.NoError(t, err) + require.NotZero(t, lastModified) +} + func TestS3PutFileIfNotExists(t *testing.T) { ctx := context.Background() store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{}) @@ -422,9 +450,10 @@ func TestS3GetFileValidatesCRC32C(t *testing.T) { ctx := context.Background() store, teardown := setupTestS3DataStore(t, ctx, "test-bucket/objects/testnet", map[string]mockS3Object{ "objects/testnet/file.txt": { - body: []byte("hello"), - metadata: map[string]string{}, - crc32c: "VLn+tw==", // invalid CRC32C for the content + body: []byte("hello"), + metadata: map[string]string{}, + crc32c: "VLn+tw==", // invalid CRC32C for the content + lastModified: time.Now(), }}) defer teardown()