Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 86 additions & 22 deletions executor/task_dataset_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package executor

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"net/http"
"strconv"
"strings"
"time"

"github.com/ONSdigital/dis-migration-service/application"
"github.com/ONSdigital/dis-migration-service/clients"
Expand All @@ -15,6 +22,11 @@ import (
"github.com/ONSdigital/log.go/v2/log"
)

const (
maxRetries = 5
baseDelay = 50 * time.Millisecond
)

// DatasetDownloadTaskExecutor executes migration tasks for dataset downloads.
type DatasetDownloadTaskExecutor struct {
jobService application.JobService
Expand Down Expand Up @@ -131,10 +143,6 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
return appErrors.ErrInvalidTask
}

headers := datasetSDK.Headers{
AccessToken: e.serviceAuthToken,
}

logData := log.Data{
"task_id": task.ID,
"dataset_id": task.Target.DatasetID,
Expand All @@ -143,12 +151,73 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
"title": distribution.Title,
}

//TODO: use the eTag here to prevent collisions. SDK needs to support it first.
currentVersion, _, err := e.clientList.DatasetAPI.GetVersionWithHeaders(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID)
for attempt := 0; attempt <= maxRetries; attempt++ {
if attempt > 0 {
delay := getRandomDelay(baseDelay)

log.Info(ctx, "ETag conflict, retrying after delay", log.Data{
"task_id": task.ID,
"attempt": attempt,
"delay": delay.String(),
})

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}

err := e.tryUpdateDownloadMetadata(ctx, task, distribution, logData)
if err == nil {
return nil
}

if !isConflictError(err) {
return err
}
}

return fmt.Errorf("failed to update download metadata after %d attempts", maxRetries)
}

// getRandomDelay returns a random duration between 0 and baseDelay
func getRandomDelay(baseDelay time.Duration) time.Duration {
maxDelay := big.NewInt(int64(baseDelay))
n, err := rand.Int(rand.Reader, maxDelay)
if err != nil {
return baseDelay
}
return time.Duration(n.Int64())
}

// tryUpdateDownloadMetadata attempts a single update
// of the dataset version with the distribution.
func (e *DatasetDownloadTaskExecutor) tryUpdateDownloadMetadata(ctx context.Context, task *domain.Task, distribution datasetModels.Distribution, logData log.Data) error {
headers := datasetSDK.Headers{
AccessToken: e.serviceAuthToken,
}

currentVersion, respHeaders, err := e.clientList.DatasetAPI.GetVersionWithHeaders(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID)
if err != nil {
return err
}

e.applyDistributionUpdate(ctx, &currentVersion, distribution, logData)

// Update version with eTag
eTag := respHeaders.ETag
log.Info(ctx, "updating dataset version with new distribution", log.Data{"task_id": task.ID, "e_tag": eTag})

headers.IfMatch = eTag

_, err = e.clientList.DatasetAPI.PutVersion(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID, currentVersion)
return err
}

// applyDistributionUpdate modifies the version's distributions
// with the new distribution.
func (e *DatasetDownloadTaskExecutor) applyDistributionUpdate(ctx context.Context, currentVersion *datasetModels.Version, distribution datasetModels.Distribution, logData log.Data) {
// Initialize distributions slice if nil
if currentVersion.Distributions == nil {
distributions := []datasetModels.Distribution{distribution}
Expand All @@ -165,17 +234,11 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
// Distribution exists - enrich it with full metadata
// Version task created this with Title + Format only
// Now enriching with DownloadURL, ByteSize, and MediaType
log.Info(
ctx,
"enriching existing distribution with download metadata",
log.Data{
"task_id": task.ID,
"title": distribution.Title,
"download_url": distribution.DownloadURL,
"byte_size": distribution.ByteSize,
"index": index,
},
)
logData["title"] = distribution.Title
logData["download_url"] = distribution.DownloadURL
logData["byte_size"] = distribution.ByteSize
logData["index"] = index
log.Info(ctx, "enriching existing distribution with download metadata", logData)
(*currentVersion.Distributions)[index] = distribution
} else {
// Distribution not found - append as new
Expand All @@ -191,13 +254,14 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
)
}
}
}

_, err = e.clientList.DatasetAPI.PutVersion(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID, currentVersion)
if err != nil {
return err
// isConflictError checks if the error is an HTTP 409 Conflict.
func isConflictError(err error) bool {
if err == nil {
return false
}

return nil
return strings.Contains(err.Error(), strconv.Itoa(http.StatusConflict))
}

// findDistributionIndexByTitle finds the index of a distribution in the slice
Expand Down
49 changes: 49 additions & 0 deletions executor/task_dataset_download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,53 @@ func TestDatasetDownloadTaskExecutor(t *testing.T) {
})
})
})

