@@ -40,7 +40,6 @@ import (
4040 "github.com/uber/kraken/utils/httputil"
4141 "github.com/uber/kraken/utils/listener"
4242 "github.com/uber/kraken/utils/log"
43- "go.uber.org/zap"
4443
4544 "github.com/go-chi/chi"
4645 chimiddleware "github.com/go-chi/chi/middleware"
@@ -78,8 +77,8 @@ func New(
7877 remotes tagreplication.Remotes ,
7978 tagReplicationManager persistedretry.Manager ,
8079 provider tagclient.Provider ,
81- depResolver tagtype.DependencyResolver ) * Server {
82-
80+ depResolver tagtype.DependencyResolver ,
81+ ) * Server {
8382 config = config .applyDefaults ()
8483
8584 stats = stats .Tagged (map [string ]string {
@@ -145,7 +144,7 @@ func (s *Server) ListenAndServe() error {
145144func (s * Server ) healthHandler (w http.ResponseWriter , r * http.Request ) error {
146145 _ , err := fmt .Fprintln (w , "OK" )
147146 if err != nil {
148- log .Desugar ( ).Error ("Health check write failed" , zap . Error ( err ) )
147+ log .With ( "error" , err ).Error ("Health check write failed" )
149148 return handler .Errorf ("write health check: %s" , err )
150149 }
151150 return nil
@@ -154,17 +153,17 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
154153func (s * Server ) readinessCheckHandler (w http.ResponseWriter , r * http.Request ) error {
155154 err := s .backends .CheckReadiness ()
156155 if err != nil {
157- log .Desugar ( ).Error ("Backends readiness check failed" , zap . Error ( err ) )
156+ log .With ( "error" , err ).Error ("Backends readiness check failed" )
158157 return handler .Errorf ("not ready to serve traffic: %s" , err ).Status (http .StatusServiceUnavailable )
159158 }
160159 err = s .localOriginClient .CheckReadiness ()
161160 if err != nil {
162- log .Desugar ( ).Error ("Origin readiness check failed" , zap . Error ( err ) )
161+ log .With ( "error" , err ).Error ("Origin readiness check failed" )
163162 return handler .Errorf ("not ready to serve traffic: %s" , err ).Status (http .StatusServiceUnavailable )
164163 }
165164 _ , err = fmt .Fprintln (w , "OK" )
166165 if err != nil {
167- log .Desugar ( ).Error ("Readiness check write failed" , zap . Error ( err ) )
166+ log .With ( "error" , err ).Error ("Readiness check write failed" )
168167 return handler .Errorf ("write readiness check: %s" , err )
169168 }
170169 return nil
@@ -184,18 +183,30 @@ func (s *Server) putTagHandler(w http.ResponseWriter, r *http.Request) error {
184183 return handler .Errorf ("parse query arg `replicate`: %s" , err )
185184 }
186185
186+ log .With ("tag" , tag , "digest" , d .String (), "replicate" , replicate ).Info ("Putting tag" )
187+
187188 deps , err := s .depResolver .Resolve (tag , d )
188189 if err != nil {
190+ log .With ("tag" , tag , "digest" , d .String (), "error" , err ).Error ("Failed to resolve dependencies" )
189191 return fmt .Errorf ("resolve dependencies: %s" , err )
190192 }
193+
194+ log .With ("tag" , tag , "digest" , d .String (), "dependency_count" , len (deps )).Debug ("Resolved dependencies" )
195+
191196 if err := s .putTag (tag , d , deps ); err != nil {
197+ log .With ("tag" , tag , "digest" , d .String (), "error" , err ).Error ("Failed to put tag" )
192198 return err
193199 }
194200
201+ log .With ("tag" , tag , "digest" , d .String ()).Info ("Successfully put tag" )
202+
195203 if replicate {
204+ log .With ("tag" , tag , "digest" , d .String ()).Info ("Starting tag replication" )
196205 if err := s .replicateTag (tag , d , deps ); err != nil {
206+ log .With ("tag" , tag , "digest" , d .String (), "error" , err ).Error ("Failed to replicate tag" )
197207 return err
198208 }
209+ log .With ("tag" , tag , "digest" , d .String ()).Info ("Successfully replicated tag" )
199210 }
200211 w .WriteHeader (http .StatusOK )
201212 return nil
@@ -217,10 +228,15 @@ func (s *Server) duplicatePutTagHandler(w http.ResponseWriter, r *http.Request)
217228 }
218229 delay := req .Delay
219230
231+ log .With ("tag" , tag , "digest" , d .String (), "delay" , delay ).Debug ("Received duplicate put request from neighbor" )
232+
220233 if err := s .store .Put (tag , d , delay ); err != nil {
234+ log .With ("tag" , tag , "digest" , d .String (), "delay" , delay , "error" , err ).Error ("Failed to store tag from duplicate put" )
221235 return handler .Errorf ("storage: %s" , err )
222236 }
223237
238+ log .With ("tag" , tag , "digest" , d .String (), "delay" , delay ).Info ("Successfully stored tag from duplicate put" )
239+
224240 w .WriteHeader (http .StatusOK )
225241 return nil
226242}
@@ -231,15 +247,22 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
231247 return err
232248 }
233249
250+ log .With ("tag" , tag ).Debug ("Getting tag" )
251+
234252 d , err := s .store .Get (tag )
235253 if err != nil {
236254 if err == tagstore .ErrTagNotFound {
255+ log .With ("tag" , tag ).Debug ("Tag not found" )
237256 return handler .ErrorStatus (http .StatusNotFound )
238257 }
258+ log .With ("tag" , tag ).Errorf ("Failed to get tag from storage: %s" , err )
239259 return handler .Errorf ("storage: %s" , err )
240260 }
241261
262+ log .With ("tag" , tag , "digest" , d .String ()).Debug ("Successfully retrieved tag" )
263+
242264 if _ , err := io .WriteString (w , d .String ()); err != nil {
265+ log .With ("tag" , tag , "digest" , d .String (), "error" , err ).Error ("Failed to write digest" )
243266 return handler .Errorf ("write digest: %s" , err )
244267 }
245268 return nil
@@ -251,16 +274,23 @@ func (s *Server) hasTagHandler(w http.ResponseWriter, r *http.Request) error {
251274 return err
252275 }
253276
277+ log .With ("tag" , tag ).Debug ("Checking if tag exists" )
278+
254279 client , err := s .backends .GetClient (tag )
255280 if err != nil {
281+ log .With ("tag" , tag ).Errorf ("Failed to get backend client: %s" , err )
256282 return handler .Errorf ("backend manager: %s" , err )
257283 }
258284 if _ , err := client .Stat (tag , tag ); err != nil {
259285 if err == backenderrors .ErrBlobNotFound {
286+ log .With ("tag" , tag ).Debug ("Tag does not exist in backend" )
260287 return handler .ErrorStatus (http .StatusNotFound )
261288 }
289+ log .With ("tag" , tag ).Errorf ("Failed to check tag existence: %s" , err )
262290 return err
263291 }
292+
293+ log .With ("tag" , tag ).Debug ("Tag exists in backend" )
264294 return nil
265295}
266296
@@ -269,8 +299,11 @@ func (s *Server) hasTagHandler(w http.ResponseWriter, r *http.Request) error {
269299func (s * Server ) listHandler (w http.ResponseWriter , r * http.Request ) error {
270300 prefix := r .URL .Path [len ("/list/" ):]
271301
302+ log .With ("prefix" , prefix ).Debug ("Listing tags with prefix" )
303+
272304 client , err := s .backends .GetClient (prefix )
273305 if err != nil {
306+ log .With ("prefix" , prefix , "error" , err ).Error ("Failed to get backend client for list" )
274307 return handler .Errorf ("backend manager: %s" , err )
275308 }
276309
@@ -281,9 +314,12 @@ func (s *Server) listHandler(w http.ResponseWriter, r *http.Request) error {
281314
282315 result , err := client .List (prefix , opts ... )
283316 if err != nil {
317+ log .With ("prefix" , prefix , "error" , err ).Error ("Failed to list from backend" )
284318 return handler .Errorf ("error listing from backend: %s" , err )
285319 }
286320
321+ log .With ("prefix" , prefix , "result_count" , len (result .Names ), "continuation_token" , result .ContinuationToken ).Debug ("Successfully listed tags" )
322+
287323 resp , err := buildPaginationResponse (r .URL , result .ContinuationToken ,
288324 result .Names )
289325 if err != nil {
@@ -304,8 +340,11 @@ func (s *Server) listRepositoryHandler(w http.ResponseWriter, r *http.Request) e
304340 return err
305341 }
306342
343+ log .With ("repository" , repo ).Debug ("Listing repository tags" )
344+
307345 client , err := s .backends .GetClient (repo )
308346 if err != nil {
347+ log .With ("repository" , repo ).Errorf ("Failed to get backend client for repository list: %s" , err )
309348 return handler .Errorf ("backend manager: %s" , err )
310349 }
311350
@@ -316,6 +355,7 @@ func (s *Server) listRepositoryHandler(w http.ResponseWriter, r *http.Request) e
316355
317356 result , err := client .List (path .Join (repo , "_manifests/tags" ), opts ... )
318357 if err != nil {
358+ log .With ("repository" , repo ).Errorf ("Failed to list repository tags from backend: %s" , err )
319359 return handler .Errorf ("error listing from backend: %s" , err )
320360 }
321361
@@ -324,12 +364,14 @@ func (s *Server) listRepositoryHandler(w http.ResponseWriter, r *http.Request) e
324364 // Strip repo prefix.
325365 parts := strings .Split (name , ":" )
326366 if len (parts ) != 2 {
327- log .With ("name" , name ).Warn ("Repo list skipping name, expected repo:tag format" )
367+ log .With ("repository" , repo , " name" , name ).Warn ("Skipping invalid tag name format" )
328368 continue
329369 }
330370 tags = append (tags , parts [1 ])
331371 }
332372
373+ log .With ("repository" , repo , "tag_count" , len (tags ), "continuation_token" , result .ContinuationToken ).Debug ("Successfully listed repository tags" )
374+
333375 resp , err := buildPaginationResponse (r .URL , result .ContinuationToken , tags )
334376 if err != nil {
335377 return err
@@ -346,20 +388,35 @@ func (s *Server) replicateTagHandler(w http.ResponseWriter, r *http.Request) err
346388 return err
347389 }
348390
391+ log .With ("tag" , tag ).Info ("Received replicate tag request" )
392+
349393 d , err := s .store .Get (tag )
350394 if err != nil {
351395 if err == tagstore .ErrTagNotFound {
396+ log .With ("tag" , tag ).Warn ("Cannot replicate tag - not found in storage" )
352397 return handler .ErrorStatus (http .StatusNotFound )
353398 }
399+ log .With ("tag" , tag ).Errorf ("Failed to get tag for replication: %s" , err )
354400 return handler .Errorf ("storage: %s" , err )
355401 }
402+
403+ log .With ("tag" , tag , "digest" , d .String ()).Debug ("Retrieved tag for replication" )
404+
356405 deps , err := s .depResolver .Resolve (tag , d )
357406 if err != nil {
407+ log .With ("tag" , tag , "digest" , d .String ()).Errorf ("Failed to resolve dependencies for replication: %s" , err )
358408 return fmt .Errorf ("resolve dependencies: %s" , err )
359409 }
410+
411+ log .With ("tag" , tag , "digest" , d .String (), "dependency_count" , len (deps )).Debug ("Resolved dependencies for replication" )
412+
360413 if err := s .replicateTag (tag , d , deps ); err != nil {
414+ log .With ("tag" , tag , "digest" , d .String ()).Errorf ("Failed to replicate tag: %s" , err )
361415 return err
362416 }
417+
418+ log .With ("tag" , tag , "digest" , d .String ()).Info ("Successfully initiated tag replication" )
419+
363420 w .WriteHeader (http .StatusOK )
364421 return nil
365422}
@@ -378,15 +435,23 @@ func (s *Server) duplicateReplicateTagHandler(w http.ResponseWriter, r *http.Req
378435 return handler .Errorf ("decode body: %s" , err )
379436 }
380437
438+ log .With ("tag" , tag , "digest" , d .String (), "delay" , req .Delay , "dependency_count" , len (req .Dependencies )).Debug ("Received duplicate replicate request from neighbor" )
439+
381440 destinations := s .remotes .Match (tag )
382441
442+ log .With ("tag" , tag , "digest" , d .String (), "destination_count" , len (destinations )).Debug ("Matched remote destinations for duplicate replicate" )
443+
383444 for _ , dest := range destinations {
384445 task := tagreplication .NewTask (tag , d , req .Dependencies , dest , req .Delay )
385446 if err := s .tagReplicationManager .Add (task ); err != nil {
447+ log .With ("tag" , tag , "digest" , d .String (), "destination" , dest , "delay" , req .Delay ).Errorf ("Failed to add replicate task from duplicate: %s" , err )
386448 return handler .Errorf ("add replicate task: %s" , err )
387449 }
450+ log .With ("tag" , tag , "digest" , d .String (), "destination" , dest ).Debug ("Added replicate task from duplicate" )
388451 }
389452
453+ log .With ("tag" , tag , "digest" , d .String (), "tasks_added" , len (destinations )).Info ("Successfully processed duplicate replicate request" )
454+
390455 return nil
391456}
392457
@@ -398,65 +463,98 @@ func (s *Server) getOriginHandler(w http.ResponseWriter, r *http.Request) error
398463}
399464
400465func (s * Server ) putTag (tag string , d core.Digest , deps core.DigestList ) error {
401- for _ , dep := range deps {
466+ log .With ("tag" , tag , "digest" , d .String (), "dependency_count" , len (deps )).Debug ("Validating tag dependencies" )
467+
468+ for i , dep := range deps {
402469 if _ , err := s .localOriginClient .Stat (tag , dep ); err == blobclient .ErrBlobNotFound {
470+ log .With ("tag" , tag , "digest" , d .String (), "missing_dependency" , dep .String (), "dependency_index" , i ).Error ("Missing dependency blob" )
403471 return handler .Errorf ("cannot upload tag, missing dependency %s" , dep )
404472 } else if err != nil {
473+ log .With ("tag" , tag , "digest" , d .String (), "dependency" , dep .String (), "dependency_index" , i ).Errorf ("Failed to check dependency blob: %s" , err )
405474 return handler .Errorf ("check blob: %s" , err )
406475 }
407476 }
408477
478+ log .With ("tag" , tag , "digest" , d .String ()).Debug ("All dependencies validated successfully" )
479+
409480 if err := s .store .Put (tag , d , 0 ); err != nil {
481+ log .With ("tag" , tag , "digest" , d .String (), "error" , err ).Error ("Failed to store tag" )
410482 return handler .Errorf ("storage: %s" , err )
411483 }
412484
485+ log .With ("tag" , tag , "digest" , d .String ()).Info ("Tag stored locally" )
486+
413487 neighbors := s .neighbors .Resolve ()
488+ neighborCount := len (neighbors )
489+
490+ log .With ("tag" , tag , "digest" , d .String (), "neighbor_count" , neighborCount ).Debug ("Starting neighbor replication" )
414491
415492 var delay time.Duration
416493 var successes int
417494 for addr := range neighbors {
418495 delay += s .config .DuplicatePutStagger
419496 client := s .provider .Provide (addr )
420497 if err := client .DuplicatePut (tag , d , delay ); err != nil {
421- log .Errorf ( "Error duplicating put task to %s: %s" , addr , err )
498+ log .With ( "tag" , tag , "digest" , d . String (), "neighbor" , addr , "delay" , delay , "error" , err ). Error ( "Failed to duplicate put to neighbor" )
422499 } else {
423500 successes ++
501+ log .With ("tag" , tag , "digest" , d .String (), "neighbor" , addr , "delay" , delay ).Debug ("Successfully duplicated put to neighbor" )
424502 }
425503 }
504+
505+ log .With ("tag" , tag , "digest" , d .String (), "total_neighbors" , neighborCount , "successful_neighbors" , successes , "failed_neighbors" , neighborCount - successes ).Info ("Completed neighbor replication" )
506+
426507 if len (neighbors ) != 0 && successes == 0 {
427508 s .stats .Counter ("duplicate_put_failures" ).Inc (1 )
509+ log .With ("tag" , tag , "digest" , d .String (), "neighbor_count" , neighborCount ).Error ("All neighbor replications failed" )
428510 }
429511 return nil
430512}
431513
432514func (s * Server ) replicateTag (tag string , d core.Digest , deps core.DigestList ) error {
433515 destinations := s .remotes .Match (tag )
516+
517+ log .With ("tag" , tag , "digest" , d .String (), "destination_count" , len (destinations )).Debug ("Checking remote destinations for tag replication" )
518+
434519 if len (destinations ) == 0 {
520+ log .With ("tag" , tag , "digest" , d .String ()).Debug ("No remote destinations configured for tag" )
435521 return nil
436522 }
437523
524+ log .With ("tag" , tag , "digest" , d .String (), "destinations" , destinations ).Info ("Adding remote replication tasks" )
525+
438526 for _ , dest := range destinations {
439527 task := tagreplication .NewTask (tag , d , deps , dest , 0 )
440528 if err := s .tagReplicationManager .Add (task ); err != nil {
529+ log .With ("tag" , tag , "digest" , d .String (), "destination" , dest ).Errorf ("Failed to add remote replication task: %s" , err )
441530 return handler .Errorf ("add replicate task: %s" , err )
442531 }
532+ log .With ("tag" , tag , "digest" , d .String (), "destination" , dest ).Debug ("Added remote replication task" )
443533 }
444534
445535 neighbors := s .neighbors .Resolve ()
536+ neighborCount := len (neighbors )
537+
538+ log .With ("tag" , tag , "digest" , d .String (), "neighbor_count" , neighborCount ).Debug ("Notifying neighbors about remote replication" )
446539
447540 var delay time.Duration
448541 var successes int
449542 for addr := range neighbors { // Loops in random order.
450543 delay += s .config .DuplicateReplicateStagger
451544 client := s .provider .Provide (addr )
452545 if err := client .DuplicateReplicate (tag , d , deps , delay ); err != nil {
453- log .Errorf ( "Error duplicating replicate task to %s : %s", addr , err )
546+ log .With ( "tag" , tag , "digest" , d . String (), "neighbor" , addr , "delay" , delay ). Errorf ( "Failed to notify neighbor about replication : %s" , err )
454547 } else {
455548 successes ++
549+ log .With ("tag" , tag , "digest" , d .String (), "neighbor" , addr , "delay" , delay ).Debug ("Successfully notified neighbor about replication" )
456550 }
457551 }
552+
553+ log .With ("tag" , tag , "digest" , d .String (), "remote_destinations" , len (destinations ), "notified_neighbors" , successes , "failed_neighbors" , neighborCount - successes ).Info ("Completed remote replication setup" )
554+
458555 if len (neighbors ) != 0 && successes == 0 {
459556 s .stats .Counter ("duplicate_replicate_failures" ).Inc (1 )
557+ log .With ("tag" , tag , "digest" , d .String (), "neighbor_count" , neighborCount ).Error ("All neighbor replication notifications failed" )
460558 }
461559 return nil
462560}
@@ -497,8 +595,8 @@ func buildPaginationOptions(u *url.URL) ([]backend.ListOption, error) {
497595}
498596
499597func buildPaginationResponse (u * url.URL , continuationToken string ,
500- result []string ) ( * tagmodels. ListResponse , error ) {
501-
598+ result []string ,
599+ ) ( * tagmodels. ListResponse , error ) {
502600 nextUrlString := ""
503601 if continuationToken != "" {
504602 // Deep copy url.
0 commit comments