@@ -4,12 +4,16 @@ import (
44 "fmt"
55 "io/fs"
66 "magician/provider"
7+ "math"
78 "path/filepath"
89 "regexp"
910 "sort"
1011 "strconv"
1112 "strings"
1213 "sync"
14+ "time"
15+
16+ "github.com/fsnotify/fsnotify"
1317)
1418
1519type Result struct {
@@ -60,6 +64,11 @@ type Tester struct {
6064 cassettePaths map [provider.Version ]string // where cassettes are relative to baseDir by version
6165 logPaths map [logKey ]string // where logs are relative to baseDir by version and mode
6266 repoPaths map [provider.Version ]string // relative paths of already cloned repos by version
67+
68+ // the following are for async upload cassettes
69+ enableAsyncUploadCassettes bool
70+ watcher * fsnotify.Watcher
71+ uploadFunc func (head string , version provider.Version , fileName string ) error
6372}
6473
6574const accTestParallelism = 32
@@ -116,15 +125,15 @@ var safeToLog = map[string]bool{
116125} // true if shown, false if hidden (default false)
117126
118127// Create a new tester in the current working directory and write the service account key file.
119- func NewTester (env map [string ]string , cassetteBucket , logBucket string , rnr ExecRunner ) (* Tester , error ) {
128+ func NewTester (env map [string ]string , cassetteBucket , logBucket string , rnr ExecRunner , enableAsyncUpload bool ) (* Tester , error ) {
120129 var saKeyPath string
121130 if saKeyVal , ok := env ["SA_KEY" ]; ok {
122131 saKeyPath = "sa_key.json"
123132 if err := rnr .WriteFile (saKeyPath , saKeyVal ); err != nil {
124133 return nil , err
125134 }
126135 }
127- return & Tester {
136+ vt := & Tester {
128137 env : env ,
129138 rnr : rnr ,
130139 cassetteBucket : cassetteBucket ,
@@ -134,7 +143,13 @@ func NewTester(env map[string]string, cassetteBucket, logBucket string, rnr Exec
134143 cassettePaths : make (map [provider.Version ]string , provider .NumVersions ),
135144 logPaths : make (map [logKey ]string , provider .NumVersions * numModes ),
136145 repoPaths : make (map [provider.Version ]string , provider .NumVersions ),
137- }, nil
146+ }
147+
148+ if enableAsyncUpload {
149+ vt .enableAsyncUploadCassettes = true
150+ vt .uploadFunc = vt .uploadOneCassetteFile
151+ }
152+ return vt , nil
138153}
139154
140155func (vt * Tester ) SetRepoPath (version provider.Version , repoPath string ) {
@@ -194,10 +209,11 @@ func (vt *Tester) LogPath(mode Mode, version provider.Version) string {
194209}
195210
196211type RunOptions struct {
197- Mode Mode
198- Version provider.Version
199- TestDirs []string
200- Tests []string
212+ Mode Mode
213+ Version provider.Version
214+ TestDirs []string
215+ Tests []string
216+ UploadBranchName string
201217}
202218
203219// Run the vcr tests in the given mode and provider version and return the result.
@@ -348,6 +364,19 @@ func (vt *Tester) RunParallel(opt RunOptions) (Result, error) {
348364 return Result {}, fmt .Errorf ("error creating cassette dir: %v" , err )
349365 }
350366 vt .cassettePaths [opt .Version ] = cassettePath
367+
368+ if vt .enableAsyncUploadCassettes {
369+ w , err := fsnotify .NewWatcher ()
370+ if err != nil {
371+ return Result {}, fmt .Errorf ("failed to create watcher" )
372+ }
373+ defer w .Close ()
374+ if err := w .Add (cassettePath ); err != nil {
375+ return Result {}, fmt .Errorf ("failed to add cassette path into watcher" )
376+ }
377+ vt .watcher = w
378+ go vt .asyncUploadCassettes (opt .Version , opt .UploadBranchName , w )
379+ }
351380 }
352381
353382 running := make (chan struct {}, parallelJobs )
@@ -534,6 +563,77 @@ func (vt *Tester) UploadLogs(opts UploadLogsOptions) error {
534563 return nil
535564}
536565
566+ func (vt * Tester ) asyncUploadCassettes (version provider.Version , branch string , w * fsnotify.Watcher ) error {
567+ var (
568+ waitFor = 100 * time .Millisecond
569+ mu sync.Mutex
570+ timers = make (map [string ]* time.Timer )
571+
572+ // Callback we run.
573+ cb = func (e fsnotify.Event ) {
574+ err := vt .uploadFunc (branch , version , e .Name )
575+ if err != nil {
576+ fmt .Println ("upload failed: " , err )
577+ }
578+ mu .Lock ()
579+ delete (timers , e .Name )
580+ mu .Unlock ()
581+ }
582+ )
583+
584+ for {
585+ select {
586+ case err , ok := <- w .Errors :
587+ if ! ok { // Channel was closed (i.e. Watcher.Close() was called).
588+ return nil
589+ }
590+ fmt .Println (err )
591+ case e , ok := <- w .Events :
592+ if ! ok { // Channel was closed (i.e. Watcher.Close() was called).
593+ return nil
594+ }
595+ // ignore everything outside of Create and Write.
596+ if ! e .Has (fsnotify .Create ) && ! e .Has (fsnotify .Write ) {
597+ continue
598+ }
599+
600+ // Get timer.
601+ mu .Lock ()
602+ t , ok := timers [e .Name ]
603+ mu .Unlock ()
604+
605+ // No timer yet, so create one.
606+ if ! ok {
607+ t = time .AfterFunc (math .MaxInt64 , func () { cb (e ) })
608+ t .Stop ()
609+
610+ mu .Lock ()
611+ timers [e .Name ] = t
612+ mu .Unlock ()
613+ }
614+
615+ // Reset the timer for this path, so it will start from 100ms again.
616+ t .Reset (waitFor )
617+ }
618+ }
619+ }
620+
621+ func (vt * Tester ) uploadOneCassetteFile (head string , version provider.Version , fileName string ) error {
622+ uploadPath := fmt .Sprintf ("gs://%s/%s/refs/heads/%s/fixtures/" , vt .cassetteBucket , version , head )
623+ args := []string {
624+ "-m" ,
625+ "-q" ,
626+ "cp" ,
627+ fileName ,
628+ uploadPath ,
629+ }
630+ fmt .Printf ("Uploading %s to %s: %v\n " , fileName , uploadPath , "gsutil " + strings .Join (args , " " ))
631+ if _ , err := vt .rnr .Run ("gsutil" , args , nil ); err != nil {
632+ return fmt .Errorf ("error uploading file %s: %s" , fileName , err )
633+ }
634+ return nil
635+ }
636+
537637func (vt * Tester ) UploadCassettes (head string , version provider.Version ) error {
538638 cassettePath , ok := vt .cassettePaths [version ]
539639 if ! ok {
0 commit comments