@@ -417,79 +417,18 @@ func (f *FederationEnvoy) syncer() {
417417 err )
418418 }
419419
420- // A new push request has just arrived. We'll perform a
421- // asynchronous registration with the local Universe registrar,
422- // then push it out in an async manner to the federation
423- // members.
420+ // Handle a new push request.
424421 case pushReq := <- f .pushRequests :
425422 log .Debug ("Federation envoy handling push request" )
426- ctx , cancel := f .WithCtxQuit ()
427-
428- // First, we'll attempt to registrar the proof leaf with
429- // the local registrar server.
430- newProof , err := f .cfg .LocalRegistrar .UpsertProofLeaf (
431- ctx , pushReq .ID , pushReq .Key , pushReq .Leaf ,
432- )
433- cancel ()
434- if err != nil {
435- err := fmt .Errorf ("unable to insert proof " +
436- "into local universe: %w" , err )
437-
438- log .Warnf (err .Error ())
439-
440- pushReq .err <- err
441- continue
442- }
443-
444- // Now that we know we were able to register the proof,
445- // we'll return back to the caller, and push the new
446- // proof out to the federation in the background.
447- pushReq .resp <- newProof
448-
449- // Fetch all universe servers in our federation.
450- fedServers , err := f .tryFetchServers ()
423+ err := f .handlePushRequest (pushReq )
451424 if err != nil {
452- err := fmt .Errorf ("unable to fetch " +
453- "federation servers: %w" , err )
454- log .Warnf (err .Error ())
455- pushReq .err <- err
456- continue
457- }
458-
459- if len (fedServers ) == 0 {
460- log .Warnf ("could not find any federation " +
461- "servers" )
462- continue
463- }
464-
465- if pushReq .LogProofSync {
466- // We are attempting to sync using the
467- // logged proof sync procedure. We will
468- // therefore narrow down the set of target
469- // servers based on the sync log. Only servers
470- // that are not yet push sync complete will be
471- // targeted.
472- fedServers , err = f .filterProofSyncPending (
473- fedServers , pushReq .ID , pushReq .Key ,
474- )
475- if err != nil {
476- log .Warnf ("failed to filter " +
477- "federation servers" )
478- continue
479- }
425+ // Warn, but don't exit the syncer. The syncer
426+ // should continue to run and attempt handle
427+ // more events.
428+ log .Warnf ("Unable to handle push request: %v" ,
429+ err )
480430 }
481431
482- // With the response sent above, we'll push this
483- // out to all the Universe servers in the
484- // background.
485- ctxPush , cancelPush := f .WithCtxQuitNoTimeout ()
486- f .pushProofToFederation (
487- ctxPush , pushReq .ID , pushReq .Key ,
488- pushReq .Leaf , fedServers ,
489- pushReq .LogProofSync ,
490- )
491- cancelPush ()
492-
493432 case pushReq := <- f .batchPushRequests :
494433 log .Debug ("Federation envoy handling batch push " +
495434 "request" )
@@ -626,6 +565,74 @@ func (f *FederationEnvoy) handleTickEvent() error {
626565 return nil
627566}
628567
568+ // handlePushRequest is called each time a new push request is received. It will
569+ // perform an asynchronous registration with the local Universe registrar, then
570+ // push the proof leaf out in an async manner to the federation members.
571+ func (f * FederationEnvoy ) handlePushRequest (pushReq * FederationPushReq ) error {
572+ if pushReq == nil {
573+ return fmt .Errorf ("nil push request" )
574+ }
575+
576+ // First, we'll attempt to registrar the proof leaf with the local
577+ // registrar server.
578+ ctx , cancel := f .WithCtxQuit ()
579+ defer cancel ()
580+ newProof , err := f .cfg .LocalRegistrar .UpsertProofLeaf (
581+ ctx , pushReq .ID , pushReq .Key , pushReq .Leaf ,
582+ )
583+ if err != nil {
584+ err = fmt .Errorf ("unable to insert proof into local " +
585+ "universe: %w" , err )
586+ pushReq .err <- err
587+ return err
588+ }
589+
590+ // Now that we know we were able to register the proof, we'll return
591+ // back to the caller, and push the new proof out to the federation in
592+ // the background.
593+ pushReq .resp <- newProof
594+
595+ // Fetch all universe servers in our federation.
596+ fedServers , err := f .tryFetchServers ()
597+ if err != nil {
598+ err = fmt .Errorf ("unable to fetch federation servers: %w" , err )
599+ pushReq .err <- err
600+ return err
601+ }
602+
603+ if len (fedServers ) == 0 {
604+ log .Warnf ("could not find any federation servers" )
605+ return nil
606+ }
607+
608+ if pushReq .LogProofSync {
609+ // We are attempting to sync using the logged proof sync
610+ // procedure. We will therefore narrow down the set of target
611+ // servers based on the sync log. Only servers that are not yet
612+ // push sync complete will be targeted.
613+ fedServers , err = f .filterProofSyncPending (
614+ fedServers , pushReq .ID , pushReq .Key ,
615+ )
616+ if err != nil {
617+ err = fmt .Errorf ("failed to filter federation " +
618+ "servers: %w" , err )
619+ pushReq .err <- err
620+ return err
621+ }
622+ }
623+
624+ // With the response sent above, we'll push this out to all the Universe
625+ // servers in the background.
626+ ctx , cancel = f .WithCtxQuitNoTimeout ()
627+ defer cancel ()
628+ f .pushProofToFederation (
629+ ctx , pushReq .ID , pushReq .Key , pushReq .Leaf , fedServers ,
630+ pushReq .LogProofSync ,
631+ )
632+
633+ return nil
634+ }
635+
629636// UpsertProofLeaf upserts a proof leaf within the target universe tree. This
630637// can be used to first push out a new update to the local registrar,
631638// ultimately queuing it to also be sent to the set of active universe servers.
0 commit comments