Skip to content

Commit 23df1b2

Browse files
committed
Add read message builder
1 parent 192de17 commit 23df1b2

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

internal/topic/topicreaderinternal/message.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,79 @@ type errorReader struct {
115115
func (u errorReader) Read(p []byte) (n int, err error) {
116116
return 0, u.err
117117
}
118+
119+
type PublicMessageBuilder struct {
120+
mess *PublicMessage
121+
}
122+
123+
func NewPublicMessageBuilder() *PublicMessageBuilder {
124+
return &PublicMessageBuilder{
125+
mess: &PublicMessage{},
126+
}
127+
}
128+
129+
// Seqno set message Seqno
130+
func (pmb *PublicMessageBuilder) Seqno(seqNo int64) *PublicMessageBuilder {
131+
pmb.mess.SeqNo = seqNo
132+
return pmb
133+
}
134+
135+
// CreatedAt set message CreatedAt
136+
func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBuilder {
137+
pmb.mess.CreatedAt = createdAt
138+
return pmb
139+
}
140+
141+
// MessageGroupID set message MessageGroupID
142+
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder {
143+
pmb.mess.MessageGroupID = messageGroupID
144+
return pmb
145+
}
146+
147+
// WriteSessionMetadata set message WriteSessionMetadata
148+
func (pmb *PublicMessageBuilder) WriteSessionMetadata(writeSessionMetadata map[string]string) *PublicMessageBuilder {
149+
pmb.mess.WriteSessionMetadata = writeSessionMetadata
150+
return pmb
151+
}
152+
153+
// Offset set message Offset
154+
func (pmb *PublicMessageBuilder) Offset(offset int64) *PublicMessageBuilder {
155+
pmb.mess.Offset = offset
156+
return pmb
157+
}
158+
159+
// WrittenAt set message WrittenAt
160+
func (pmb *PublicMessageBuilder) WrittenAt(writtenAt time.Time) *PublicMessageBuilder {
161+
pmb.mess.WrittenAt = writtenAt
162+
return pmb
163+
}
164+
165+
// ProducerID set message ProducerID
166+
func (pmb *PublicMessageBuilder) ProducerID(producerID string) *PublicMessageBuilder {
167+
pmb.mess.ProducerID = producerID
168+
return pmb
169+
}
170+
171+
// DataAndUncompressedSize set message uncompressed content and field UncompressedSize
172+
func (pmb *PublicMessageBuilder) DataAndUncompressedSize(data []byte) *PublicMessageBuilder {
173+
copyData := make([]byte, len(data))
174+
copy(copyData, data)
175+
pmb.mess.data = oneTimeReader{reader: bytes.NewReader(data)}
176+
pmb.mess.dataConsumed = false
177+
pmb.mess.rawDataLen = len(copyData)
178+
pmb.mess.UncompressedSize = len(copyData)
179+
return pmb
180+
}
181+
182+
// UncompressedSize set message UncompressedSize
183+
func (pmb *PublicMessageBuilder) UncompressedSize(uncompressedSize int) *PublicMessageBuilder {
184+
pmb.mess.UncompressedSize = uncompressedSize
185+
return pmb
186+
}
187+
188+
// Build return builded message and reset internal state for create new message
189+
func (pmb *PublicMessageBuilder) Build() *PublicMessage {
190+
mess := pmb.mess
191+
pmb.mess = &PublicMessage{}
192+
return mess
193+
}

topic/topicreader/reader.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ func (r *Reader) ReadMessage(ctx context.Context) (*Message, error) {
6060
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
6161
type Message = topicreaderinternal.PublicMessage
6262

63+
type MessageBuilder = topicreaderinternal.PublicMessageBuilder
64+
65+
// NewMessageBuilder create builder, which can create Message (use for tests only)
66+
func NewMessageBuilder() *MessageBuilder {
67+
return topicreaderinternal.NewPublicMessageBuilder()
68+
}
69+
6370
// MessageContentUnmarshaler
6471
//
6572
// # Experimental

0 commit comments

Comments
 (0)