@@ -19,6 +19,7 @@ import (
1919 "encoding/hex"
2020 "fmt"
2121 "log"
22+ "log/slog"
2223
2324 connect "connectrpc.com/connect"
2425 "github.com/blinklabs-io/adder/event"
@@ -27,7 +28,6 @@ import (
2728 ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
2829 submit "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit"
2930 "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit/submitconnect"
30- "golang.org/x/crypto/blake2b"
3131
3232 "github.com/blinklabs-io/cardano-node-api/internal/node"
3333)
@@ -104,83 +104,106 @@ func (s *submitServiceServer) SubmitTx(
104104 return connect .NewResponse (resp ), nil
105105}
106106
107- // WaitForTx
108107func (s * submitServiceServer ) WaitForTx (
109108 ctx context.Context ,
110109 req * connect.Request [submit.WaitForTxRequest ],
111110 stream * connect.ServerStream [submit.WaitForTxResponse ],
112111) error {
113112
113+ logger := slog .With ("component" , "WaitForTx" )
114114 ref := req .Msg .GetRef () // [][]byte
115- log .Printf ("Got a WaitForTx request with %d transactions" , len (ref ))
115+ logger .Info ("Received WaitForTx request" , "transaction_count" , len (ref ))
116+
117+ // Log the transaction references at debug level
118+ for i , r := range ref {
119+ logger .Debug ("Transaction reference" , "index" , i , "ref" , hex .EncodeToString (r ))
120+ }
116121
117122 // Setup event channel
118- eventChan := make (chan event.Event , 10 )
123+ eventChan := make (chan event.Event , 100 ) // Increased buffer size for high-throughput
119124 connCfg := node.ConnectionConfig {
120125 ChainSyncEventChan : eventChan ,
121126 }
127+
122128 // Connect to node
129+ logger .Debug ("Establishing connection to Ouroboros node..." )
123130 oConn , err := node .GetConnection (& connCfg )
124131 if err != nil {
132+ logger .Error ("Failed to connect to node" , "error" , err )
125133 return err
126134 }
127135 defer func () {
128- // Close Ouroboros connection
136+ logger . Debug ( "Closing connection to Ouroboros node." )
129137 oConn .Close ()
130138 }()
131139
132- // Get our starting point
133- var point ocommon.Point
140+ // Get the current chain tip
134141 tip , err := oConn .ChainSync ().Client .GetCurrentTip ()
135142 if err != nil {
136- log . Printf ( "ERROR: %s " , err )
143+ logger . Error ( "Error retrieving current tip" , "error " , err )
137144 return err
138145 }
139- point = tip . Point
146+ logger . Debug ( "Current chain tip retrieved" , "tip" , tip )
140147
141148 // Start the sync with the node
142- err = oConn .ChainSync ().Client .Sync ([]ocommon.Point {point })
149+ logger .Debug ("Starting chain synchronization..." )
150+ err = oConn .ChainSync ().Client .Sync ([]ocommon.Point {tip .Point })
143151 if err != nil {
144- log . Printf ( "ERROR: %s " , err )
152+ logger . Error ( "Error during chain synchronization" , "error " , err )
145153 return err
146154 }
147155
156+ // Context cancellation handling
157+ go func () {
158+ <- ctx .Done ()
159+ logger .Debug ("Client canceled the request. Stopping event processing." )
160+ close (eventChan )
161+ }()
162+
148163 // Wait for events
164+ logger .Debug ("Waiting for transaction events..." )
149165 for {
150- evt , ok := <- eventChan
151- if ! ok {
152- log .Printf ("ERROR: channel closed" )
153- return fmt .Errorf ("ERROR: channel closed" )
154- }
166+ select {
167+ case <- ctx .Done ():
168+ logger .Info ("Context canceled. Exiting event loop." )
169+ return ctx .Err ()
170+ case evt , ok := <- eventChan :
171+ if ! ok {
172+ logger .Error ("Event channel closed unexpectedly." )
173+ return fmt .Errorf ("event channel closed" )
174+ }
155175
156- switch v := evt .Payload .(type ) {
157- case input_chainsync.TransactionEvent :
158- for _ , r := range ref {
159- resp := & submit.WaitForTxResponse {}
160- resp .Ref = r
161- resp .Stage = submit .Stage_STAGE_UNSPECIFIED
162- tc := evt .Context .(input_chainsync.TransactionContext )
163- // taken from gOuroboros generateTransactionHash
164- tmpHash , err := blake2b .New256 (nil )
165- if err != nil {
166- return err
167- }
168- tmpHash .Write (r )
169- txHash := hex .EncodeToString (tmpHash .Sum (nil ))
170- // Compare against our event's hash
171- if txHash == v .Transaction .Hash () {
172- resp .Stage = submit .Stage_STAGE_CONFIRMED
173- // Send it!
174- err = stream .Send (resp )
175- if err != nil {
176- return err
176+ // Process the event
177+ switch v := evt .Payload .(type ) {
178+ case input_chainsync.TransactionEvent :
179+ logger .Debug ("Received TransactionEvent" , "hash" , v .Transaction .Hash ())
180+ for _ , r := range ref {
181+ refHash := hex .EncodeToString (r )
182+ eventHash := v .Transaction .Hash ()
183+
184+ logger .Debug ("Comparing TransactionEvent with reference" , "event_hash" , eventHash , "reference_hash" , refHash )
185+ if refHash == eventHash {
186+ logger .Info ("Transaction matches reference" , "hash" , eventHash )
187+
188+ // Send confirmation response
189+ err = stream .Send (& submit.WaitForTxResponse {
190+ Ref : r ,
191+ Stage : submit .Stage_STAGE_CONFIRMED ,
192+ })
193+ if err != nil {
194+ if ctx .Err () != nil {
195+ logger .Warn ("Client disconnected while sending response" , "error" , ctx .Err ())
196+ return ctx .Err ()
197+ }
198+ logger .Error ("Error sending response to client" , "transaction_hash" , eventHash , "error" , err )
199+ return err
200+ }
201+ logger .Info ("Confirmation response sent" , "transaction_hash" , eventHash )
202+ return nil // Stop processing after confirming the transaction
177203 }
178- log .Printf (
179- "transaction: id: %d, hash: %s" ,
180- tc .TransactionIdx ,
181- tc .TransactionHash ,
182- )
183204 }
205+ default :
206+ logger .Debug ("Received unsupported event type" , "type" , evt .Type )
184207 }
185208 }
186209 }
0 commit comments