Skip to content

Commit 5b06749

Browse files
committed
Fix for concurrency bug
1 parent c634f0a commit 5b06749

File tree

5 files changed

+158
-12
lines changed

5 files changed

+158
-12
lines changed

ds3/buildclient/buildClient.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"os"
1010
)
1111

12-
const endpointEnv = "DS3_ENDPOINT"
13-
const accessKeyEnv = "DS3_ACCESS_KEY"
14-
const secretKeyEnv = "DS3_SECRET_KEY"
15-
const proxyEnv = "DS3_PROXY"
12+
const EndpointEnv = "DS3_ENDPOINT"
13+
const AccessKeyEnv = "DS3_ACCESS_KEY"
14+
const SecretKeyEnv = "DS3_SECRET_KEY"
15+
const ProxyEnv = "DS3_PROXY"
1616

1717
// Creates a client from cli arguments
1818
func FromArgs(args *commands.Arguments) (*ds3.Client, error) {
@@ -22,10 +22,10 @@ func FromArgs(args *commands.Arguments) (*ds3.Client, error) {
2222
// Creates a client from environment variables
2323
func FromEnv() (*ds3.Client, error) {
2424
//Retrieve the environment variables
25-
endpoint := os.Getenv(endpointEnv)
26-
accessKey := os.Getenv(accessKeyEnv)
27-
secretKey := os.Getenv(secretKeyEnv)
28-
proxy := os.Getenv(proxyEnv)
25+
endpoint := os.Getenv(EndpointEnv)
26+
accessKey := os.Getenv(AccessKeyEnv)
27+
secretKey := os.Getenv(SecretKeyEnv)
28+
proxy := os.Getenv(ProxyEnv)
2929

3030
// Validate required arguments and create client if all required values are present
3131
switch {

ds3/networking/headers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type signatureFields struct {
5353
Path string
5454
}
5555

56-
func buildAuthHeaderValue(creds *Credentials, fields *signatureFields) (string) {
56+
func (fields *signatureFields) BuildAuthHeaderValue(creds *Credentials) (string) {
5757
// Build the string that we need to compute the MAC on.
5858
stringToSign := fmt.Sprintf(
5959
"%s\n%s\n%s\n%s\n%s%s",

ds3/networking/httpRequestBuilder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,14 @@ func (builder *HttpRequestBuilder) Build(conn *ConnectionInfo) (*http.Request, e
108108

109109
builder.signatureFields.Date = getCurrentTime()
110110

111-
authHeaderVal := buildAuthHeaderValue(conn.Credentials, &builder.signatureFields)
111+
authHeaderVal := builder.signatureFields.BuildAuthHeaderValue(conn.Credentials)
112112

113113
// Set the http request headers such as authorization and date.
114114
return builder.addHttpRequestHeaders(httpRequest, authHeaderVal)
115115
}
116116

117117
func (builder *HttpRequestBuilder) buildUrl(conn *ConnectionInfo) string {
118-
httpUrl := conn.Endpoint
118+
var httpUrl url.URL = *conn.Endpoint
119119
httpUrl.Path = builder.signatureFields.Path
120120
httpUrl.RawQuery = encodeQueryParams(builder.queryParams)
121121
return httpUrl.String()

ds3_cli/commands/getObject.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func getObject(client *ds3.Client, args *Arguments) error {
2525
// Build the request.
2626
request := models.NewGetObjectRequest(args.Bucket, args.Key)
2727
if args.End > 0 {
28-
request.WithRange(args.Start, args.End)
28+
request.WithRanges(models.Range{ int64(args.Start), int64(args.End) })
2929
}
3030

3131
// Perform the request.
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package ds3_integration
2+
3+
import (
4+
"testing"
5+
"sync"
6+
"log"
7+
"fmt"
8+
"runtime/debug"
9+
"spectra/ds3_go_sdk/ds3"
10+
"spectra/ds3_go_sdk/ds3_utils/ds3Testing"
11+
"spectra/ds3_go_sdk/ds3/models"
12+
"spectra/ds3_go_sdk/ds3_integration/utils"
13+
"spectra/ds3_go_sdk/ds3/networking"
14+
"os"
15+
"spectra/ds3_go_sdk/ds3/buildclient"
16+
"net/url"
17+
)
18+
19+
func putFileWithClient(name string, offset int64, jobId string, content *[]byte, group *sync.WaitGroup, t *testing.T) {
20+
defer group.Done()
21+
log.Printf("Putting file %s", name)
22+
23+
_, err := client.PutObject(models.NewPutObjectRequest(testBucket, name, ds3.BuildByteReaderWithSizeDecorator(*content)).
24+
WithOffset(offset).
25+
WithJob(jobId))
26+
27+
if err != nil {
28+
t.Errorf("Unexpected error when putting file '%s': '%s'.", name, err.Error())
29+
debug.PrintStack()
30+
t.Fail()
31+
}
32+
}
33+
34+
func TestConcurrentClientUsage(t *testing.T) {
35+
defer testutils.DeleteBucketContents(client, testBucket)
36+
37+
content, err := testutils.LoadBook(testutils.BookTitles[0])
38+
ds3Testing.AssertNilError(t, err)
39+
40+
size := int64(len(content))
41+
42+
// create a job for putting n instances of file
43+
n := 30
44+
45+
var ds3Objects []models.Ds3PutObject
46+
for i := 0; i < n; i++ {
47+
ds3Objects = append(ds3Objects, models.Ds3PutObject{ Name:fmt.Sprintf("file%d", i), Size:size })
48+
}
49+
50+
bulkPut, err := client.PutBulkJobSpectraS3(models.NewPutBulkJobSpectraS3Request(testBucket, ds3Objects))
51+
ds3Testing.AssertNilError(t, err)
52+
53+
// launch go routines to put files concurrently
54+
var group sync.WaitGroup
55+
group.Add(n)
56+
57+
for _, chunk := range bulkPut.MasterObjectList.Objects {
58+
allocateChunk, allocateErr := client.AllocateJobChunkSpectraS3(models.NewAllocateJobChunkSpectraS3Request(chunk.ChunkId))
59+
ds3Testing.AssertNilError(t, allocateErr)
60+
for _, obj := range allocateChunk.Objects.Objects {
61+
go putFileWithClient(*obj.Name, obj.Offset, bulkPut.MasterObjectList.JobId, &content, &group, t)
62+
}
63+
}
64+
65+
group.Wait()
66+
}
67+
68+
func putFileWithSendNetwork(name string, offset int64, jobId string, content *[]byte, bucketName string, group *sync.WaitGroup, t *testing.T, network networking.Network, info *networking.ConnectionInfo) {
69+
defer group.Done()
70+
71+
// manually create http request
72+
httpRequest, err := networking.NewHttpRequestBuilder().
73+
WithHttpVerb(ds3.HTTP_VERB_PUT).
74+
WithPath("/" + bucketName + "/" + name).
75+
WithOptionalQueryParam("job", &jobId).
76+
WithOptionalQueryParam("offset", networking.Int64PtrToStrPtr(&offset)).
77+
WithReader(ds3.BuildByteReaderWithSizeDecorator(*content)).
78+
WithChecksum(models.NewNoneChecksum()).
79+
WithHeaders(make(map[string]string)).
80+
Build(info)
81+
82+
//networkRetryDecorator := networking.NewNetworkRetryDecorator(network, 5)
83+
t.Logf("Created request to for file '%s' to url '%s'", name, httpRequest.URL.String())
84+
response, err := network.Invoke(httpRequest)
85+
if err != nil {
86+
t.Errorf("Unexpected error when putting file '%s': '%s'.", name, err.Error())
87+
debug.PrintStack()
88+
t.Fail()
89+
}
90+
91+
_, err = models.NewPutObjectResponse(response)
92+
if err != nil {
93+
t.Errorf("Unexpected error when putting file '%s': '%s'.", name, err.Error())
94+
debug.PrintStack()
95+
t.Fail()
96+
}
97+
}
98+
99+
func TestConcurrentSendNetworkUsage(t *testing.T) {
100+
defer testutils.DeleteBucketContents(client, testBucket)
101+
102+
content, err := testutils.LoadBook(testutils.BookTitles[0])
103+
ds3Testing.AssertNilError(t, err)
104+
105+
size := int64(len(content))
106+
107+
// create a job for putting n instances of file
108+
n := 30
109+
110+
var ds3Objects []models.Ds3PutObject
111+
for i := 0; i < n; i++ {
112+
ds3Objects = append(ds3Objects, models.Ds3PutObject{ Name:fmt.Sprintf("file%d", i), Size:size })
113+
}
114+
115+
bulkPut, err := client.PutBulkJobSpectraS3(models.NewPutBulkJobSpectraS3Request(testBucket, ds3Objects))
116+
ds3Testing.AssertNilError(t, err)
117+
118+
// launch go routines to put files concurrently
119+
var group sync.WaitGroup
120+
group.Add(n)
121+
122+
//Retrieve the environment variables
123+
endpoint := os.Getenv(buildclient.EndpointEnv)
124+
accessKey := os.Getenv(buildclient.AccessKeyEnv)
125+
secretKey := os.Getenv(buildclient.SecretKeyEnv)
126+
127+
endpointUrl, err := url.Parse(endpoint)
128+
ds3Testing.AssertNilError(t, err)
129+
info := networking.ConnectionInfo{
130+
Endpoint: endpointUrl,
131+
Credentials: &networking.Credentials{AccessId: accessKey, Key: secretKey},
132+
Proxy: nil}
133+
134+
network := networking.NewSendNetwork(&info)
135+
136+
for _, chunk := range bulkPut.MasterObjectList.Objects {
137+
//TODO Try get available job chunks ready for client processing instead
138+
allocateChunk, allocateErr := client.AllocateJobChunkSpectraS3(models.NewAllocateJobChunkSpectraS3Request(chunk.ChunkId))
139+
ds3Testing.AssertNilError(t, allocateErr)
140+
for _, obj := range allocateChunk.Objects.Objects {
141+
go putFileWithSendNetwork(*obj.Name, obj.Offset, bulkPut.MasterObjectList.JobId, &content, testBucket, &group, t, network, &info)
142+
}
143+
}
144+
145+
group.Wait()
146+
}

0 commit comments

Comments
 (0)