Skip to content

Commit e58856a

Browse files
fix index_in_epoch value for batch processed epoch changes.
1 parent c58c1ec commit e58856a

File tree

8 files changed

+32
-33
lines changed

8 files changed

+32
-33
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
## 1.11.0
22
Release built: _not released yet_
33

4+
### Database changes
5+
- Fixed an invalid `index_in_epoch` value for transactions processed in a batch when they belonged to different epochs.
6+
47
## 1.10.1
58
Release built: 6.03.2025
69

src/RadixDlt.NetworkGateway.DataAggregator/Services/ITopOfLedgerProvider.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,6 @@ public interface ITopOfLedgerProvider
7272
Task<TransactionSummary> GetTopOfLedger(CancellationToken token);
7373

7474
Task<long> GetLastCommittedStateVersion(CancellationToken token);
75+
76+
Task<long> GetLastCommittedEpoch(CancellationToken token);
7577
}

src/RadixDlt.NetworkGateway.DataAggregator/Services/LedgerTransactionsProcessor.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ public sealed class LedgerTransactionsProcessor : ILedgerTransactionsProcessor
9292
private readonly IEnumerable<ILedgerConfirmationServiceObserver> _observers;
9393
private readonly ITopOfLedgerProvider _topOfLedgerProvider;
9494
private readonly IFetchedTransactionStore _fetchedTransactionStore;
95-
private readonly INetworkConfigurationProvider _networkConfigurationProvider;
9695
private readonly IClock _clock;
9796

