@@ -429,60 +429,18 @@ func (f *FederationEnvoy) syncer() {
429429 err )
430430 }
431431
432+ // Handle a new batch push request.
432433 case pushReq := <- f .batchPushRequests :
433434 log .Debug ("Federation envoy handling batch push " +
434435 "request" )
435- ctx , cancel := f .WithCtxQuitNoTimeout ()
436-
437- // First, we'll attempt to registrar the proof leaf with
438- // the local registrar server.
439- err := f .cfg .LocalRegistrar .UpsertProofLeafBatch (
440- ctx , pushReq .Batch ,
441- )
442- cancel ()
443- if err != nil {
444- err := fmt .Errorf ("unable to insert proof " +
445- "batch into local universe: %w" , err )
446-
447- log .Warnf (err .Error ())
448-
449- pushReq .err <- err
450- continue
451- }
452-
453- // Now that we know we were able to register the proof,
454- // we'll return back to the caller.
455- pushReq .resp <- struct {}{}
456-
457- // Fetch all universe servers in our federation.
458- fedServers , err := f .tryFetchServers ()
436+ err := f .handleBatchPushRequest (pushReq )
459437 if err != nil {
460- err := fmt .Errorf ("unable to fetch " +
461- "federation servers: %w" , err )
462- log .Warnf (err .Error ())
463- pushReq .err <- err
464- continue
465- }
466-
467- if len (fedServers ) == 0 {
468- log .Warnf ("could not find any federation " +
469- "servers" )
470- continue
438+ // Warn, but don't exit the event handler
439+ // routine.
440+ log .Warnf ("Unable to handle batch push " +
441+ "request: %v" , err )
471442 }
472443
473- // With the response sent above, we'll push this out to
474- // all the Universe servers in the background.
475- ctxPush , cancelPush := f .WithCtxQuitNoTimeout ()
476- for idx := range pushReq .Batch {
477- item := pushReq .Batch [idx ]
478-
479- f .pushProofToFederation (
480- ctxPush , item .ID , item .Key , item .Leaf ,
481- fedServers , item .LogProofSync ,
482- )
483- }
484- cancelPush ()
485-
486444 case <- f .Quit :
487445 return
488446 }
@@ -633,6 +591,61 @@ func (f *FederationEnvoy) handlePushRequest(pushReq *FederationPushReq) error {
633591 return nil
634592}
635593
594+ // handleBatchPushRequest is called each time a new batch push request is
595+ // received. It will perform an asynchronous registration with the local
596+ // Universe registrar, then push each leaf from the batch out in an async manner
597+ // to the federation members.
598+ func (f * FederationEnvoy ) handleBatchPushRequest (
599+ pushReq * FederationProofBatchPushReq ) error {
600+
601+ if pushReq == nil {
602+ return fmt .Errorf ("nil batch push request" )
603+ }
604+
605+ ctx , cancel := f .WithCtxQuitNoTimeout ()
606+ defer cancel ()
607+
608+ // First, we'll attempt to registrar the proof leaf with the local
609+ // registrar server.
610+ err := f .cfg .LocalRegistrar .UpsertProofLeafBatch (ctx , pushReq .Batch )
611+ if err != nil {
612+ err = fmt .Errorf ("unable to insert proof batch into local " +
613+ "universe: %w" , err )
614+ pushReq .err <- err
615+ return err
616+ }
617+
618+ // Now that we know we were able to register the proof, we'll return
619+ // back to the caller.
620+ pushReq .resp <- struct {}{}
621+
622+ // Fetch all universe servers in our federation.
623+ fedServers , err := f .tryFetchServers ()
624+ if err != nil {
625+ err = fmt .Errorf ("unable to fetch federation servers: %w" , err )
626+ pushReq .err <- err
627+ return err
628+ }
629+
630+ if len (fedServers ) == 0 {
631+ log .Warnf ("could not find any federation servers" )
632+ return nil
633+ }
634+
635+ // With the response sent above, we'll push this out to all the Universe
636+ // servers in the background.
637+ for idx := range pushReq .Batch {
638+ item := pushReq .Batch [idx ]
639+
640+ f .pushProofToFederation (
641+ ctx , item .ID , item .Key , item .Leaf , fedServers ,
642+ item .LogProofSync ,
643+ )
644+ }
645+
646+ return nil
647+ }
648+
636649// UpsertProofLeaf upserts a proof leaf within the target universe tree. This
637650// can be used to first push out a new update to the local registrar,
638651// ultimately queuing it to also be sent to the set of active universe servers.
0 commit comments