diff --git a/partition_table.go b/partition_table.go index 79baf898..f717145b 100644 --- a/partition_table.go +++ b/partition_table.go @@ -8,6 +8,7 @@ import ( "github.com/IBM/sarama" "github.com/hashicorp/go-multierror" + "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" ) @@ -329,8 +330,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr defer p.log.Debugf("... Loading done") partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset) - if err != nil { - return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err) + if err == sarama.ErrOffsetOutOfRange { + p.log.Printf("Offset %d out of range for topic %s, partition %d. Falling back to oldest offset.", loadOffset, p.topic, p.partition) + + partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, sarama.OffsetOldest) + if err != nil { + return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err) + } } // close the consumer