@@ -185,9 +185,6 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch {
185185// address.
186186func (u * URLDispatch ) NewCourier (ctx context.Context , addr * url.URL ,
187187 lazyConnect bool ) (Courier , error ) {
188-
189- subscribers := make (map [uint64 ]* fn.EventReceiver [fn.Event ])
190-
191188 // Create new courier addr based on URL scheme.
192189 switch addr .Scheme {
193190 case HashmailCourierType :
@@ -197,33 +194,11 @@ func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL,
197194 )
198195
199196 case UniverseRpcCourierType :
200- cfg := u . cfg . UniverseRpcCfg
201- backoffHandler := NewBackoffHandler (
202- cfg . BackoffCfg , u .cfg .TransferLog ,
197+ return NewUniverseRpcCourier (
198+ ctx , u . cfg . UniverseRpcCfg , u . cfg . TransferLog ,
199+ u .cfg .LocalArchive , addr , lazyConnect ,
203200 )
204201
205- // Connect to the universe RPC server.
206- dialOpts , err := serverDialOpts ()
207- if err != nil {
208- return nil , err
209- }
210-
211- serverAddr := fmt .Sprintf ("%s:%s" , addr .Hostname (), addr .Port ())
212- conn , err := grpc .Dial (serverAddr , dialOpts ... )
213- if err != nil {
214- return nil , err
215- }
216-
217- client := unirpc .NewUniverseClient (conn )
218-
219- return & UniverseRpcCourier {
220- client : client ,
221- backoffHandle : backoffHandler ,
222- cfg : u .cfg ,
223- subscribers : subscribers ,
224- rawConn : conn ,
225- }, nil
226-
227202 default :
228203 return nil , fmt .Errorf ("unknown courier address protocol " +
229204 "(consider updating tapd): %v" , addr .Scheme )
@@ -1169,17 +1144,24 @@ type UniverseRpcCourierCfg struct {
11691144// UniverseRpcCourier is a universe RPC proof courier service handle. It
11701145// implements the Courier interface.
11711146type UniverseRpcCourier struct {
1172- // client is the RPC client that the courier will use to interact with
1173- // the universe RPC server.
1174- client unirpc.UniverseClient
1147+ // cfg is the courier configuration.
1148+ cfg * UniverseRpcCourierCfg
11751149
1176- // cfg is the general courier configuration.
1177- cfg * CourierCfg
1150+ // addr is the address of the courier service.
1151+ addr * url.URL
1152+
1153+ // localArchive is the local archive that the courier will use to
1154+ // store and query for proofs.
1155+ localArchive Archiver
11781156
11791157 // rawConn is the raw connection that the courier will use to interact
11801158 // with the remote gRPC service.
11811159 rawConn * grpc.ClientConn
11821160
1161+ // client is the RPC client that the courier will use to interact with
1162+ // the universe RPC server.
1163+ client unirpc.UniverseClient
1164+
11831165 // backoffHandle is a handle to the backoff procedure used in proof
11841166 // delivery.
11851167 backoffHandle * BackoffHandler
@@ -1193,6 +1175,82 @@ type UniverseRpcCourier struct {
11931175 subscriberMtx sync.Mutex
11941176}
11951177
1178+ // NewUniverseRpcCourier creates a new universe RPC proof courier service
1179+ // handle.
1180+ func NewUniverseRpcCourier (ctx context.Context , cfg * UniverseRpcCourierCfg ,
1181+ transferLog TransferLog , localArchive Archiver , addr * url.URL ,
1182+ lazyConnect bool ) (* UniverseRpcCourier , error ) {
1183+
1184+ courier := UniverseRpcCourier {
1185+ cfg : cfg ,
1186+ addr : addr ,
1187+ localArchive : localArchive ,
1188+ backoffHandle : NewBackoffHandler (cfg .BackoffCfg , transferLog ),
1189+ subscribers : make (map [uint64 ]* fn.EventReceiver [fn.Event ]),
1190+ }
1191+
1192+ // If we're not lazy connecting, then we'll attempt to connect to the
1193+ // courier service immediately.
1194+ if ! lazyConnect {
1195+ err := courier .ensureConnect (ctx )
1196+ if err != nil {
1197+ return nil , fmt .Errorf ("unable to connect to courier " +
1198+ "service during courier handle " +
1199+ "instantiation: %w" , err )
1200+ }
1201+ }
1202+
1203+ return & courier , nil
1204+ }
1205+
1206+ // ensureConnect ensures that the courier handle is connected to the remote
1207+ // courier service.
1208+ //
1209+ // This method does nothing if a service connection is already established.
1210+ func (c * UniverseRpcCourier ) ensureConnect (ctx context.Context ) error {
1211+ // If we're already connected, we'll return early.
1212+ if c .rawConn != nil && c .client != nil {
1213+ // Check the gRPC connection state to determine if the
1214+ // connection is ready.
1215+ gRpcConnState := c .rawConn .GetState ()
1216+
1217+ connStatus , err := NewCourierConnStatusFromRpcStatus (
1218+ gRpcConnState ,
1219+ )
1220+ if err != nil {
1221+ return fmt .Errorf ("universe RPC courier unable to " +
1222+ "determine connection status: %w" , err )
1223+ }
1224+
1225+ if connStatus .IsPending () {
1226+ return nil
1227+ }
1228+ }
1229+
1230+ // At this point, we know that the connection is not ready. We'll now
1231+ // attempt to establish a new connection to the courier service.
1232+ dialOpts , err := serverDialOpts ()
1233+ if err != nil {
1234+ return err
1235+ }
1236+
1237+ // Ensure that the addr field has been set correctly.
1238+ if c .addr == nil {
1239+ return fmt .Errorf ("universe RPC courier address is not set" )
1240+ }
1241+
1242+ serverAddr := fmt .Sprintf ("%s:%s" , c .addr .Hostname (), c .addr .Port ())
1243+ conn , err := grpc .DialContext (ctx , serverAddr , dialOpts ... )
1244+ if err != nil {
1245+ return err
1246+ }
1247+
1248+ c .client = unirpc .NewUniverseClient (conn )
1249+ c .rawConn = conn
1250+
1251+ return nil
1252+ }
1253+
11961254// DeliverProof attempts to delivery a proof file to the receiver.
11971255func (c * UniverseRpcCourier ) DeliverProof (ctx context.Context ,
11981256 recipient Recipient , annotatedProof * AnnotatedProof ) error {
@@ -1272,6 +1330,15 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
12721330
12731331 // Setup delivery routine and start backoff procedure.
12741332 deliverFunc := func () error {
1333+ // Connect to the courier service if a connection hasn't
1334+ // been established yet.
1335+ err := c .ensureConnect (ctx )
1336+ if err != nil {
1337+ return fmt .Errorf ("unable to connect to " +
1338+ "courier service during delivery " +
1339+ "attempt: %w" , err )
1340+ }
1341+
12751342 // Submit proof to courier.
12761343 _ , err = c .client .InsertProof (ctx , & unirpc.AssetProof {
12771344 Key : & universeKey ,
@@ -1327,6 +1394,15 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
13271394 // procedure.
13281395 var proofBlob []byte
13291396 receiveFunc := func () error {
1397+ // Connect to the courier service if a connection hasn't
1398+ // been established yet.
1399+ err := c .ensureConnect (ctx )
1400+ if err != nil {
1401+ return fmt .Errorf ("unable to connect to " +
1402+ "courier service during delivery " +
1403+ "attempt: %w" , err )
1404+ }
1405+
13301406 // Retrieve proof from courier.
13311407 resp , err := c .client .QueryProof (ctx , & universeKey )
13321408 if err != nil {
@@ -1352,7 +1428,7 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
13521428 }
13531429
13541430 proofFile , err := FetchProofProvenance (
1355- ctx , c .cfg . LocalArchive , originLocator , fetchProof ,
1431+ ctx , c .localArchive , originLocator , fetchProof ,
13561432 )
13571433 if err != nil {
13581434 return nil , fmt .Errorf ("error fetching proof provenance: %w" ,
0 commit comments