Skip to content

Commit 7038737

Browse files
committed
refactor
1 parent c04bade commit 7038737

File tree

1 file changed

+12
-19
lines changed

1 file changed

+12
-19
lines changed

source/logrepl/combined.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -119,28 +119,21 @@ func (c *CombinedIterator) NextN(ctx context.Context, n int) ([]opencdc.Record,
119119
return nil, fmt.Errorf("n must be greater than 0, got %d", n)
120120
}
121121

122-
// Check if the active iterator already implements NextN (like the CDCIterator)
123-
if nextN, ok := c.activeIterator.(interface {
124-
NextN(context.Context, int) ([]opencdc.Record, error)
125-
}); ok {
126-
records, err := nextN.NextN(ctx, n)
127-
if err != nil {
128-
// Snapshot iterator is done, handover to CDC iterator
129-
if errors.Is(err, snapshot.ErrIteratorDone) {
130-
if err := c.useCDCIterator(ctx); err != nil {
131-
return nil, err
132-
}
133-
sdk.Logger(ctx).Debug().Msg("Snapshot completed, switching to CDC mode")
134-
135-
// Retry with new iterator
136-
return c.NextN(ctx, n)
122+
records, err := c.activeIterator.NextN(ctx, n)
123+
if err != nil {
124+
// Snapshot iterator is done, handover to CDC iterator
125+
if errors.Is(err, snapshot.ErrIteratorDone) {
126+
if err := c.useCDCIterator(ctx); err != nil {
127+
return nil, err
137128
}
138-
return nil, fmt.Errorf("failed to fetch records in batch: %w", err)
129+
sdk.Logger(ctx).Debug().Msg("Snapshot completed, switching to CDC mode")
130+
131+
// Retry with new iterator
132+
return c.NextN(ctx, n)
139133
}
140-
return records, nil
134+
return nil, fmt.Errorf("failed to fetch records in batch: %w", err)
141135
}
142-
143-
return nil, nil
136+
return records, nil
144137
}
145138

146139
func (c *CombinedIterator) Ack(ctx context.Context, p opencdc.Position) error {

0 commit comments

Comments
 (0)