@@ -89,6 +89,15 @@ func NewInterceptor(lnd *lndclient.LndServices, store Store,
8989 }
9090}
9191
92+ // interceptContext is a struct that contains all information about a call that
93+ // is intercepted by the interceptor.
94+ type interceptContext struct {
95+ mainCtx context.Context
96+ opts []grpc.CallOption
97+ metadata * metadata.MD
98+ token * Token
99+ }
100+
92101// UnaryInterceptor is an interceptor method that can be used directly by gRPC
93102// for unary calls. If the store contains a token, it is attached as credentials
94103// to every call before patching it through. The response error is also
@@ -105,38 +114,119 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
105114 i .lock .Lock ()
106115 defer i .lock .Unlock ()
107116
108- addLsatCredentials := func (token * Token ) error {
109- macaroon , err := token .PaidMacaroon ()
110- if err != nil {
111- return err
112- }
113- opts = append (opts , grpc .PerRPCCredentials (
114- macaroons .NewMacaroonCredential (macaroon ),
115- ))
116- return nil
117+ // Create the context that we'll use to initiate the real request. This
118+ // contains the means to extract response headers and possibly also an
119+ // auth token, if we already have paid for one.
120+ iCtx , err := i .newInterceptContext (ctx , opts )
121+ if err != nil {
122+ return err
123+ }
124+
125+ // Try executing the call now. If anything goes wrong, we only handle
126+ // the LSAT error message that comes in the form of a gRPC status error.
127+ rpcCtx , cancel := context .WithTimeout (ctx , i .callTimeout )
128+ defer cancel ()
129+ err = invoker (rpcCtx , method , req , reply , cc , iCtx .opts ... )
130+ if ! isPaymentRequired (err ) {
131+ return err
132+ }
133+
134+ // Find out if we need to pay for a new token or perhaps resume
135+ // a previously aborted payment.
136+ err = i .handlePayment (iCtx )
137+ if err != nil {
138+ return err
139+ }
140+
141+ // Execute the same request again, now with the LSAT
142+ // token added as an RPC credential.
143+ rpcCtx2 , cancel2 := context .WithTimeout (ctx , i .callTimeout )
144+ defer cancel2 ()
145+ return invoker (rpcCtx2 , method , req , reply , cc , iCtx .opts ... )
146+ }
147+
148+ // StreamInterceptor is an interceptor method that can be used directly by gRPC
149+ // for streaming calls. If the store contains a token, it is attached as
150+ // credentials to every stream establishment call before patching it through.
151+ // The response error is also intercepted for every initial stream initiation.
152+ // If there is an error returned and it is indicating a payment challenge, a
153+ // token is acquired and paid for automatically. The original request is then
154+ // repeated back to the server, now with the new token attached.
155+ func (i * Interceptor ) StreamInterceptor (ctx context.Context ,
156+ desc * grpc.StreamDesc , cc * grpc.ClientConn , method string ,
157+ streamer grpc.Streamer , opts ... grpc.CallOption ) (grpc.ClientStream ,
158+ error ) {
159+
160+ // To avoid paying for a token twice if two parallel requests are
161+ // happening, we require an exclusive lock here.
162+ i .lock .Lock ()
163+ defer i .lock .Unlock ()
164+
165+ // Create the context that we'll use to initiate the real request. This
166+ // contains the means to extract response headers and possibly also an
167+ // auth token, if we already have paid for one.
168+ iCtx , err := i .newInterceptContext (ctx , opts )
169+ if err != nil {
170+ return nil , err
171+ }
172+
173+ // Try establishing the stream now. If anything goes wrong, we only
174+ // handle the LSAT error message that comes in the form of a gRPC status
175+ // error. The context of a stream will be used for the whole lifetime of
176+ // it, so we can't really clamp down on the initial call with a timeout.
177+ stream , err := streamer (ctx , desc , cc , method , iCtx .opts ... )
178+ if ! isPaymentRequired (err ) {
179+ return stream , err
180+ }
181+
182+ // Find out if we need to pay for a new token or perhaps resume
183+ // a previously aborted payment.
184+ err = i .handlePayment (iCtx )
185+ if err != nil {
186+ return nil , err
187+ }
188+
189+ // Execute the same request again, now with the LSAT token added
190+ // as an RPC credential.
191+ return streamer (ctx , desc , cc , method , iCtx .opts ... )
192+ }
193+
194+ // newInterceptContext creates the initial intercept context that can capture
195+ // metadata from the server and sends the local token to the server if one
196+ // already exists.
197+ func (i * Interceptor ) newInterceptContext (ctx context.Context ,
198+ opts []grpc.CallOption ) (* interceptContext , error ) {
199+
200+ iCtx := & interceptContext {
201+ mainCtx : ctx ,
202+ opts : opts ,
203+ metadata : & metadata.MD {},
117204 }
118205
119206 // Let's see if the store already contains a token and what state it
120207 // might be in. If a previous call was aborted, we might have a pending
121208 // token that needs to be handled separately.
122- token , err := i .store .CurrentToken ()
209+ var err error
210+ iCtx .token , err = i .store .CurrentToken ()
123211 switch {
124212 // If there is no token yet, nothing to do at this point.
125213 case err == ErrNoToken :
126214
127215 // Some other error happened that we have to surface.
128216 case err != nil :
129217 log .Errorf ("Failed to get token from store: %v" , err )
130- return fmt .Errorf ("getting token from store failed: %v" , err )
218+ return nil , fmt .Errorf ("getting token from store failed: %v" ,
219+ err )
131220
132221 // Only if we have a paid token append it. We don't resume a pending
133222 // payment just yet, since we don't even know if a token is required for
134223 // this call. We also never send a pending payment to the server since
135224 // we know it's not valid.
136- case ! token .isPending ():
137- if err = addLsatCredentials (token ); err != nil {
225+ case ! iCtx . token .isPending ():
226+ if err = i . addLsatCredentials (iCtx ); err != nil {
138227 log .Errorf ("Adding macaroon to request failed: %v" , err )
139- return fmt .Errorf ("adding macaroon failed: %v" , err )
228+ return nil , fmt .Errorf ("adding macaroon failed: %v" ,
229+ err )
140230 }
141231 }
142232
@@ -145,60 +235,59 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
145235 // option. We execute the request and inspect the error. If it's the
146236 // LSAT specific payment required error, we might execute the same
147237 // method again later with the paid LSAT token.
148- trailerMetadata := & metadata.MD {}
149- opts = append (opts , grpc .Trailer (trailerMetadata ))
150- rpcCtx , cancel := context .WithTimeout (ctx , i .callTimeout )
151- defer cancel ()
152- err = invoker (rpcCtx , method , req , reply , cc , opts ... )
153-
154- // Only handle the LSAT error message that comes in the form of
155- // a gRPC status error.
156- if isPaymentRequired (err ) {
157- paidToken , err := i .handlePayment (ctx , token , trailerMetadata )
158- if err != nil {
159- return err
160- }
161- if err = addLsatCredentials (paidToken ); err != nil {
162- log .Errorf ("Adding macaroon to request failed: %v" , err )
163- return fmt .Errorf ("adding macaroon failed: %v" , err )
164- }
165-
166- // Execute the same request again, now with the LSAT
167- // token added as an RPC credential.
168- rpcCtx2 , cancel2 := context .WithTimeout (ctx , i .callTimeout )
169- defer cancel2 ()
170- return invoker (rpcCtx2 , method , req , reply , cc , opts ... )
171- }
172- return err
238+ iCtx .opts = append (iCtx .opts , grpc .Trailer (iCtx .metadata ))
239+ return iCtx , nil
173240}
174241
175242// handlePayment tries to obtain a valid token by either tracking the payment
176243// status of a pending token or paying for a new one.
177- func (i * Interceptor ) handlePayment (ctx context.Context , token * Token ,
178- md * metadata.MD ) (* Token , error ) {
179-
244+ func (i * Interceptor ) handlePayment (iCtx * interceptContext ) error {
180245 switch {
181246 // Resume/track a pending payment if it was interrupted for some reason.
182- case token != nil && token .isPending ():
247+ case iCtx . token != nil && iCtx . token .isPending ():
183248 log .Infof ("Payment of LSAT token is required, resuming/" +
184249 "tracking previous payment from pending LSAT token" )
185- err := i .trackPayment (ctx , token )
250+ err := i .trackPayment (iCtx . mainCtx , iCtx . token )
186251 if err != nil {
187- return nil , err
252+ return err
188253 }
189- return token , nil
190254
191255 // We don't have a token yet, try to get a new one.
192- case token == nil :
256+ case iCtx . token == nil :
193257 // We don't have a token yet, get a new one.
194258 log .Infof ("Payment of LSAT token is required, paying invoice" )
195- return i .payLsatToken (ctx , md )
259+ var err error
260+ iCtx .token , err = i .payLsatToken (iCtx .mainCtx , iCtx .metadata )
261+ if err != nil {
262+ return err
263+ }
196264
197265 // We have a token and it's valid, nothing more to do here.
198266 default :
199267 log .Debugf ("Found valid LSAT token to add to request" )
200- return token , nil
201268 }
269+
270+ if err := i .addLsatCredentials (iCtx ); err != nil {
271+ log .Errorf ("Adding macaroon to request failed: %v" , err )
272+ return fmt .Errorf ("adding macaroon failed: %v" , err )
273+ }
274+ return nil
275+ }
276+
277+ // addLsatCredentials adds an LSAT token to the given intercept context.
278+ func (i * Interceptor ) addLsatCredentials (iCtx * interceptContext ) error {
279+ if iCtx .token == nil {
280+ return fmt .Errorf ("cannot add nil token to context" )
281+ }
282+
283+ macaroon , err := iCtx .token .PaidMacaroon ()
284+ if err != nil {
285+ return err
286+ }
287+ iCtx .opts = append (iCtx .opts , grpc .PerRPCCredentials (
288+ macaroons .NewMacaroonCredential (macaroon ),
289+ ))
290+ return nil
202291}
203292
204293// payLsatToken reads the payment challenge from the response metadata and tries
0 commit comments