Skip to content

Commit 10acb6f

Browse files
authored
Merge pull request #787 from oasisprotocol/ptrus/fix/logs-tx-index
logs: Fix transaction index
2 parents 6774b94 + fdb6eee commit 10acb6f

File tree

4 files changed

+442
-2
lines changed

4 files changed

+442
-2
lines changed

indexer/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (ib *indexBackend) StoreBlockData(ctx context.Context, oasisBlock *block.Bl
383383
// Ignore any other events.
384384
}
385385
}
386-
logs = append(logs, Logs2EthLogs(oasisLogs, blockNum, bhash, ethTx.Hash(), uint(len(logs)), uint32(txIndex))...)
386+
logs = append(logs, Logs2EthLogs(oasisLogs, blockNum, bhash, ethTx.Hash(), uint(len(logs)), uint32(len(ethTxs)))...)
387387

388388
// Emerald GasUsed events were added in version 7.0.0.
389389
// Default to using gas limit, which was the behaviour before.

main.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/oasisprotocol/oasis-web3-gateway/storage"
3636
"github.com/oasisprotocol/oasis-web3-gateway/storage/psql"
3737
"github.com/oasisprotocol/oasis-web3-gateway/version"
38+
"github.com/oasisprotocol/oasis-web3-gateway/worker"
3839
)
3940

