44 "errors"
55 "fmt"
66 log "github.com/sirupsen/logrus"
7+ "maps"
78 "math/rand"
89 "mokapi/engine/common"
910 "mokapi/kafka"
@@ -16,6 +17,7 @@ import (
1617 "mokapi/schema/encoding"
1718 "mokapi/schema/json/generator"
1819 "mokapi/schema/json/schema"
20+ "slices"
1921 "strings"
2022 "time"
2123)
@@ -165,24 +167,38 @@ func (c *KafkaClient) getPartition(t *store.Topic, partition int) (*store.Partit
165167}
166168
167169func (c * KafkaClient ) createRecordBatch (key , value interface {}, headers map [string ]interface {}, topic * asyncapi3.Channel , config * asyncapi3.Config ) (rb kafka.RecordBatch , err error ) {
168- n := len (topic .Messages )
169- if n == 0 {
170- err = fmt .Errorf ("message configuration missing" )
171- return
170+ contentType := config .DefaultContentType
171+ var payload * asyncapi3.SchemaRef
172+ keySchema := & asyncapi3.SchemaRef {
173+ Value : & asyncapi3.MultiSchemaFormat {
174+ Schema : & schema.Schema {Type : schema.Types {"string" }, Pattern : "[a-z]{9}" },
175+ },
172176 }
177+ var headerSchema * schema.Schema
173178
174- msg , err := selectMessage (value , topic .Name , config )
175- if err != nil {
176- return
177- }
179+ if len (topic .Messages ) > 0 {
180+ var msg * asyncapi3.Message
181+ msg , err = selectMessage (value , topic , config )
182+ if err != nil {
183+ return
184+ }
185+ payload = msg .Payload
186+
187+ if msg .Bindings .Kafka .Key != nil {
188+ keySchema = msg .Bindings .Kafka .Key
189+ }
190+
191+ if msg .ContentType != "" {
192+ contentType = msg .ContentType
193+ }
178194
179- keySchema := msg .Bindings . Kafka . Key
180- if keySchema == nil {
181- // use default key schema
182- keySchema = & asyncapi3. SchemaRef {
183- Value : & asyncapi3. MultiSchemaFormat {
184- Schema : & schema. Schema { Type : schema. Types { "string" }, Pattern : "[a-z]{9}" },
185- },
195+ if msg .Headers != nil {
196+ var ok bool
197+ headerSchema , ok = msg . Headers . Value . Schema .( * schema. Schema )
198+ if ! ok {
199+ err = fmt . Errorf ( "currently only json schema supported" )
200+ return
201+ }
186202 }
187203 }
188204
@@ -194,16 +210,13 @@ func (c *KafkaClient) createRecordBatch(key, value interface{}, headers map[stri
194210 }
195211
196212 if value == nil {
197- value , err = createValue (msg . Payload )
213+ value , err = createValue (payload )
198214 if err != nil {
199215 return rb , fmt .Errorf ("unable to generate kafka value: %v" , err )
200216 }
201217 }
202218
203- contentType := config .DefaultContentType
204- if msg .ContentType != "" {
205- contentType = msg .ContentType
206- } else if contentType == "" {
219+ if contentType == "" {
207220 // set default: https://github.com/asyncapi/spec/issues/319
208221 contentType = "application/json"
209222 }
@@ -212,7 +225,7 @@ func (c *KafkaClient) createRecordBatch(key, value interface{}, headers map[stri
212225 if b , ok := value .([]byte ); ok {
213226 v = b
214227 } else {
215- v , err = marshal (value , msg . Payload , contentType )
228+ v , err = marshal (value , payload , contentType )
216229 if err != nil {
217230 return
218231 }
@@ -229,16 +242,7 @@ func (c *KafkaClient) createRecordBatch(key, value interface{}, headers map[stri
229242 }
230243
231244 var recordHeaders []kafka.RecordHeader
232- var hs * schema.Schema
233- if msg .Headers != nil {
234- var ok bool
235- hs , ok = msg .Headers .Value .Schema .(* schema.Schema )
236- if ! ok {
237- err = fmt .Errorf ("currently only json schema supported" )
238- return
239- }
240- }
241- recordHeaders , err = getHeaders (headers , hs )
245+ recordHeaders , err = getHeaders (headers , headerSchema )
242246 if err != nil {
243247 return
244248 }
@@ -352,22 +356,24 @@ func marshalKey(key interface{}, r *asyncapi3.SchemaRef) ([]byte, error) {
352356 }
353357}
354358
355- func selectMessage (value any , topic string , cfg * asyncapi3.Config ) (* asyncapi3.Message , error ) {
359+ func selectMessage (value any , topic * asyncapi3. Channel , cfg * asyncapi3.Config ) (* asyncapi3.Message , error ) {
356360 noOperationDefined := true
357361
358362 // first try to get send operation
359- for name , op := range cfg .Operations {
363+ for _ , op := range cfg .Operations {
360364 if op .Value == nil || op .Value .Channel .Value == nil {
361365 continue
362366 }
363- if op .Value .Channel .Value . Name == topic && op .Value .Action == "send" {
367+ if op .Value .Channel .Value == topic && op .Value .Action == "send" {
364368 noOperationDefined = false
369+ var messages []* asyncapi3.MessageRef
365370 if len (op .Value .Messages ) == 0 {
366- log .Warnf ("no message defined for operation %s" , name )
371+ messages = slices .Collect (maps .Values (op .Value .Channel .Value .Messages ))
372+ } else {
373+ messages = op .Value .Messages
367374 }
368- for _ , msg := range op . Value . Messages {
375+ for _ , msg := range messages {
369376 if msg .Value == nil {
370- log .Errorf ("no message defined for operation %s" , name )
371377 continue
372378 }
373379 if valueMatchMessagePayload (value , msg .Value ) {
@@ -378,18 +384,20 @@ func selectMessage(value any, topic string, cfg *asyncapi3.Config) (*asyncapi3.M
378384 }
379385
380386 // second, try to get receive operation
381- for name , op := range cfg .Operations {
387+ for _ , op := range cfg .Operations {
382388 if op .Value == nil || op .Value .Channel .Value == nil {
383389 continue
384390 }
385- if op .Value .Channel .Value . Name == topic && op .Value .Action == "receive" {
391+ if op .Value .Channel .Value == topic && op .Value .Action == "receive" {
386392 noOperationDefined = false
393+ var messages []* asyncapi3.MessageRef
387394 if len (op .Value .Messages ) == 0 {
388- log .Errorf ("no message defined for operation %s" , name )
395+ messages = slices .Collect (maps .Values (op .Value .Channel .Value .Messages ))
396+ } else {
397+ messages = op .Value .Messages
389398 }
390- for _ , msg := range op . Value . Messages {
399+ for _ , msg := range messages {
391400 if msg .Value == nil {
392- log .Warnf ("no message defined for operation %s" , name )
393401 continue
394402 }
395403 if valueMatchMessagePayload (value , msg .Value ) {
@@ -404,13 +412,13 @@ func selectMessage(value any, topic string, cfg *asyncapi3.Config) (*asyncapi3.M
404412 }
405413
406414 if value != nil {
407- return nil , fmt .Errorf ("no matching 'send' or 'receive' operation found for value: %v" , value )
415+ return nil , fmt .Errorf ("no message configuration matches the message value for topic '%s' and value: %v" , topic . GetName () , value )
408416 }
409- return nil , fmt .Errorf ("no matching 'send' or 'receive' operation defined " )
417+ return nil , fmt .Errorf ("no message " )
410418}
411419
412420func valueMatchMessagePayload (value any , msg * asyncapi3.Message ) bool {
413- if value == nil {
421+ if value == nil || msg . Payload == nil {
414422 return true
415423 }
416424
0 commit comments