Skip to content

Commit 6e4c6cf

Browse files
authored
Merge pull request #785 Add read message builder
2 parents 192de17 + 1bd74d6 commit 6e4c6cf

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added builder for topic reader message (usable for tests)
2+
13
## v3.48.5
24
* Removed `log.Secret` helper as unnessesarry in public API after refactoring logging subsystem
35
* Enriched the error with important details from initial discovery

internal/topic/topicreaderinternal/message.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,79 @@ type errorReader struct {
115115
func (u errorReader) Read(p []byte) (n int, err error) {
116116
return 0, u.err
117117
}
118+
119+
type PublicMessageBuilder struct {
120+
mess *PublicMessage
121+
}
122+
123+
func NewPublicMessageBuilder() *PublicMessageBuilder {
124+
return &PublicMessageBuilder{
125+
mess: &PublicMessage{},
126+
}
127+
}
128+
129+
// Seqno set message Seqno
130+
func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder {
131+
pmb.mess.SeqNo = seqNo
132+
return pmb
133+
}
134+
135+
// CreatedAt set message CreatedAt
136+
func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder {
137+
pmb.mess.CreatedAt = createdAt
138+
return pmb
139+
}
140+
141+
// MessageGroupID set message MessageGroupID
142+
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder {
143+
pmb.mess.MessageGroupID = messageGroupID
144+
return pmb
145+
}
146+
147+
// WriteSessionMetadata set message WriteSessionMetadata
148+
func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder {
149+
pmb.mess.WriteSessionMetadata = writeSessionMetadata
150+
return pmb
151+
}
152+
153+
// Offset set message Offset
154+
func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder {
155+
pmb.mess.Offset = offset
156+
return pmb
157+
}
158+
159+
// WrittenAt set message WrittenAt
160+
func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder {
161+
pmb.mess.WrittenAt = writtenAt
162+
return pmb
163+
}
164+
165+
// ProducerID set message ProducerID
166+
func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder {
167+
pmb.mess.ProducerID = producerID
168+
return pmb
169+
}
170+
171+
// DataAndUncompressedSize set message uncompressed content and field UncompressedSize
172+
func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder {
173+
copyData := make([]byte, len(data))
174+
copy(copyData, data)
175+
pmb.mess.data = oneTimeReader{reader: bytes.NewReader(data)}
176+
pmb.mess.dataConsumed = false
177+
pmb.mess.rawDataLen = len(copyData)
178+
pmb.mess.UncompressedSize = len(copyData)
179+
return pmb
180+
}
181+
182+
// UncompressedSize set message UncompressedSize
183+
func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder {
184+
pmb.mess.UncompressedSize = uncompressedSize
185+
return pmb
186+
}
187+
188+
// Build return builded message and reset internal state for create new message
189+
func (pmb *PublicMessageBuilder) Build() *PublicMessage {
190+
mess := pmb.mess
191+
pmb.mess = &PublicMessage{}
192+
return mess
193+
}

testutil/topic.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package testutil
2+
3+
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreaderinternal"
4+
5+
type TopicReaderMessageBuilder = topicreaderinternal.PublicMessageBuilder
6+
7+
// NewTopicReaderMessageBuilder create builder, which can create Message (use for tests only)
8+
func NewTopicReaderMessageBuilder() *TopicReaderMessageBuilder {
9+
return topicreaderinternal.NewPublicMessageBuilder()
10+
}

0 commit comments

Comments
 (0)