Skip to content

Commit 194896a

Browse files
authored
Added convenience method StoreMessage that calls StoreOffsets (@finncolman, #676)
* Added convenience method StoreMessage that calls StoreOffsets * Added '.' at the end of the comment * Added Offset less than 0 check * Moved the check to the correct method Co-authored-by: Finn Colman <[email protected]>
1 parent 7182357 commit 194896a

File tree

2 files changed

+24
-0
lines changed

2 files changed

+24
-0
lines changed

kafka/consumer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,20 @@ func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []Topic
292292
return storedOffsets, nil
293293
}
294294

295+
// StoreMessage stores offset based on the provided message.
296+
// This is a convenience method that uses StoreOffsets to do the actual work.
297+
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error) {
298+
if m.TopicPartition.Error != nil {
299+
return nil, newErrorFromString(ErrInvalidArg, "Can't store errored message")
300+
}
301+
if m.TopicPartition.Offset < 0 {
302+
return nil, newErrorFromString(ErrInvalidArg, "Can't store message with offset less than 0")
303+
}
304+
offsets := []TopicPartition{m.TopicPartition}
305+
offsets[0].Offset++
306+
return c.StoreOffsets(offsets)
307+
}
308+
295309
// Seek seeks the given topic partitions using the offset from the TopicPartition.
296310
//
297311
// If timeoutMs is not 0 the call will wait this long for the

kafka/consumer_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ func TestConsumerAPIs(t *testing.T) {
8585
t.Errorf("StoreOffsets(empty) failed: %s", err)
8686
}
8787

88+
// test StoreMessage doesn't fail either
89+
stored, err = c.StoreMessage(&Message{TopicPartition: TopicPartition{Topic: &topic, Partition: 0, Offset: 1}})
90+
if err != nil && err.(Error).Code() != ErrUnknownPartition {
91+
t.Errorf("StoreMessage() failed: %s", err)
92+
toppar := stored[0]
93+
if toppar.Error != nil && toppar.Error.(Error).Code() == ErrUnknownPartition {
94+
t.Errorf("StoreMessage() TopicPartition error: %s", toppar.Error)
95+
}
96+
}
97+
8898
topic1 := "gotest1"
8999
topic2 := "gotest2"
90100
err = c.Assign([]TopicPartition{{Topic: &topic1, Partition: 2},

0 commit comments

Comments
 (0)