Convey("Given a dataset download task executor and a dataset client that returns a 409 Conflict then succeeds", t, func() {
mockJobService := &applicationMocks.JobServiceMock{
UpdateTaskStateFunc: func(ctx context.Context, taskID string, state domain.State) error { return nil },
}
putVersionCalls := 0

mockClientList := &clients.ClientList{
DatasetAPI: &datasetSDKMock.ClienterMock{
GetVersionWithHeadersFunc: func(ctx context.Context, headers sdk.Headers, datasetID, edition, version string) (datasetModels.Version, sdk.ResponseHeaders, error) {
return datasetModels.Version{
Distributions: &[]datasetModels.Distribution{},
}, sdk.ResponseHeaders{ETag: "etag-1"}, nil
},
PutVersionFunc: func(ctx context.Context, headers sdk.Headers, datasetID, editionID, versionID string, version datasetModels.Version) (datasetModels.Version, error) {
putVersionCalls++
if putVersionCalls == 1 {
return datasetModels.Version{}, errors.New("Etag conflict, received status 409")
}
return datasetModels.Version{}, nil
},
},
UploadService: &uploadSDKMock.ClienterMock{
UploadFunc: func(ctx context.Context, fileContent io.ReadCloser, metadata api.Metadata, headers uploadSDK.Headers) error {
return nil
},
},
Zebedee: &clientMocks.ZebedeeClientMock{
GetResourceStreamFunc: func(ctx context.Context, userAuthToken, collectionID, lang, path string) (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader([]byte(testFileData))), nil
},
GetFileSizeFunc: func(ctx context.Context, userAccessToken, collectionID, lang, uri string) (zebedee.FileSize, error) {
return zebedee.FileSize{Size: len(testFileData)}, nil
},
},
}

ctx := context.Background()

executor := NewDatasetDownloadTaskExecutor(mockJobService, mockClientList, testServiceAuthToken)

Convey("When migrate is called for a task", func() {
err := executor.Migrate(ctx, testDownloadTask)
Convey("Then it retries and eventually succeeds", func() {
So(err, ShouldBeNil)
So(putVersionCalls, ShouldEqual, 2)
})
})
})
}
20 changes: 18 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module github.com/ONSdigital/dis-migration-service

go 1.24.7
go 1.24.12

require (
github.com/ONSdigital/dis-redirect-api v0.3.8
github.com/ONSdigital/dp-api-clients-go/v2 v2.273.1
github.com/ONSdigital/dp-authorisation/v2 v2.33.1
github.com/ONSdigital/dp-cache v0.6.1
github.com/ONSdigital/dp-component-test v1.2.4-alpha
github.com/ONSdigital/dp-dataset-api v1.95.0
github.com/ONSdigital/dp-dataset-api v1.99.0
github.com/ONSdigital/dp-files-api v1.16.0
github.com/ONSdigital/dp-healthcheck v1.6.4
github.com/ONSdigital/dp-mongodb/v3 v3.11.0
Expand All @@ -35,9 +35,16 @@ require (
dario.cat/mergo v1.0.2 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ONSdigital/dp-api-clients-go v1.43.0 // indirect
github.com/ONSdigital/dp-graph/v2 v2.18.0 // indirect
github.com/ONSdigital/dp-kafka/v3 v3.11.0 // indirect
github.com/ONSdigital/dp-kafka/v4 v4.2.0 // indirect
github.com/ONSdigital/dp-mongodb-in-memory v1.8.1 // indirect
github.com/ONSdigital/dp-net/v2 v2.22.0 // indirect
github.com/ONSdigital/dp-s3/v3 v3.3.0 // indirect
github.com/ONSdigital/golang-neo4j-bolt-driver v0.0.0-20241121114036-9f4b82bb9d37 // indirect
github.com/ONSdigital/graphson v0.3.0 // indirect
github.com/ONSdigital/gremgo-neptune v1.1.0 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.41.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
Expand Down Expand Up @@ -65,6 +72,7 @@ require (
github.com/chromedp/cdproto v0.0.0-20250803210736-d308e07a266d // indirect
github.com/chromedp/chromedp v0.14.2 // indirect
github.com/chromedp/sysutil v1.1.0 // indirect
github.com/cloudflare/cloudflare-go/v6 v6.4.0 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down Expand Up @@ -97,6 +105,7 @@ require (
github.com/gobwas/ws v1.4.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang/glog v1.2.5 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gorilla/schema v1.4.1 // indirect
Expand All @@ -114,6 +123,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down Expand Up @@ -142,12 +152,17 @@ require (
github.com/shirou/gopsutil/v4 v4.25.12 // indirect
github.com/sirupsen/logrus v1.9.4 // indirect
github.com/smarty/assertions v1.16.0 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/square/mongo-lock v0.0.0-20230808145049-cfcf499f6bf0 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/testcontainers/testcontainers-go v0.40.0 // indirect
github.com/testcontainers/testcontainers-go/modules/mongodb v0.40.0 // indirect
github.com/testcontainers/testcontainers-go/modules/redis v0.40.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand All @@ -156,6 +171,7 @@ require (
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect
go.opentelemetry.io/contrib/propagators/autoprop v0.64.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.39.0 // indirect
Expand Down
Loading
Loading