@@ -16,9 +16,16 @@ import (
1616 "github.com/lightninglabs/loop"
1717 "github.com/lightninglabs/loop/lndclient"
1818 "github.com/lightninglabs/loop/looprpc"
19+ "github.com/lightningnetwork/lnd/lntypes"
1920 "google.golang.org/grpc"
2021)
2122
23+ var (
24+ // maxMsgRecvSize is the largest message our REST proxy will receive. We
25+ // set this to 200MiB atm.
26+ maxMsgRecvSize = grpc .MaxCallRecvMsgSize (1 * 1024 * 1024 * 200 )
27+ )
28+
2229// listenerCfg holds closures used to retrieve listeners for the gRPC services.
2330type listenerCfg struct {
2431 // grpcListener returns a listener to use for the gRPC server.
@@ -68,14 +75,18 @@ func daemon(config *config, lisCfg *listenerCfg) error {
6875 return err
6976 }
7077
78+ swaps := make (map [lntypes.Hash ]loop.SwapInfo )
7179 for _ , s := range swapsList {
7280 swaps [s .SwapHash ] = * s
7381 }
7482
7583 // Instantiate the loopd gRPC server.
7684 server := swapClientServer {
77- impl : swapClient ,
78- lnd : & lnd .LndServices ,
85+ impl : swapClient ,
86+ lnd : & lnd .LndServices ,
87+ swaps : swaps ,
88+ subscribers : make (map [int ]chan <- interface {}),
89+ statusChan : make (chan loop.SwapInfo ),
7990 }
8091
8192 serverOpts := []grpc.ServerOption {}
@@ -92,12 +103,26 @@ func daemon(config *config, lisCfg *listenerCfg) error {
92103 }
93104 defer grpcListener .Close ()
94105
106+ // The default JSON marshaler of the REST proxy only sets OrigName to
107+ // true, which instructs it to use the same field names as specified in
108+ // the proto file and not switch to camel case. What we also want is
109+ // that the marshaler prints all values, even if they are falsey.
110+ customMarshalerOption := proxy .WithMarshalerOption (
111+ proxy .MIMEWildcard , & proxy.JSONPb {
112+ OrigName : true ,
113+ EmitDefaults : true ,
114+ },
115+ )
116+
95117 // We'll also create and start an accompanying proxy to serve clients
96118 // through REST.
97119 ctx , cancel := context .WithCancel (context .Background ())
98120 defer cancel ()
99- mux := proxy .NewServeMux ()
100- proxyOpts := []grpc.DialOption {grpc .WithInsecure ()}
121+ mux := proxy .NewServeMux (customMarshalerOption )
122+ proxyOpts := []grpc.DialOption {
123+ grpc .WithInsecure (),
124+ grpc .WithDefaultCallOptions (maxMsgRecvSize ),
125+ }
101126 err = looprpc .RegisterSwapClientHandlerFromEndpoint (
102127 ctx , mux , config .RPCListen , proxyOpts ,
103128 )
@@ -130,8 +155,6 @@ func daemon(config *config, lisCfg *listenerCfg) error {
130155 log .Infof ("REST proxy disabled" )
131156 }
132157
133- statusChan := make (chan loop.SwapInfo )
134-
135158 mainCtx , cancel := context .WithCancel (context .Background ())
136159 var wg sync.WaitGroup
137160
@@ -141,7 +164,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
141164 defer wg .Done ()
142165
143166 log .Infof ("Starting swap client" )
144- err := swapClient .Run (mainCtx , statusChan )
167+ err := swapClient .Run (mainCtx , server . statusChan )
145168 if err != nil {
146169 log .Error (err )
147170 }
@@ -159,25 +182,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
159182 defer wg .Done ()
160183
161184 log .Infof ("Waiting for updates" )
162- for {
163- select {
164- case swap := <- statusChan :
165- swapsLock .Lock ()
166- swaps [swap .SwapHash ] = swap
167-
168- for _ , subscriber := range subscribers {
169- select {
170- case subscriber <- swap :
171- case <- mainCtx .Done ():
172- return
173- }
174- }
175-
176- swapsLock .Unlock ()
177- case <- mainCtx .Done ():
178- return
179- }
180- }
185+ server .processStatusUpdates (mainCtx )
181186 }()
182187
183188 // Start the grpc server.
0 commit comments