Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.24
FROM golang:1.24.12

ENV GOCACHE=/go/.go/cache GOPATH=/go/.go/path TZ=Europe/London

Expand Down
92 changes: 80 additions & 12 deletions executor/task_dataset_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package executor

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"net/http"
"strconv"
"strings"
"time"

"github.com/ONSdigital/dis-migration-service/application"
"github.com/ONSdigital/dis-migration-service/clients"
Expand All @@ -15,6 +22,11 @@ import (
"github.com/ONSdigital/log.go/v2/log"
)

// Retry tuning used by updateDownloadMetadata when an update attempt fails
// with a conflict error.
const (
// maxRetries bounds the retry loop: attempts are numbered from 0 up to and
// including maxRetries, so it counts the retries made after the first try.
maxRetries = 5
// baseDelay is the fixed part of the pause before each retry. A random
// jitter smaller than baseDelay is added on top of it (or baseDelay itself
// if the random source fails).
baseDelay = 50 * time.Millisecond
)

// DatasetDownloadTaskExecutor executes migration tasks for dataset downloads.
type DatasetDownloadTaskExecutor struct {
jobService application.JobService
Expand Down Expand Up @@ -131,10 +143,6 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
return appErrors.ErrInvalidTask
}

headers := datasetSDK.Headers{
AccessToken: e.serviceAuthToken,
}

logData := log.Data{
"task_id": task.ID,
"dataset_id": task.Target.DatasetID,
Expand All @@ -143,12 +151,71 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
"title": distribution.Title,
}

//TODO: use the eTag here to prevent collisions. SDK needs to support it first.
currentVersion, _, err := e.clientList.DatasetAPI.GetVersionWithHeaders(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID)
for attempt := 0; attempt <= maxRetries; attempt++ {
if attempt > 0 {
var randomDelay int64
maxDelay := big.NewInt(int64(baseDelay))
n, err := rand.Int(rand.Reader, maxDelay)
if err != nil {
randomDelay = int64(baseDelay)
} else {
randomDelay = n.Int64()
}
delay := baseDelay + time.Duration(randomDelay)

log.Info(ctx, "ETag conflict, retrying after delay", log.Data{
"task_id": task.ID,
"attempt": attempt,
"delay": delay.String(),
})

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}

err := e.tryUpdateDownloadMetadata(ctx, task, distribution, logData)
if err == nil {
return nil
}

if !isConflictError(err) {
return err
}
}

return fmt.Errorf("failed to update download metadata after %d attempts", maxRetries)
}

// tryUpdateDownloadMetadata performs one read-modify-write cycle against the
// dataset API: it fetches the target version, merges the given distribution
// into it, and writes the version back with the ETag from the fetch set as the
// If-Match header. Any error from the fetch or the write is returned unchanged
// so the caller can decide whether to retry.
func (e *DatasetDownloadTaskExecutor) tryUpdateDownloadMetadata(ctx context.Context, task *domain.Task, distribution datasetModels.Distribution, logData log.Data) error {
	reqHeaders := datasetSDK.Headers{AccessToken: e.serviceAuthToken}

	version, fetched, getErr := e.clientList.DatasetAPI.GetVersionWithHeaders(
		ctx,
		reqHeaders,
		task.Target.DatasetID,
		task.Target.EditionID,
		task.Target.VersionID,
	)
	if getErr != nil {
		return getErr
	}

	// Merge the distribution into the freshly fetched version in place.
	e.applyDistributionUpdate(ctx, &version, distribution, logData)

	// Carry the ETag from the read over to the write as the If-Match value.
	log.Info(ctx, "updating dataset version with new distribution", log.Data{"task_id": task.ID, "e_tag": fetched.ETag})
	reqHeaders.IfMatch = fetched.ETag

	if _, putErr := e.clientList.DatasetAPI.PutVersion(
		ctx,
		reqHeaders,
		task.Target.DatasetID,
		task.Target.EditionID,
		task.Target.VersionID,
		version,
	); putErr != nil {
		return putErr
	}

	return nil
}

// applyDistributionUpdate modifies the version's distributions
// with the new distribution.
func (e *DatasetDownloadTaskExecutor) applyDistributionUpdate(ctx context.Context, currentVersion *datasetModels.Version, distribution datasetModels.Distribution, logData log.Data) {
// Initialize distributions slice if nil
if currentVersion.Distributions == nil {
distributions := []datasetModels.Distribution{distribution}
Expand All @@ -169,7 +236,7 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
ctx,
"enriching existing distribution with download metadata",
log.Data{
"task_id": task.ID,
"task_id": logData["task_id"],
"title": distribution.Title,
"download_url": distribution.DownloadURL,
"byte_size": distribution.ByteSize,
Expand All @@ -191,13 +258,14 @@ func (e *DatasetDownloadTaskExecutor) updateDownloadMetadata(ctx context.Context
)
}
}
}

_, err = e.clientList.DatasetAPI.PutVersion(ctx, headers, task.Target.DatasetID, task.Target.EditionID, task.Target.VersionID, currentVersion)
if err != nil {
return err
// isConflictError reports whether err looks like an HTTP 409 Conflict.
//
// The check is a heuristic: it returns true when the error message contains
// the decimal text of http.StatusConflict ("409"). A nil error is never a
// conflict.
func isConflictError(err error) bool {
	if err == nil {
		return false
	}

	// Match on the status code text embedded in the error message.
	return strings.Contains(err.Error(), strconv.Itoa(http.StatusConflict))
}

// findDistributionIndexByTitle finds the index of a distribution in the slice
Expand Down
49 changes: 49 additions & 0 deletions executor/task_dataset_download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,53 @@ func TestDatasetDownloadTaskExecutor(t *testing.T) {
})
})
})

