@@ -18,17 +18,24 @@ var TopicNotFound = errors.New("topic not found")
1818var PartitionNotFound = errors .New ("partition not found" )
1919
2020type Record struct {
21- Key any
22- Value any
23- Headers map [string ]string
24- Partition int
21+ Offset int64 `json:"offset"`
22+ Key any `json:"key"`
23+ Value any `json:"value"`
24+ Headers []RecordHeader `json:"headers,omitempty"`
25+ Partition int `json:"partition"`
26+ }
27+
28+ type RecordHeader struct {
29+ Name string `json:"name"`
30+ Value string `json:"value"`
2531}
2632
2733type RecordResult struct {
2834 Partition int
2935 Offset int64
3036 Key []byte
3137 Value []byte
38+ Headers []RecordHeader
3239 Error string
3340}
3441
@@ -76,10 +83,10 @@ func (c *Client) Write(topic string, records []Record, ct *media.ContentType) ([
7683 Key : kafka .NewBytes (key ),
7784 Value : kafka .NewBytes (value ),
7885 }
79- for name , val := range r .Headers {
86+ for _ , h := range r .Headers {
8087 rec .Headers = append (rec .Headers , kafka.RecordHeader {
81- Key : name ,
82- Value : []byte (val ),
88+ Key : h . Name ,
89+ Value : []byte (h . Value ),
8390 })
8491 }
8592 b := kafka.RecordBatch {Records : []* kafka.Record {rec }}
@@ -98,12 +105,19 @@ func (c *Client) Write(topic string, records []Record, ct *media.ContentType) ([
98105 Error : res [0 ].BatchIndexErrorMessage ,
99106 })
100107 } else {
101- result = append ( result , RecordResult {
108+ rr := RecordResult {
102109 Offset : offset ,
103110 Key : kafka .Read (b .Records [0 ].Key ),
104111 Value : kafka .Read (b .Records [0 ].Value ),
105112 Partition : p .Index ,
106- })
113+ }
114+ for _ , h := range b .Records [0 ].Headers {
115+ rr .Headers = append (rr .Headers , RecordHeader {
116+ Name : h .Key ,
117+ Value : string (h .Value ),
118+ })
119+ }
120+ result = append (result , rr )
107121 c .store .UpdateMetrics (c .monitor , t , p , b )
108122 }
109123 }
@@ -133,36 +147,50 @@ func (c *Client) Read(topic string, partition int, offset int64, ct *media.Conte
133147 }
134148
135149 records := []Record {}
136- switch ct .Key () {
137- case "application/vnd.mokapi.kafka.binary+json" :
138- for _ , r := range b .Records {
139- var bKey []byte
140- base64 .StdEncoding .Encode (bKey , kafka .Read (r .Key ))
141- var bValue []byte
142- base64 .StdEncoding .Encode (bValue , kafka .Read (r .Value ))
143- records = append (records , Record {
144- Key : string (bKey ),
145- Value : string (bValue ),
146- })
150+ var getValue func (value []byte ) (any , error )
151+ switch {
152+ case ct .Key () == "application/vnd.mokapi.kafka.binary+json" :
153+ getValue = func (value []byte ) (any , error ) {
154+ return base64 .StdEncoding .EncodeToString (value ), nil
147155 }
148- case "application/json" , "" :
149- for _ , r := range b .Records {
150- key := string (kafka .Read (r .Key ))
156+ case ct .Key () == "application/json" , ct .IsAny ():
157+ getValue = func (value []byte ) (any , error ) {
151158 var val any
152- err := json .Unmarshal (kafka . Read ( r . Value ) , & val )
159+ err := json .Unmarshal (value , & val )
153160 if err != nil {
154161 return nil , fmt .Errorf ("parse record value as JSON failed: %v" , err )
155162 }
156-
157- records = append (records , Record {
158- Key : key ,
159- Value : val ,
160- })
163+ return val , nil
161164 }
165+
162166 default :
163167 return nil , fmt .Errorf ("unknown content type: %v" , ct )
164168 }
165169
170+ for _ , r := range b .Records {
171+ key := string (kafka .Read (r .Key ))
172+ val , err := getValue (kafka .Read (r .Value ))
173+ if err != nil {
174+ return nil , err
175+ }
176+
177+ rec := Record {
178+ Offset : r .Offset ,
179+ Partition : p .Index ,
180+ Key : key ,
181+ Value : val ,
182+ }
183+
184+ for _ , h := range r .Headers {
185+ rec .Headers = append (rec .Headers , RecordHeader {
186+ Name : h .Key ,
187+ Value : string (h .Value ),
188+ })
189+ }
190+
191+ records = append (records , rec )
192+ }
193+
166194 return records , nil
167195}
168196
@@ -198,19 +226,24 @@ func (c *Client) parse(v any, ct *media.ContentType) ([]byte, error) {
198226 return vt , nil
199227 case string :
200228 return []byte (vt ), nil
229+ default :
230+ return json .Marshal (v )
201231 }
202232 }
203- return nil , fmt .Errorf ("unknown content type: %v" , ct )
204233}
205234
206235func (c * Client ) parseKey (v any ) ([]byte , error ) {
236+ if v == nil {
237+ return nil , nil
238+ }
207239 switch vt := v .(type ) {
208240 case []byte :
209241 return vt , nil
210242 case string :
211243 return []byte (vt ), nil
244+ default :
245+ return json .Marshal (v )
212246 }
213- return nil , fmt .Errorf ("key not supported: %v" , v )
214247}
215248
216249func (r * Record ) UnmarshalJSON (b []byte ) error {
0 commit comments