Skip to content

Commit 0d3fb1c

Browse files
authored
Merge pull request #1923 from Saartank/object-prestaging-tool-jan21
Add Prestage Functionality to Client
2 parents afd33af + 5f572b3 commit 0d3fb1c

File tree

16 files changed

+813
-91
lines changed

16 files changed

+813
-91
lines changed

client/acquire_token.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,17 @@ func (tg *tokenGenerator) SetToken(contents string) {
123123
tg.Token.Store(&info)
124124
}
125125

126+
// Copy the contents
127+
func (tg *tokenGenerator) Copy() *tokenGenerator {
128+
return &tokenGenerator{
129+
DirResp: tg.DirResp,
130+
Destination: tg.Destination,
131+
IsWrite: tg.IsWrite,
132+
EnableAcquire: tg.EnableAcquire,
133+
Sync: new(singleflight.Group),
134+
}
135+
}
136+
126137
// Read a token from a file; ensure
127138
func getTokenFromFile(tokenLocation string) (string, error) {
128139
//Read in the JSON

client/fed_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -976,3 +976,86 @@ func TestTokenGenerate(t *testing.T) {
976976
assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes)
977977
}
978978
}
979+
980+
func TestPrestage(t *testing.T) {
981+
server_utils.ResetTestState()
982+
defer server_utils.ResetTestState()
983+
fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)
984+
985+
te, err := client.NewTransferEngine(fed.Ctx)
986+
require.NoError(t, err)
987+
988+
// Other set-up items:
989+
// The cache will open the file to stat it, downloading the first block.
990+
// Make sure we are greater than 64kb in size.
991+
testFileContent := strings.Repeat("test file content", 10000)
992+
// Create the temporary file to upload
993+
tempFile, err := os.CreateTemp(t.TempDir(), "test")
994+
assert.NoError(t, err, "Error creating temp file")
995+
defer os.Remove(tempFile.Name())
996+
_, err = tempFile.WriteString(testFileContent)
997+
assert.NoError(t, err, "Error writing to temp file")
998+
tempFile.Close()
999+
1000+
tempToken, _ := getTempToken(t)
1001+
defer tempToken.Close()
1002+
defer os.Remove(tempToken.Name())
1003+
// Disable progress bars to not reuse the same mpb instance
1004+
viper.Set("Logging.DisableProgressBars", true)
1005+
1006+
oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix)
1007+
assert.NoError(t, err)
1008+
defer func() {
1009+
_, err := config.SetPreferredPrefix(oldPref)
1010+
require.NoError(t, err)
1011+
}()
1012+
1013+
// Set path for object to upload/download
1014+
for _, export := range fed.Exports {
1015+
tempPath := tempFile.Name()
1016+
fileName := filepath.Base(tempPath)
1017+
uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
1018+
export.FederationPrefix, fileName)
1019+
1020+
// Upload the file with COPY
1021+
transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
1022+
assert.NoError(t, err)
1023+
assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes)
1024+
1025+
// Check the cache info twice, make sure it's not cached.
1026+
tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name()))
1027+
require.NoError(t, err)
1028+
innerFileUrl, err := url.Parse(uploadURL)
1029+
require.NoError(t, err)
1030+
age, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl)
1031+
require.NoError(t, err)
1032+
assert.Equal(t, int64(len(testFileContent)), size)
1033+
// Due to an xrootd limitation, CacheInfo performs a GET request instead of a HEAD request.
1034+
// Once this limitation is resolved and CacheInfo is updated accordingly,
1035+
// the assertion should be changed to -1 instead of 0.
1036+
assert.Equal(t, 0, age)
1037+
1038+
age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
1039+
require.NoError(t, err)
1040+
assert.Equal(t, int64(len(testFileContent)), size)
1041+
// Due to an xrootd limitation, CacheInfo performs a GET request instead of a HEAD request.
1042+
// Once this limitation is resolved and CacheInfo is updated accordingly,
1043+
// the assertion should be changed to -1 instead of 0.
1044+
assert.Equal(t, 0, age)
1045+
1046+
// Prestage the object
1047+
tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl)
1048+
require.NoError(t, err)
1049+
err = tc.Submit(tj)
1050+
require.NoError(t, err)
1051+
results, err := tc.Shutdown()
1052+
require.NoError(t, err)
1053+
assert.Equal(t, 1, len(results))
1054+
1055+
// Check if object is cached.
1056+
age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
1057+
require.NoError(t, err)
1058+
assert.Equal(t, int64(len(testFileContent)), size)
1059+
require.NotEqual(t, -1, age)
1060+
}
1061+
}

0 commit comments

Comments
 (0)