@@ -12,6 +12,7 @@ import (
1212 "path/filepath"
1313 "runtime"
1414 "strings"
15+ "sync"
1516
1617 "github.com/pierrec/lz4/v4"
1718
@@ -25,6 +26,113 @@ type baseDirConfig struct {
2526 script bool
2627}
2728
29+ // tierName returns a human-readable name for a provider path.
30+ // Local filesystem paths are labelled "local cache"; rclone remotes
31+ // (which contain a colon that is not a Windows drive letter) are
32+ // labelled with the remote name portion (e.g. "WebDAV" from
33+ // "webdav:bucket/path"); script providers are labelled "script".
34+ func tierName (cfg baseDirConfig ) string {
35+ if cfg .script {
36+ return "script"
37+ }
38+ if util .IsRclonePath (cfg .path ) {
39+ // Extract the rclone remote name before the colon.
40+ if idx := strings .Index (cfg .path , ":" ); idx > 0 {
41+ return cfg .path [:idx ]
42+ }
43+ return "remote"
44+ }
45+ return "local cache"
46+ }
47+
48+ // downloadTracker accumulates per-tier download counts and drives
49+ // the progressive reporting output.
50+ type downloadTracker struct {
51+ mu sync.Mutex
52+
53+ // Per-tier counters keyed by the human-readable tier name.
54+ tierCounts map [string ]int
55+ // Ordered list of tier names in the order they were first seen,
56+ // so the summary prints in a deterministic order.
57+ tierOrder []string
58+ // Total files downloaded.
59+ total int
60+
61+ // individualLimit is the number of files reported individually
62+ // before switching to batch progress updates.
63+ individualLimit int
64+ // batchInterval is the number of files between batch progress
65+ // updates (after the individual limit is exceeded).
66+ batchInterval int
67+ // nextBatchAt tracks when the next batch progress line should
68+ // be emitted.
69+ nextBatchAt int
70+ }
71+
72+ func newDownloadTracker () * downloadTracker {
73+ return & downloadTracker {
74+ tierCounts : make (map [string ]int ),
75+ individualLimit : 10 ,
76+ batchInterval : 25 ,
77+ nextBatchAt : 0 , // computed after individual limit
78+ }
79+ }
80+
81+ // record logs a successful download from the given tier and emits
82+ // the appropriate progress line to stderr.
83+ func (t * downloadTracker ) record (oid string , tier string , path string , errWriter * bufio.Writer ) {
84+ t .mu .Lock ()
85+ defer t .mu .Unlock ()
86+
87+ if _ , seen := t .tierCounts [tier ]; ! seen {
88+ t .tierOrder = append (t .tierOrder , tier )
89+ }
90+ t .tierCounts [tier ]++
91+ t .total ++
92+
93+ shortOid := oid
94+ if len (shortOid ) > 8 {
95+ shortOid = shortOid [:8 ]
96+ }
97+
98+ if t .total <= t .individualLimit {
99+ // Phase 1: log each file individually.
100+ util .WriteToStderr (fmt .Sprintf ("LFS: [%d] %s <- %s (%s)\n " , t .total , shortOid , tier , path ), errWriter )
101+ if t .total == t .individualLimit {
102+ // Set up the first batch threshold.
103+ t .nextBatchAt = t .individualLimit + t .batchInterval
104+ }
105+ return
106+ }
107+
108+ // Phase 2: batch progress at regular intervals.
109+ if t .total >= t .nextBatchAt {
110+ util .WriteToStderr (fmt .Sprintf ("LFS: Progress -- %d files (%s)\n " , t .total , t .tierSummary ()), errWriter )
111+ t .nextBatchAt = t .total + t .batchInterval
112+ }
113+ }
114+
115+ // printSummary writes the final summary line. Called on terminate.
116+ func (t * downloadTracker ) printSummary (errWriter * bufio.Writer ) {
117+ t .mu .Lock ()
118+ defer t .mu .Unlock ()
119+
120+ if t .total == 0 {
121+ return
122+ }
123+ util .WriteToStderr (fmt .Sprintf ("LFS: Complete -- %d files (%s)\n " , t .total , t .tierSummary ()), errWriter )
124+ }
125+
126+ // tierSummary returns a comma-separated breakdown like
127+ // "139 from local cache, 3 from WebDAV". Must be called with mu held.
128+ func (t * downloadTracker ) tierSummary () string {
129+ parts := make ([]string , 0 , len (t .tierOrder ))
130+ for _ , name := range t .tierOrder {
131+ parts = append (parts , fmt .Sprintf ("%d from %s" , t .tierCounts [name ], name ))
132+ }
133+ return strings .Join (parts , ", " )
134+ }
135+
28136// Serve starts the protocol server
29137// usePullAction/usePushAction indicate whether to fall back to LFS actions
30138// for downloads and uploads respectively.
@@ -43,6 +151,8 @@ func Serve(pullBaseDir, pushBaseDir string, usePullAction, usePushAction, writeA
43151 return
44152 }
45153
154+ tracker := newDownloadTracker ()
155+
46156 for scanner .Scan () {
47157 line := scanner .Text ()
48158 var req api.Request
@@ -62,16 +172,16 @@ func Serve(pullBaseDir, pushBaseDir string, usePullAction, usePushAction, writeA
62172 }
63173 api .SendResponse (resp , writer , errWriter )
64174 case "download" :
65- util .WriteToStderr (fmt .Sprintf ("Received download request for %s\n " , req .Oid ), errWriter )
66- retrieve (pullBaseDir , gitDir , req .Oid , req .Size , usePullAction , req .Action , writer , errWriter )
175+ retrieve (pullBaseDir , gitDir , req .Oid , req .Size , usePullAction , req .Action , tracker , writer , errWriter )
67176 case "upload" :
68177 util .WriteToStderr (fmt .Sprintf ("Received upload request for %s\n " , req .Oid ), errWriter )
69178 if len (pushBaseDir ) == 0 {
70179 pushBaseDir = pullBaseDir
71180 }
72181 store (pushBaseDir , req .Oid , req .Size , usePushAction , writeAll , req .Action , req .Path , writer , errWriter )
73182 case "terminate" :
74- util .WriteToStderr ("Terminating test custom adapter gracefully.\n " , errWriter )
183+ tracker .printSummary (errWriter )
184+ util .WriteToStderr ("Terminating elastic-git-storage custom adapter gracefully.\n " , errWriter )
75185 break
76186 }
77187 }
@@ -95,7 +205,7 @@ func downloadTempPath(gitDir string, oid string) (string, error) {
95205 return filepath .Join (tmpfld , fmt .Sprintf ("%v.tmp" , oid )), nil
96206}
97207
98- func retrieve (baseDir , gitDir , oid string , size int64 , useAction bool , a * api.Action , writer , errWriter * bufio.Writer ) {
208+ func retrieve (baseDir , gitDir , oid string , size int64 , useAction bool , a * api.Action , tracker * downloadTracker , writer , errWriter * bufio.Writer ) {
99209
100210 dirs := splitBaseDirs (baseDir )
101211 var lastErr error
@@ -107,21 +217,19 @@ func retrieve(baseDir, gitDir, oid string, size int64, useAction bool, a *api.Ac
107217 err = tryRetrieveDir (d .path , gitDir , oid , size , d .compression , writer , errWriter )
108218 }
109219 if err == nil {
110- if i > 0 {
111- util .WriteToStderr (fmt .Sprintf ("LFS: retrieved %s from fallback provider %d: %s\n " , oid , i + 1 , d .path ), errWriter )
112- }
220+ tier := tierName (d )
221+ tracker .record (oid , tier , d .path , errWriter )
113222 return
114223 }
115224 if i == 0 && len (dirs ) > 1 {
116- util .WriteToStderr (fmt .Sprintf ("LFS: primary provider unavailable, falling back to provider %d: %s\n " , i + 2 , dirs [i + 1 ].path ), errWriter )
225+ util .WriteToStderr (fmt .Sprintf ("LFS: primary provider unavailable for %s , falling back to provider %d: %s\n " , oid , i + 2 , dirs [i + 1 ].path ), errWriter )
117226 }
118227 lastErr = err
119228 }
120229
121230 if useAction && a != nil {
122231 if err := retrieveFromAction (a , gitDir , oid , size , writer , errWriter ); err == nil {
123- providerCount := len (dirs )
124- util .WriteToStderr (fmt .Sprintf ("LFS: retrieved %s from fallback action (after %d provider(s) failed)\n " , oid , providerCount ), errWriter )
232+ tracker .record (oid , "LFS action" , "remote" , errWriter )
125233 return
126234 } else {
127235 lastErr = err
0 commit comments