1- package topicreaderinternal
1+ package topicreadercommon
22
33import (
44 "context"
@@ -20,17 +20,17 @@ type PublicBatch struct {
2020
2121 Messages []* PublicMessage
2222
23- commitRange commitRange // от всех сообщений батча
23+ commitRange CommitRange // от всех сообщений батча
2424}
2525
26- func newBatch (session * partitionSession , messages []* PublicMessage ) (* PublicBatch , error ) {
26+ func NewBatch (session * PartitionSession , messages []* PublicMessage ) (* PublicBatch , error ) {
2727 for i := 0 ; i < len (messages ); i ++ {
2828 msg := messages [i ]
2929
30- if msg .commitRange .partitionSession == nil {
31- msg .commitRange .partitionSession = session
30+ if msg .commitRange .PartitionSession == nil {
31+ msg .commitRange .PartitionSession = session
3232 }
33- if session != msg .commitRange .partitionSession {
33+ if session != msg .commitRange .PartitionSession {
3434 return nil , xerrors .WithStackTrace (errBadSessionWhileMessageBatchCreate )
3535 }
3636
@@ -39,17 +39,17 @@ func newBatch(session *partitionSession, messages []*PublicMessage) (*PublicBatc
3939 }
4040
4141 prev := messages [i - 1 ]
42- if prev .commitRange .commitOffsetEnd != msg .commitRange .commitOffsetStart {
42+ if prev .commitRange .CommitOffsetEnd != msg .commitRange .CommitOffsetStart {
4343 return nil , xerrors .WithStackTrace (errBadMessageOffsetWhileMessageBatchCreate )
4444 }
4545 }
4646
47- offset := commitRange {
48- partitionSession : session ,
47+ offset := CommitRange {
48+ PartitionSession : session ,
4949 }
5050 if len (messages ) > 0 {
51- offset .commitOffsetStart = messages [0 ].commitRange .commitOffsetStart
52- offset .commitOffsetEnd = messages [len (messages )- 1 ].commitRange .commitOffsetEnd
51+ offset .CommitOffsetStart = messages [0 ].commitRange .CommitOffsetStart
52+ offset .CommitOffsetEnd = messages [len (messages )- 1 ].commitRange .CommitOffsetEnd
5353 }
5454
5555 return & PublicBatch {
@@ -58,13 +58,13 @@ func newBatch(session *partitionSession, messages []*PublicMessage) (*PublicBatc
5858 }, nil
5959}
6060
61- func newBatchFromStream (
62- decoders decoderMap ,
63- session * partitionSession ,
61+ func NewBatchFromStream (
62+ decoders DecoderMap ,
63+ session * PartitionSession ,
6464 sb rawtopicreader.Batch , //nolint:gocritic
6565) (* PublicBatch , error ) {
6666 messages := make ([]* PublicMessage , len (sb .MessageData ))
67- prevOffset := session .lastReceivedMessageOffset ()
67+ prevOffset := session .LastReceivedMessageOffset ()
6868 for i := range sb .MessageData {
6969 sMess := & sb .MessageData [i ]
7070
@@ -82,9 +82,9 @@ func newBatchFromStream(
8282 dstMess .data = createReader (decoders , sb .Codec , sMess .Data )
8383 dstMess .UncompressedSize = int (sMess .UncompressedSize )
8484
85- dstMess .commitRange .partitionSession = session
86- dstMess .commitRange .commitOffsetStart = prevOffset + 1
87- dstMess .commitRange .commitOffsetEnd = sMess .Offset + 1
85+ dstMess .commitRange .PartitionSession = session
86+ dstMess .commitRange .CommitOffsetStart = prevOffset + 1
87+ dstMess .commitRange .CommitOffsetEnd = sMess .Offset + 1
8888
8989 if len (sMess .MetadataItems ) > 0 {
9090 dstMess .Metadata = make (map [string ][]byte , len (sMess .MetadataItems ))
@@ -96,15 +96,15 @@ func newBatchFromStream(
9696 prevOffset = sMess .Offset
9797 }
9898
99- session .setLastReceivedMessageOffset (prevOffset )
99+ session .SetLastReceivedMessageOffset (prevOffset )
100100
101- return newBatch (session , messages )
101+ return NewBatch (session , messages )
102102}
103103
104104// Context is cancelled when code should stop to process messages batch
105105// for example - lost connection to server or receive stop partition signal without graceful flag
106106func (m * PublicBatch ) Context () context.Context {
107- return m .commitRange .partitionSession .Context ()
107+ return m .commitRange .PartitionSession .Context ()
108108}
109109
110110// Topic is path of source topic of the messages in the batch
@@ -117,56 +117,14 @@ func (m *PublicBatch) PartitionID() int64 {
117117 return m .partitionSession ().PartitionID
118118}
119119
120- func (m * PublicBatch ) partitionSession () * partitionSession {
121- return m .commitRange .partitionSession
120+ func (m * PublicBatch ) partitionSession () * PartitionSession {
121+ return m .commitRange .PartitionSession
122122}
123123
124124func (m * PublicBatch ) getCommitRange () PublicCommitRange {
125125 return m .commitRange .getCommitRange ()
126126}
127127
128- func (m * PublicBatch ) append (b * PublicBatch ) (* PublicBatch , error ) {
129- var res * PublicBatch
130- if m == nil {
131- res = & PublicBatch {}
132- } else {
133- res = m
134- }
135-
136- if res .commitRange .partitionSession != b .commitRange .partitionSession {
137- return nil , xerrors .WithStackTrace (errors .New ("ydb: bad partition session for merge" ))
138- }
139-
140- if res .commitRange .commitOffsetEnd != b .commitRange .commitOffsetStart {
141- return nil , xerrors .WithStackTrace (errors .New ("ydb: bad offset interval for merge" ))
142- }
143-
144- res .Messages = append (res .Messages , b .Messages ... )
145- res .commitRange .commitOffsetEnd = b .commitRange .commitOffsetEnd
146-
147- return res , nil
148- }
149-
150- func (m * PublicBatch ) cutMessages (count int ) (head , rest * PublicBatch ) {
151- switch {
152- case count == 0 :
153- return nil , m
154- case count >= len (m .Messages ):
155- return m , nil
156- default :
157- // slice[0:count:count] - limit slice capacity and prevent overwrite rest by append messages to head
158- // explicit 0 need for prevent typos, when type slice[count:count] instead of slice[:count:count]
159- head , _ = newBatch (m .commitRange .partitionSession , m .Messages [0 :count :count ])
160- rest , _ = newBatch (m .commitRange .partitionSession , m .Messages [count :])
161-
162- return head , rest
163- }
164- }
165-
166- func (m * PublicBatch ) isEmpty () bool {
167- return m == nil || len (m .Messages ) == 0
168- }
169-
170128func splitBytesByMessagesInBatches (batches []* PublicBatch , totalBytesCount int ) error {
171129 restBytes := totalBytesCount
172130
@@ -219,3 +177,55 @@ func splitBytesByMessagesInBatches(batches []*PublicBatch, totalBytesCount int)
219177
220178 return nil
221179}
180+
181+ func BatchAppend (original , appended * PublicBatch ) (* PublicBatch , error ) {
182+ var res * PublicBatch
183+ if original == nil {
184+ res = & PublicBatch {}
185+ } else {
186+ res = original
187+ }
188+
189+ if res .commitRange .PartitionSession != appended .commitRange .PartitionSession {
190+ return nil , xerrors .WithStackTrace (errors .New ("ydb: bad partition session for merge" ))
191+ }
192+
193+ if res .commitRange .CommitOffsetEnd != appended .commitRange .CommitOffsetStart {
194+ return nil , xerrors .WithStackTrace (errors .New ("ydb: bad offset interval for merge" ))
195+ }
196+
197+ res .Messages = append (res .Messages , appended .Messages ... )
198+ res .commitRange .CommitOffsetEnd = appended .commitRange .CommitOffsetEnd
199+
200+ return res , nil
201+ }
202+
203+ func BatchCutMessages (b * PublicBatch , count int ) (head , rest * PublicBatch ) {
204+ switch {
205+ case count == 0 :
206+ return nil , b
207+ case count >= len (b .Messages ):
208+ return b , nil
209+ default :
210+ // slice[0:count:count] - limit slice capacity and prevent overwrite rest by append messages to head
211+ // explicit 0 need for prevent typos, when type slice[count:count] instead of slice[:count:count]
212+ head , _ = NewBatch (b .commitRange .PartitionSession , b .Messages [0 :count :count ])
213+ rest , _ = NewBatch (b .commitRange .PartitionSession , b .Messages [count :])
214+
215+ return head , rest
216+ }
217+ }
218+
219+ func BatchIsEmpty (b * PublicBatch ) bool {
220+ return b == nil || len (b .Messages ) == 0
221+ }
222+
223+ func BatchGetPartitionSessionID (item * PublicBatch ) rawtopicreader.PartitionSessionID {
224+ return item .partitionSession ().PartitionSessionID
225+ }
226+
227+ func BatchSetCommitRangeForTest (b * PublicBatch , commitRange CommitRange ) * PublicBatch {
228+ b .commitRange = commitRange
229+
230+ return b
231+ }
0 commit comments