9897
private LedgerConfirmationOptions Config { get; set; }
@@ -105,7 +104,6 @@ public LedgerTransactionsProcessor(
105104
IEnumerable<ILedgerConfirmationServiceObserver> observers,
106105
IFetchedTransactionStore fetchedTransactionStore,
107106
ITopOfLedgerProvider topOfLedgerProvider,
108-
INetworkConfigurationProvider networkConfigurationProvider,
109107
IClock clock)
110108
{
111109
_logger = logger;
@@ -116,19 +114,17 @@ public LedgerTransactionsProcessor(
116114
_fetchedTransactionStore = fetchedTransactionStore;
117115
_topOfLedgerProvider = topOfLedgerProvider;
118116
_clock = clock;
119-
_networkConfigurationProvider = networkConfigurationProvider;
120117
Config = _ledgerConfirmationOptionsMonitor.CurrentValue;
121118
}
122119

123120
public async Task ProcessTransactions(CancellationToken token)
124121
{
125-
var networkConfiguration = _networkConfigurationProvider.GetNetworkConfiguration();
126122
var lastCommittedTransactionSummary = await _topOfLedgerProvider.GetTopOfLedger(token);
127123
await _observers.ForEachAsync(x => x.PreHandleLedgerExtension(_clock.UtcNow));
128124

129125
Config = _ledgerConfirmationOptionsMonitor.CurrentValue;
130126

131-
var transactions = ConstructLedgerExtension(lastCommittedTransactionSummary);
127+
var transactions = ConstructLedgerExtension(lastCommittedTransactionSummary.StateVersion);
132128

133129
if (transactions.Count == 0)
134130
{
@@ -146,9 +142,9 @@ public async Task ProcessTransactions(CancellationToken token)
146142
await DelayBetweenIngestionBatchesIfRequested(commitReport);
147143
}
148144

149-
private List<CoreModel.CommittedTransaction> ConstructLedgerExtension(TransactionSummary topOfLedger)
145+
private List<CoreModel.CommittedTransaction> ConstructLedgerExtension(long topOfLedgerStateVersion)
150146
{
151-
var startStateVersion = topOfLedger.StateVersion + 1;
147+
var startStateVersion = topOfLedgerStateVersion + 1;
152148
var transactions = _fetchedTransactionStore.GetTransactionBatch(startStateVersion, (int)Config.MaxCommitBatchSize, (int)Config.MinCommitBatchSize);
153149
return transactions;
154150
}

src/RadixDlt.NetworkGateway.DataAggregator/Workers/NodeWorkers/NodeTransactionFetchWorker.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,7 @@ private async Task<FetchedTransactions> FetchTransactions(long fromStateVersion,
278278

279279
private async Task<(long StateVersionInclusiveLowerBound, long StateVersionInclusiveUpperBound)?> GetTransactionsToFetch(string nodeName, CancellationToken cancellationToken)
280280
{
281-
var lastCommittedTransactionSummary = await _topOfLedgerProvider.GetTopOfLedger(cancellationToken);
282-
var processTransactionFromStateVersion = lastCommittedTransactionSummary.StateVersion;
281+
var processTransactionFromStateVersion = await _topOfLedgerProvider.GetLastCommittedStateVersion(cancellationToken);
283282
var maxUpperLimit = processTransactionFromStateVersion + _ledgerConfirmationOptionsMonitor.CurrentValue.MaxTransactionPipelineSizePerNode + 1;
284283

285284
var shouldFetchNewTransactions = _fetchedTransactionStore.ShouldFetchNewTransactions(nodeName, processTransactionFromStateVersion);

src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/PostgresLedgerExtenderService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ public async Task<CommitTransactionsReport> CommitTransactions(ConsistentLedgerE
131131

132132
try
133133
{
134-
var topOfLedgerSummary = await _topOfLedgerProvider.GetTopOfLedger(token);
134+
var lastCommittedStateVersion = await _topOfLedgerProvider.GetLastCommittedStateVersion(token);
135135

136-
TransactionConsistencyValidator.AssertLatestTransactionConsistent(ledgerExtension.LatestTransactionSummary.StateVersion, topOfLedgerSummary.StateVersion);
136+
TransactionConsistencyValidator.AssertLatestTransactionConsistent(ledgerExtension.LatestTransactionSummary.StateVersion, lastCommittedStateVersion);
137137

138138
var extendLedgerReport = await ProcessTransactions(dbContext, ledgerExtension, token);
139139

src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/Processors/LedgerTransactionProcessor.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
using System.Collections.Generic;
7777
using System.Diagnostics;
7878
using System.Linq;
79+
using System.Runtime.InteropServices.JavaScript;
7980
using System.Threading.Tasks;
8081
using CoreModel = RadixDlt.CoreApiSdk.Model;
8182

@@ -89,11 +90,11 @@ private record TransactionData(
8990
CoreModel.CommittedTransaction RawCommittedTransaction
9091
)
9192
{
92-
public long? NewEpochIndex { get; set; }
93+
public long? RoundUpdateEpochIndex { get; set; }
9394

9495
public long? NewRoundIndex { get; set; }
9596

96-
public DateTime? RoundTimestampUpdate { get; set; }
97+
public DateTime? NewRoundTimestamp { get; set; }
9798
}
9899

99100
private readonly ProcessorContext _context;
@@ -134,15 +135,13 @@ public void VisitTransaction(CoreModel.CommittedTransaction committedTransaction
134135

135136
if (committedTransaction.LedgerTransaction is CoreModel.RoundUpdateLedgerTransaction roundUpdateTransaction)
136137
{
137-
var newEpochIndex = _lastProcessedTransactionSummary.Epoch != roundUpdateTransaction.RoundUpdateTransaction.Epoch ? roundUpdateTransaction.RoundUpdateTransaction.Epoch : (long?)null;
138-
139138
_transactionData.Update(
140139
stateVersion,
141140
existing =>
142141
{
143-
existing.NewEpochIndex = newEpochIndex;
142+
existing.RoundUpdateEpochIndex = roundUpdateTransaction.RoundUpdateTransaction.Epoch;
144143
existing.NewRoundIndex = roundUpdateTransaction.RoundUpdateTransaction.RoundInEpoch;
145-
existing.RoundTimestampUpdate = DateTimeOffset.FromUnixTimeMilliseconds(roundUpdateTransaction.RoundUpdateTransaction.ProposerTimestamp.UnixTimestampMs).UtcDateTime;
144+
existing.NewRoundTimestamp = DateTimeOffset.FromUnixTimeMilliseconds(roundUpdateTransaction.RoundUpdateTransaction.ProposerTimestamp.UnixTimestampMs).UtcDateTime;
146145
});
147146
}
148147
}
@@ -155,7 +154,7 @@ public void VisitUpsert(CoreModel.IUpsertedSubstate substate, ReferencedEntity r
155154
{
156155
_transactionData.Update(
157156
stateVersion,
158-
existing => { existing.RoundTimestampUpdate = DateTimeOffset.FromUnixTimeMilliseconds(currentTime.Value.ProposerTimestamp.UnixTimestampMs).UtcDateTime; });
157+
existing => { existing.NewRoundTimestamp = DateTimeOffset.FromUnixTimeMilliseconds(currentTime.Value.ProposerTimestamp.UnixTimestampMs).UtcDateTime; });
159158
}
160159
}
161160

@@ -171,17 +170,18 @@ public void ProcessChanges()
171170
var stateVersion = data.StateVersion;
172171
var committedTransaction = data.RawCommittedTransaction;
173172

174-
var roundTimestamp = data.RoundTimestampUpdate ?? _lastProcessedTransactionSummary.RoundTimestamp;
175-
173+
var roundInEpoch = data.NewRoundIndex ?? _lastProcessedTransactionSummary.RoundInEpoch;
174+
var roundTimestamp = data.NewRoundTimestamp ?? _lastProcessedTransactionSummary.RoundTimestamp;
176175
var normalizedRoundTimestamp =
177176
roundTimestamp < _lastProcessedTransactionSummary.NormalizedRoundTimestamp ? _lastProcessedTransactionSummary.NormalizedRoundTimestamp
178177
: roundTimestamp > data.CreatedAt ? data.CreatedAt
179178
: roundTimestamp;
180179

181-
var epoch = data.NewEpochIndex ?? _lastProcessedTransactionSummary.Epoch;
182-
var roundInEpoch = data.NewRoundIndex ?? _lastProcessedTransactionSummary.RoundInEpoch;
183-
var indexInEpoch = data.NewEpochIndex.HasValue ? 0 : _lastProcessedTransactionSummary.IndexInEpoch + 1;
184-
var indexInRound = data.NewEpochIndex.HasValue ? 0 : _lastProcessedTransactionSummary.IndexInRound + 1;
180+
var epoch = data.RoundUpdateEpochIndex ?? _lastProcessedTransactionSummary.Epoch;
181+
bool isNewEpoch = epoch != _lastProcessedTransactionSummary.Epoch;
182+
183+
var indexInEpoch = isNewEpoch ? 0 : _lastProcessedTransactionSummary.IndexInEpoch + 1;
184+
var indexInRound = data.NewRoundIndex.HasValue ? 0 : _lastProcessedTransactionSummary.IndexInRound + 1;
185185

186186
LedgerTransaction ledgerTransaction = committedTransaction.LedgerTransaction switch
187187
{
@@ -277,7 +277,7 @@ public void ProcessChanges()
277277
}
278278

279279
var isUserTransaction = committedTransaction.LedgerTransaction is CoreModel.UserLedgerTransaction or CoreModel.UserLedgerTransactionV2;
280-
var isUserTransactionOrEpochChange = isUserTransaction || data.NewEpochIndex.HasValue;
280+
var isUserTransactionOrEpochChange = isUserTransaction || isNewEpoch;
281281

282282
if (_context.StorageOptions.StoreReceiptStateUpdates == LedgerTransactionStorageOption.StoreForAllTransactions ||
283283
(_context.StorageOptions.StoreReceiptStateUpdates == LedgerTransactionStorageOption.StoreOnlyForUserTransactions && isUserTransaction) ||

src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PendingTransactions/PendingTransactionResubmissionService.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ private async Task<ContextualSubmissionResult> Resubmit(PendingTransactionWithCh
302302

303303
private async Task<long> GetCurrentEpoch(CancellationToken cancellationToken)
304304
{
305-
var topOfLedger = await _topOfLedgerProvider.GetTopOfLedger(cancellationToken);
306-
var signedEpoch = topOfLedger.Epoch;
307-
return (signedEpoch >= 0) ? signedEpoch : throw new InvalidStateException($"Epoch was negative: {signedEpoch}");
305+
var lastCommitedEpoch = await _topOfLedgerProvider.GetLastCommittedEpoch(cancellationToken);
306+
return (lastCommitedEpoch >= 0) ? lastCommitedEpoch : throw new InvalidStateException($"Epoch was negative: {lastCommitedEpoch}");
308307
}
309308
}

src/RadixDlt.NetworkGateway.PostgresIntegration/Services/TopOfLedgerProvider.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,12 @@ public TopOfLedgerProvider(Abstractions.IClock clock, INetworkConfigurationProvi
8787

8888
public async Task<long> GetLastCommittedStateVersion(CancellationToken token)
8989
{
90-
var dbContext = await _dbContextFactory.CreateDbContextAsync(token);
90+
return (await GetTopOfLedger(token)).StateVersion;
91+
}
9192

92-
return await dbContext.GetTopLedgerTransaction()
93-
.AsNoTracking()
94-
.Select(t => t.StateVersion)
95-
.FirstOrDefaultAsync(token); // Defaults to 0, which is perfect
93+
public async Task<long> GetLastCommittedEpoch(CancellationToken token)
94+
{
95+
return (await GetTopOfLedger(token)).Epoch;
9696
}
9797

9898
public async Task<TransactionSummary> GetTopOfLedger(CancellationToken token)

0 commit comments

Comments
 (0)