Skip to content

Commit d8ea67f

Browse files
authored
fix: do not mark a record as processed if it was not processed successfully (#36)
1 parent 338e921 commit d8ea67f

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

internal/consolidator/consolidator.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
163163
log.Infof("Processing %d unprocessed records", len(records))
164164

165165
// Process each record (each record represents a batch of receipts for a single node)
166+
successfulRecords := make([]egress.EgressRecord, 0, len(records))
166167
for _, record := range records {
167168
var rcpt capegress.ConsolidateReceipt
168169
totalEgress := uint64(0)
@@ -204,6 +205,8 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
204205

205206
rcpt, err = c.execConsolidateInvocation(ctx, consolidateInv)
206207
if err != nil {
208+
bLog.Errorf("executing consolidation invocation: %v", err)
209+
207210
rcpt, err = c.issueErrorReceipt(consolidateInv, capegress.NewConsolidateError(err.Error()))
208211
if err != nil {
209212
bLog.Errorf("issuing error receipt: %v", err)
@@ -214,7 +217,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
214217
o, x := result.Unwrap(rcpt.Out())
215218
var emptyErr capegress.ConsolidateError
216219
if x != emptyErr {
217-
bLog.Errorf("invocation failed: %s", x.Message)
220+
bLog.Errorf("consolidation error: %s", x.Message)
218221
} else {
219222
totalEgress = o.TotalEgress
220223
}
@@ -225,6 +228,8 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
225228
continue
226229
}
227230

231+
successfulRecords = append(successfulRecords, record)
232+
228233
// Increment consolidated bytes counter for this node
229234
nodeAttr := attribute.String("node", record.Node.String())
230235
metrics.ConsolidatedBytesPerNode.Add(ctx, int64(totalEgress), metric.WithAttributeSet(attribute.NewSet(nodeAttr)))
@@ -233,13 +238,13 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
233238
}
234239

235240
// Mark records as processed
236-
if err := c.egressTable.MarkAsProcessed(ctx, records); err != nil {
241+
if err := c.egressTable.MarkAsProcessed(ctx, successfulRecords); err != nil {
237242
return fmt.Errorf("marking records as processed: %w", err)
238243
}
239244

240-
metrics.UnprocessedBatches.Add(ctx, int64(-len(records)))
245+
metrics.UnprocessedBatches.Add(ctx, int64(-len(successfulRecords)))
241246

242-
log.Infof("Consolidation cycle completed. Processed %d records", len(records))
247+
log.Infof("Consolidation cycle completed. Processed %d records (%d successful)", len(records), len(successfulRecords))
243248

244249
return nil
245250
}

0 commit comments

Comments
 (0)