Skip to content

Commit d0c1381

Browse files
committed
Wait for files
1 parent cfc18ac commit d0c1381

File tree

2 files changed

+119
-14
lines changed

2 files changed

+119
-14
lines changed

utils/workflows/files.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,15 @@ func MoveFile(ctx workflow.Context, source, destination paths.Path, priority rcl
4949
}
5050

5151
func CopyFile(ctx workflow.Context, source, destination paths.Path) error {
52-
return Execute(ctx, activities.Util.CopyFile, activities.MoveFileInput{
53-
Source: source,
54-
Destination: destination,
55-
}).Wait(ctx)
52+
external := source.OnExternalDrive() || destination.OnExternalDrive()
53+
if external {
54+
return RcloneCopyFile(ctx, source, destination, rclone.PriorityNormal)
55+
} else {
56+
return Execute(ctx, activities.Util.CopyFile, activities.MoveFileInput{
57+
Source: source,
58+
Destination: destination,
59+
}).Wait(ctx)
60+
}
5661
}
5762

5863
func CopyToFolder(ctx workflow.Context, file, folder paths.Path, priority rclone.Priority) (paths.Path, error) {
@@ -199,6 +204,27 @@ func RcloneCheckFileExists(ctx workflow.Context, file paths.Path) (bool, error)
199204
}).Result(ctx)
200205
}
201206

207+
func RcloneWaitForFileExists(ctx workflow.Context, file paths.Path, retries int) error {
208+
for i := 0; i < retries; i++ {
209+
exists, err := RcloneCheckFileExists(ctx, file)
210+
if err != nil {
211+
return err
212+
}
213+
214+
if exists {
215+
return nil
216+
}
217+
218+
err = workflow.Sleep(ctx, time.Second*30)
219+
if err != nil {
220+
return err
221+
}
222+
}
223+
224+
return fmt.Errorf("could not find file %s with retries %d", file, retries)
225+
226+
}
227+
202228
func RcloneWaitForFileGone(ctx workflow.Context, file paths.Path, notificationChannel telegram.Chat, retries int) error {
203229
fileExists := true
204230

workflows/misc/masv_import.go

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package miscworkflows
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"github.com/bcc-code/bcc-media-flows/paths"
67
"github.com/bcc-code/bcc-media-flows/services/rclone"
78
"github.com/bcc-code/bcc-media-flows/services/telegram"
89
wfutils "github.com/bcc-code/bcc-media-flows/utils/workflows"
910
"go.temporal.io/sdk/workflow"
11+
"strings"
1012
"time"
1113
)
1214

@@ -20,6 +22,40 @@ type MASVImportParams struct {
2022
EventTime string
2123
}
2224

25+
type MASVMetadata struct {
26+
ID string `json:"id"`
27+
CreatedAt time.Time `json:"created_at"`
28+
UpdatedAt time.Time `json:"updated_at"`
29+
Package Package `json:"package"`
30+
}
31+
type Files struct {
32+
ID string `json:"id"`
33+
Kind string `json:"kind"`
34+
Name string `json:"name"`
35+
Path string `json:"path"`
36+
Size int `json:"size"`
37+
}
38+
type Metadata struct {
39+
Church string `json:"church"`
40+
PackageDescription string `json:"package_description"`
41+
PackageName string `json:"package_name"`
42+
SenderEmail string `json:"sender_email"`
43+
}
44+
type Package struct {
45+
ID string `json:"id"`
46+
Files []Files `json:"files"`
47+
Name string `json:"name"`
48+
PortalID string `json:"portal_id"`
49+
PortalName string `json:"portal_name"`
50+
Sender string `json:"sender"`
51+
Size int `json:"size"`
52+
State string `json:"state"`
53+
TotalFiles int `json:"total_files"`
54+
Metadata Metadata `json:"metadata"`
55+
CreatedAt time.Time `json:"created_at"`
56+
UpdatedAt time.Time `json:"updated_at"`
57+
}
58+
2359
func MASVImport(ctx workflow.Context, params MASVImportParams) error {
2460
logger := workflow.GetLogger(ctx)
2561
logger.Info("Starting MASVImport workflow", "id", params.ID, "name", params.Name)
@@ -39,36 +75,79 @@ func MASVImport(ctx workflow.Context, params MASVImportParams) error {
3975
if err != nil {
4076
return err
4177
}
78+
4279
dst := tmpRoot.Append("masv", params.ID)
4380
if err := wfutils.CreateFolder(ctx, dst); err != nil {
4481
return err
4582
}
4683

47-
fileAvailable := false
84+
srcFolder, err := paths.Parse(src)
85+
if err != nil {
86+
return err
87+
}
88+
89+
var metaFileInfo *rclone.RcloneFile
4890
for i := 0; i < 60; i++ {
49-
srcFolder, err := paths.Parse(src)
50-
if err != nil {
51-
return err
52-
}
5391

5492
files, err := wfutils.RcloneListFiles(ctx, srcFolder)
5593
if err != nil {
5694
return err
5795
}
5896

59-
if len(files) > 0 {
60-
fileAvailable = true
97+
for _, file := range files {
98+
99+
if !strings.HasSuffix(file.Name, "metadata.json") {
100+
continue
101+
}
102+
103+
metaFileInfo = &file
104+
break
105+
}
106+
107+
if metaFileInfo != nil {
61108
break
62109
}
63110

64111
err = workflow.Sleep(ctx, 30*time.Second)
112+
}
113+
114+
if metaFileInfo == nil {
115+
return fmt.Errorf("could not find metadata file for package %s", params.ID)
116+
}
117+
118+
metaRemotePath, err := paths.Parse("s3prod:" + metaFileInfo.Path)
119+
if err != nil {
120+
return err
121+
}
122+
123+
metaFilePath := tmpRoot.Append("masv_meta.json")
124+
err = wfutils.CopyFile(ctx, metaRemotePath, metaFilePath)
125+
if err != nil {
126+
return err
127+
}
128+
129+
metaBytes, err := wfutils.ReadFile(ctx, metaFilePath)
130+
if err != nil {
131+
return err
132+
}
133+
134+
masvMeta := &MASVMetadata{}
135+
err = json.Unmarshal(metaBytes, masvMeta)
136+
if err != nil {
137+
return err
138+
}
139+
140+
for _, f := range masvMeta.Package.Files {
141+
fpath := fmt.Sprintf("s3prod:/massiveio-bccm/upload/%s/%s", params.Name, f.Path)
142+
parsedPath, err := paths.Parse(fpath)
65143
if err != nil {
66144
return err
67145
}
68-
}
69146

70-
if !fileAvailable {
71-
return fmt.Errorf("could not find masv file in %s", src)
147+
err = wfutils.RcloneWaitForFileExists(ctx, parsedPath, 30)
148+
if err != nil {
149+
return err
150+
}
72151
}
73152

74153
// Copy the directory contents from s3 to temp using rclone

0 commit comments

Comments
 (0)