Convey("Given a dataset download task executor and a dataset client that returns a 409 Conflict then succeeds", t, func() {
mockJobService := &applicationMocks.JobServiceMock{
UpdateTaskStateFunc: func(ctx context.Context, taskID string, state domain.State) error { return nil },
}
putVersionCalls := 0

mockClientList := &clients.ClientList{
DatasetAPI: &datasetSDKMock.ClienterMock{
GetVersionWithHeadersFunc: func(ctx context.Context, headers sdk.Headers, datasetID, edition, version string) (datasetModels.Version, sdk.ResponseHeaders, error) {
return datasetModels.Version{
Distributions: &[]datasetModels.Distribution{},
}, sdk.ResponseHeaders{ETag: "etag-1"}, nil
},
PutVersionFunc: func(ctx context.Context, headers sdk.Headers, datasetID, editionID, versionID string, version datasetModels.Version) (datasetModels.Version, error) {
putVersionCalls++
if putVersionCalls == 1 {
return datasetModels.Version{}, errors.New("Etag conflict, received status 409")
}
return datasetModels.Version{}, nil
},
},
UploadService: &uploadSDKMock.ClienterMock{
UploadFunc: func(ctx context.Context, fileContent io.ReadCloser, metadata api.Metadata, headers uploadSDK.Headers) error {
return nil
},
},
Zebedee: &clientMocks.ZebedeeClientMock{
GetResourceStreamFunc: func(ctx context.Context, userAuthToken, collectionID, lang, path string) (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader([]byte(testFileData))), nil
},
GetFileSizeFunc: func(ctx context.Context, userAccessToken, collectionID, lang, uri string) (zebedee.FileSize, error) {
return zebedee.FileSize{Size: len(testFileData)}, nil
},
},
}

ctx := context.Background()

executor := NewDatasetDownloadTaskExecutor(mockJobService, mockClientList, testServiceAuthToken)

Convey("When migrate is called for a task", func() {
err := executor.Migrate(ctx, testDownloadTask)
Convey("Then it retries and eventually succeeds", func() {
So(err, ShouldBeNil)
So(putVersionCalls, ShouldEqual, 2)
})
})
})
}
20 changes: 18 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module github.com/ONSdigital/dis-migration-service

go 1.24.7
go 1.24.12

require (
github.com/ONSdigital/dis-redirect-api v0.3.8
github.com/ONSdigital/dp-api-clients-go/v2 v2.273.1
github.com/ONSdigital/dp-authorisation/v2 v2.33.1
github.com/ONSdigital/dp-cache v0.6.1
github.com/ONSdigital/dp-component-test v1.2.4-alpha
github.com/ONSdigital/dp-dataset-api v1.95.0
github.com/ONSdigital/dp-dataset-api v1.99.0
github.com/ONSdigital/dp-files-api v1.16.0
github.com/ONSdigital/dp-healthcheck v1.6.4
github.com/ONSdigital/dp-mongodb/v3 v3.11.0
Expand All @@ -35,9 +35,16 @@ require (
dario.cat/mergo v1.0.2 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ONSdigital/dp-api-clients-go v1.43.0 // indirect
github.com/ONSdigital/dp-graph/v2 v2.18.0 // indirect
github.com/ONSdigital/dp-kafka/v3 v3.11.0 // indirect
github.com/ONSdigital/dp-kafka/v4 v4.2.0 // indirect
github.com/ONSdigital/dp-mongodb-in-memory v1.8.1 // indirect
github.com/ONSdigital/dp-net/v2 v2.22.0 // indirect
github.com/ONSdigital/dp-s3/v3 v3.3.0 // indirect
github.com/ONSdigital/golang-neo4j-bolt-driver v0.0.0-20241121114036-9f4b82bb9d37 // indirect
github.com/ONSdigital/graphson v0.3.0 // indirect
github.com/ONSdigital/gremgo-neptune v1.1.0 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.41.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
Expand Down Expand Up @@ -65,6 +72,7 @@ require (
github.com/chromedp/cdproto v0.0.0-20250803210736-d308e07a266d // indirect
github.com/chromedp/chromedp v0.14.2 // indirect
github.com/chromedp/sysutil v1.1.0 // indirect
github.com/cloudflare/cloudflare-go/v6 v6.4.0 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down Expand Up @@ -97,6 +105,7 @@ require (
github.com/gobwas/ws v1.4.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang/glog v1.2.5 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gorilla/schema v1.4.1 // indirect
Expand All @@ -114,6 +123,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down Expand Up @@ -142,12 +152,17 @@ require (
github.com/shirou/gopsutil/v4 v4.25.12 // indirect
github.com/sirupsen/logrus v1.9.4 // indirect
github.com/smarty/assertions v1.16.0 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/square/mongo-lock v0.0.0-20230808145049-cfcf499f6bf0 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/testcontainers/testcontainers-go v0.40.0 // indirect
github.com/testcontainers/testcontainers-go/modules/mongodb v0.40.0 // indirect
github.com/testcontainers/testcontainers-go/modules/redis v0.40.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand All @@ -156,6 +171,7 @@ require (
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect
go.opentelemetry.io/contrib/propagators/autoprop v0.64.0 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.39.0 // indirect
Expand Down
Loading