4041
var (
@@ -237,11 +238,23 @@ func runRoot() error {
237238

238239
// Initialize db again, now with configured timeouts.
239240
var storage storage.Storage
240-
storage, err = psql.InitDB(ctx, cfg.Database, false, dbReadOnly)
241+
rawStorage, err := psql.InitDB(ctx, cfg.Database, false, dbReadOnly)
241242
if err != nil {
242243
logger.Error("failed to initialize db", "err", err)
243244
return err
244245
}
246+
storage = rawStorage
247+
248+
// Start log tx_index fixer worker
249+
if bunDB, ok := rawStorage.DB.(*bun.DB); ok {
250+
logTxIndexFixer := worker.NewLogTxIndexFixer(bunDB)
251+
go func() {
252+
if err := logTxIndexFixer.Start(ctx); err != nil {
253+
logger.Error("log tx_index fixer worker failed", "err", err)
254+
}
255+
}()
256+
}
257+
245258
// Monitoring if enabled.
246259
if cfg.Gateway.Monitoring.Enabled() {
247260
storage = psql.NewMetricsWrapper(storage)

worker/log_tx_index_fixer.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Package worker provides background workers for maintenance tasks.
2+
package worker
3+
4+
import (
5+
"context"
6+
"database/sql"
7+
"errors"
8+
"fmt"
9+
"time"
10+
11+
"github.com/uptrace/bun"
12+
13+
"github.com/oasisprotocol/oasis-core/go/common/logging"
14+
)
15+
16+
const (
17+
batchSize = 10_000
18+
workerSleepTime = 1 * time.Second
19+
backoffSleepTime = 30 * time.Second
20+
)
21+
22+
// LogTxIndexFixer is a background worker that fixes log tx_index mismatches.
23+
//
24+
// Fixes historical log tx_index data that was incorrectly indexed prior to
25+
// the fix in issue #786 (https://github.com/oasisprotocol/oasis-web3-gateway/issues/786).
26+
type LogTxIndexFixer struct {
27+
db *bun.DB
28+
logger *logging.Logger
29+
}
30+
31+
// NewLogTxIndexFixer creates a new LogTxIndexFixer worker.
32+
func NewLogTxIndexFixer(db *bun.DB) *LogTxIndexFixer {
33+
return &LogTxIndexFixer{
34+
db: db,
35+
logger: logging.GetLogger("log-tx-index-fixer"),
36+
}
37+
}
38+
39+
// Start begins the worker with the given context.
40+
func (w *LogTxIndexFixer) Start(ctx context.Context) error {
41+
w.logger.Info("starting log tx_index fixer worker")
42+
43+
// Find the highest mismatched round once at the start.
44+
highestMismatchedRound, err := w.findHighestMismatchedRound(ctx)
45+
if err != nil {
46+
return fmt.Errorf("failed to find highest mismatched round: %w", err)
47+
}
48+
49+
if highestMismatchedRound == 0 {
50+
w.logger.Info("all log tx_index mismatches fixed, worker stopping")
51+
return nil
52+
}
53+
54+
w.logger.Info("found mismatches, starting fix process", "highest_round", highestMismatchedRound)
55+
56+
// Process batches from highest round down to 0.
57+
currentEndRound := highestMismatchedRound
58+
for {
59+
select {
60+
case <-ctx.Done():
61+
w.logger.Info("log tx_index fixer worker stopped")
62+
return ctx.Err()
63+
default:
64+
}
65+
66+
// Calculate batch range.
67+
var startRound uint64
68+
if currentEndRound >= batchSize {
69+
startRound = currentEndRound - batchSize + 1
70+
} else {
71+
startRound = 0
72+
}
73+
74+
w.logger.Debug("fixing batch", "start_round", startRound, "end_round", currentEndRound)
75+
76+
rowsAffected, err := w.fixRoundRange(ctx, startRound, currentEndRound)
77+
if err != nil {
78+
w.logger.Error("error fixing log tx_index batch", "err", err, "start_round", startRound, "end_round", currentEndRound)
79+
time.Sleep(backoffSleepTime)
80+
continue
81+
}
82+
83+
w.logger.Debug("fixed log tx_index batch",
84+
"start_round", startRound,
85+
"end_round", currentEndRound,
86+
"rows_affected", rowsAffected)
87+
88+
// Move to next batch, unless done.
89+
if startRound == 0 {
90+
break
91+
}
92+
currentEndRound = startRound - 1
93+
94+
time.Sleep(workerSleepTime)
95+
}
96+
97+
w.logger.Info("all log tx_index mismatches fixed, worker stopping")
98+
return nil
99+
}
100+
101+
// findHighestMismatchedRound finds the highest round number that has log tx_index mismatches.
102+
func (w *LogTxIndexFixer) findHighestMismatchedRound(ctx context.Context) (uint64, error) {
103+
query := `
104+
SELECT l.round
105+
FROM logs l
106+
JOIN transactions t ON l.tx_hash = t.hash
107+
WHERE l.tx_index != t.index
108+
ORDER BY l.round DESC
109+
LIMIT 1
110+
`
111+
112+
var round *uint64
113+
err := w.db.NewRaw(query).Scan(ctx, &round)
114+
if err != nil {
115+
if errors.Is(err, sql.ErrNoRows) {
116+
return 0, nil // No mismatches found
117+
}
118+
return 0, fmt.Errorf("finding highest mismatched round: %w", err)
119+
}
120+
121+
if round == nil {
122+
return 0, nil // No mismatches found
123+
}
124+
125+
return *round, nil
126+
}
127+
128+
// fixRoundRange fixes log tx_index mismatches for a range of rounds.
129+
func (w *LogTxIndexFixer) fixRoundRange(ctx context.Context, startRound, endRound uint64) (int64, error) {
130+
query := `
131+
UPDATE logs
132+
SET tx_index = t.index
133+
FROM transactions t
134+
WHERE logs.tx_hash = t.hash
135+
AND logs.round BETWEEN ? AND ?
136+
AND logs.tx_index != t.index
137+
`
138+
139+
result, err := w.db.NewRaw(query, startRound, endRound).Exec(ctx)
140+
if err != nil {
141+
return 0, err
142+
}
143+
144+
rowsAffected, err := result.RowsAffected()
145+
if err != nil {
146+
return 0, err
147+
}
148+
149+
return rowsAffected, nil
150+
}

0 commit comments

Comments
 (0)