@@ -134,10 +134,10 @@ func (f v1MessageSetBuilder) bytes() []byte {
134
134
wb .writeInt64 (msg .Offset ) // offset
135
135
}
136
136
wb .writeBytes (newWB ().call (func (wb * kafkaWriteBuffer ) {
137
- wb .writeInt32 (- 1 ) // crc, unused
138
- wb .writeInt8 (1 ) // magic
139
- wb .writeInt8 (0 ) // attributes -- zero, no compression for the inner message
140
- wb .writeInt64 (msg .Time .UnixMilli ()) // timestamp
137
+ wb .writeInt32 (- 1 ) // crc, unused
138
+ wb .writeInt8 (1 ) // magic
139
+ wb .writeInt8 (0 ) // attributes -- zero, no compression for the inner message
140
+ wb .writeInt64 (1000 * msg .Time .Unix ()) // timestamp
141
141
wb .writeBytes (msg .Key )
142
142
wb .writeBytes (msg .Value )
143
143
}))
@@ -147,15 +147,15 @@ func (f v1MessageSetBuilder) bytes() []byte {
147
147
})
148
148
if f .codec != nil {
149
149
bs = newWB ().call (func (wb * kafkaWriteBuffer ) {
150
- wb .writeInt64 (f .msgs [len (f .msgs )- 1 ].Offset ) // offset of the wrapper message is the last offset of the inner messages
150
+ wb .writeInt64 (f .msgs [len (f .msgs )- 1 ].Offset ) // offset of the wrapper message is the last offset of the inner messages
151
151
wb .writeBytes (newWB ().call (func (wb * kafkaWriteBuffer ) {
152
152
bs := mustCompress (bs , f .codec )
153
- wb .writeInt32 (- 1 ) // crc, unused
154
- wb .writeInt8 (1 ) // magic
155
- wb .writeInt8 (f .codec .Code ()) // attributes
156
- wb .writeInt64 (f .msgs [0 ].Time .UnixMilli ()) // timestamp
157
- wb .writeBytes (nil ) // key is always nil for compressed
158
- wb .writeBytes (bs ) // the value is the compressed message
153
+ wb .writeInt32 (- 1 ) // crc, unused
154
+ wb .writeInt8 (1 ) // magic
155
+ wb .writeInt8 (f .codec .Code ()) // attributes
156
+ wb .writeInt64 (1000 * f .msgs [0 ].Time .Unix ()) // timestamp
157
+ wb .writeBytes (nil ) // key is always nil for compressed
158
+ wb .writeBytes (bs ) // the value is the compressed message
159
159
}))
160
160
})
161
161
}
@@ -179,23 +179,23 @@ func (f v2MessageSetBuilder) bytes() []byte {
179
179
return newWB ().call (func (wb * kafkaWriteBuffer ) {
180
180
wb .writeInt64 (f .msgs [0 ].Offset )
181
181
wb .writeBytes (newWB ().call (func (wb * kafkaWriteBuffer ) {
182
- wb .writeInt32 (0 ) // leader epoch
183
- wb .writeInt8 (2 ) // magic = 2
184
- wb .writeInt32 (0 ) // crc, unused
185
- wb .writeInt16 (attributes ) // record set attributes
186
- wb .writeInt32 (0 ) // record set last offset delta
187
- wb .writeInt64 (f .msgs [0 ].Time .UnixMilli ()) // record set first timestamp
188
- wb .writeInt64 (f .msgs [0 ].Time .UnixMilli ()) // record set last timestamp
189
- wb .writeInt64 (0 ) // record set producer id
190
- wb .writeInt16 (0 ) // record set producer epoch
191
- wb .writeInt32 (0 ) // record set base sequence
192
- wb .writeInt32 (int32 (len (f .msgs ))) // record set count
182
+ wb .writeInt32 (0 ) // leader epoch
183
+ wb .writeInt8 (2 ) // magic = 2
184
+ wb .writeInt32 (0 ) // crc, unused
185
+ wb .writeInt16 (attributes ) // record set attributes
186
+ wb .writeInt32 (0 ) // record set last offset delta
187
+ wb .writeInt64 (1000 * f .msgs [0 ].Time .Unix ()) // record set first timestamp
188
+ wb .writeInt64 (1000 * f .msgs [0 ].Time .Unix ()) // record set last timestamp
189
+ wb .writeInt64 (0 ) // record set producer id
190
+ wb .writeInt16 (0 ) // record set producer epoch
191
+ wb .writeInt32 (0 ) // record set base sequence
192
+ wb .writeInt32 (int32 (len (f .msgs ))) // record set count
193
193
bs := newWB ().call (func (wb * kafkaWriteBuffer ) {
194
194
for i , msg := range f .msgs {
195
195
wb .Write (newWB ().call (func (wb * kafkaWriteBuffer ) {
196
196
bs := newWB ().call (func (wb * kafkaWriteBuffer ) {
197
197
wb .writeInt8 (0 ) // record attributes, not used here
198
- wb .writeVarInt (time .Now ().UnixMilli () - msg .Time .UnixMilli ()) // timestamp
198
+ wb .writeVarInt (1000 * ( time .Now ().Unix () - msg .Time .Unix ())) // timestamp
199
199
wb .writeVarInt (int64 (i )) // offset delta
200
200
wb .writeVarInt (int64 (len (msg .Key ))) // key len
201
201
wb .Write (msg .Key ) // key bytes
0 commit comments