Skip to content

Commit 582778e

Browse files
authored
fix(utxorpc): waitForTx call (#304)
Signed-off-by: Ales Verbic <[email protected]>
1 parent b471b25 commit 582778e

File tree

2 files changed

+88
-51
lines changed

2 files changed

+88
-51
lines changed

internal/node/chainsync.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,31 @@ func chainSyncRollForwardHandler(
8282
) error {
8383
cfg := config.GetConfig()
8484
if connCfg.ChainSyncEventChan != nil {
85-
var block ledger.Block
8685
switch v := blockData.(type) {
8786
case ledger.Block:
88-
block = v
87+
// Emit block-level event
88+
blockEvt := event.New(
89+
"chainsync.block",
90+
time.Now(),
91+
input_chainsync.NewBlockContext(v, cfg.Node.NetworkMagic),
92+
input_chainsync.NewBlockEvent(v, true),
93+
)
94+
connCfg.ChainSyncEventChan <- blockEvt
95+
// Emit transaction-level events
96+
for t, transaction := range v.Transactions() {
97+
// TODO: do we need to resolve inputs?
98+
// resolvedInputs, err := resolveTransactionInputs(transaction, connCfg)
99+
// if err != nil {
100+
// return fmt.Errorf("failed to resolve inputs for transaction: %w", err)
101+
// }
102+
txEvt := event.New(
103+
"chainsync.transaction",
104+
time.Now(),
105+
input_chainsync.NewTransactionContext(v, transaction, uint32(t), cfg.Node.NetworkMagic),
106+
input_chainsync.NewTransactionEvent(v, transaction, true, nil),
107+
)
108+
connCfg.ChainSyncEventChan <- txEvt
109+
}
89110
/*
90111
case ledger.BlockHeader:
91112
blockSlot := v.SlotNumber()
@@ -102,13 +123,6 @@ func chainSyncRollForwardHandler(
102123
default:
103124
return fmt.Errorf("unknown block data")
104125
}
105-
evt := event.New(
106-
"chainsync.block",
107-
time.Now(),
108-
input_chainsync.NewBlockContext(block, cfg.Node.NetworkMagic),
109-
input_chainsync.NewBlockEvent(block, true),
110-
)
111-
connCfg.ChainSyncEventChan <- evt
112126
}
113127
return nil
114128
}

internal/utxorpc/submit.go

Lines changed: 65 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/hex"
2020
"fmt"
2121
"log"
22+
"log/slog"
2223

2324
connect "connectrpc.com/connect"
2425
"github.com/blinklabs-io/adder/event"
@@ -27,7 +28,6 @@ import (
2728
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
2829
submit "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit"
2930
"github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit/submitconnect"
30-
"golang.org/x/crypto/blake2b"
3131

3232
"github.com/blinklabs-io/cardano-node-api/internal/node"
3333
)
@@ -104,83 +104,106 @@ func (s *submitServiceServer) SubmitTx(
104104
return connect.NewResponse(resp), nil
105105
}
106106

107-
// WaitForTx
108107
func (s *submitServiceServer) WaitForTx(
109108
ctx context.Context,
110109
req *connect.Request[submit.WaitForTxRequest],
111110
stream *connect.ServerStream[submit.WaitForTxResponse],
112111
) error {
113112

113+
logger := slog.With("component", "WaitForTx")
114114
ref := req.Msg.GetRef() // [][]byte
115-
log.Printf("Got a WaitForTx request with %d transactions", len(ref))
115+
logger.Info("Received WaitForTx request", "transaction_count", len(ref))
116+
117+
// Log the transaction references at debug level
118+
for i, r := range ref {
119+
logger.Debug("Transaction reference", "index", i, "ref", hex.EncodeToString(r))
120+
}
116121

117122
// Setup event channel
118-
eventChan := make(chan event.Event, 10)
123+
eventChan := make(chan event.Event, 100) // Increased buffer size for high-throughput
119124
connCfg := node.ConnectionConfig{
120125
ChainSyncEventChan: eventChan,
121126
}
127+
122128
// Connect to node
129+
logger.Debug("Establishing connection to Ouroboros node...")
123130
oConn, err := node.GetConnection(&connCfg)
124131
if err != nil {
132+
logger.Error("Failed to connect to node", "error", err)
125133
return err
126134
}
127135
defer func() {
128-
// Close Ouroboros connection
136+
logger.Debug("Closing connection to Ouroboros node.")
129137
oConn.Close()
130138
}()
131139

132-
// Get our starting point
133-
var point ocommon.Point
140+
// Get the current chain tip
134141
tip, err := oConn.ChainSync().Client.GetCurrentTip()
135142
if err != nil {
136-
log.Printf("ERROR: %s", err)
143+
logger.Error("Error retrieving current tip", "error", err)
137144
return err
138145
}
139-
point = tip.Point
146+
logger.Debug("Current chain tip retrieved", "tip", tip)
140147

141148
// Start the sync with the node
142-
err = oConn.ChainSync().Client.Sync([]ocommon.Point{point})
149+
logger.Debug("Starting chain synchronization...")
150+
err = oConn.ChainSync().Client.Sync([]ocommon.Point{tip.Point})
143151
if err != nil {
144-
log.Printf("ERROR: %s", err)
152+
logger.Error("Error during chain synchronization", "error", err)
145153
return err
146154
}
147155

156+
// Context cancellation handling
157+
go func() {
158+
<-ctx.Done()
159+
logger.Debug("Client canceled the request. Stopping event processing.")
160+
close(eventChan)
161+
}()
162+
148163
// Wait for events
164+
logger.Debug("Waiting for transaction events...")
149165
for {
150-
evt, ok := <-eventChan
151-
if !ok {
152-
log.Printf("ERROR: channel closed")
153-
return fmt.Errorf("ERROR: channel closed")
154-
}
166+
select {
167+
case <-ctx.Done():
168+
logger.Info("Context canceled. Exiting event loop.")
169+
return ctx.Err()
170+
case evt, ok := <-eventChan:
171+
if !ok {
172+
logger.Error("Event channel closed unexpectedly.")
173+
return fmt.Errorf("event channel closed")
174+
}
155175

156-
switch v := evt.Payload.(type) {
157-
case input_chainsync.TransactionEvent:
158-
for _, r := range ref {
159-
resp := &submit.WaitForTxResponse{}
160-
resp.Ref = r
161-
resp.Stage = submit.Stage_STAGE_UNSPECIFIED
162-
tc := evt.Context.(input_chainsync.TransactionContext)
163-
// taken from gOuroboros generateTransactionHash
164-
tmpHash, err := blake2b.New256(nil)
165-
if err != nil {
166-
return err
167-
}
168-
tmpHash.Write(r)
169-
txHash := hex.EncodeToString(tmpHash.Sum(nil))
170-
// Compare against our event's hash
171-
if txHash == v.Transaction.Hash() {
172-
resp.Stage = submit.Stage_STAGE_CONFIRMED
173-
// Send it!
174-
err = stream.Send(resp)
175-
if err != nil {
176-
return err
176+
// Process the event
177+
switch v := evt.Payload.(type) {
178+
case input_chainsync.TransactionEvent:
179+
logger.Debug("Received TransactionEvent", "hash", v.Transaction.Hash())
180+
for _, r := range ref {
181+
refHash := hex.EncodeToString(r)
182+
eventHash := v.Transaction.Hash()
183+
184+
logger.Debug("Comparing TransactionEvent with reference", "event_hash", eventHash, "reference_hash", refHash)
185+
if refHash == eventHash {
186+
logger.Info("Transaction matches reference", "hash", eventHash)
187+
188+
// Send confirmation response
189+
err = stream.Send(&submit.WaitForTxResponse{
190+
Ref: r,
191+
Stage: submit.Stage_STAGE_CONFIRMED,
192+
})
193+
if err != nil {
194+
if ctx.Err() != nil {
195+
logger.Warn("Client disconnected while sending response", "error", ctx.Err())
196+
return ctx.Err()
197+
}
198+
logger.Error("Error sending response to client", "transaction_hash", eventHash, "error", err)
199+
return err
200+
}
201+
logger.Info("Confirmation response sent", "transaction_hash", eventHash)
202+
return nil // Stop processing after confirming the transaction
177203
}
178-
log.Printf(
179-
"transaction: id: %d, hash: %s",
180-
tc.TransactionIdx,
181-
tc.TransactionHash,
182-
)
183204
}
205+
default:
206+
logger.Debug("Received unsupported event type", "type", evt.Type)
184207
}
185208
}
186209
}

0 commit comments

Comments
 (0)