From 67af4332cafcced84faa833c99982a36c92d98fb Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 21:34:51 +0800 Subject: [PATCH 1/4] Introduce Boolean Schema --- pulsar/schema.go | 39 +++++++++++++++++++++++++++++++++++++++ pulsar/schema_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/pulsar/schema.go b/pulsar/schema.go index 5c063e32a3..a1aed53a27 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -101,6 +101,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s s = NewProtoSchema(schemaDef, properties) case AVRO: s = NewAvroSchema(schemaDef, properties) + case BOOLEAN: + s = NewBooleanSchema(properties) case INT8: s = NewInt8Schema(properties) case INT16: @@ -384,6 +386,43 @@ func (as *AvroSchema) GetSchemaInfo() *SchemaInfo { return &as.SchemaInfo } +var _ Schema = (*BooleanSchema)(nil) + +type BooleanSchema struct { + SchemaInfo +} + +func NewBooleanSchema(properties map[string]string) *BooleanSchema { + boolSchema := new(BooleanSchema) + boolSchema.SchemaInfo.Properties = properties + boolSchema.SchemaInfo.Name = "Boolean" + boolSchema.SchemaInfo.Type = BOOLEAN + boolSchema.SchemaInfo.Schema = "" + return boolSchema +} + +func (bs *BooleanSchema) Encode(v interface{}) ([]byte, error) { + var buf bytes.Buffer + err := WriteElements(&buf, v.(bool)) + return buf.Bytes(), err +} + +func (bs *BooleanSchema) Decode(data []byte, v interface{}) error { + buf := bytes.NewReader(data) + return ReadElements(buf, v) +} + +func (bs *BooleanSchema) Validate(message []byte) error { + if len(message) != 1 { + return newError(InvalidMessage, "size of data received by BooleanSchema is not 1") + } + return nil +} + +func (bs *BooleanSchema) GetSchemaInfo() *SchemaInfo { + return &bs.SchemaInfo +} + type StringSchema struct { SchemaInfo } diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6de9..f6e7acd97d 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -492,3 +492,37 @@ func TestDoubleSchema(t *testing.T) { assert.Equal(t, res, float64(1)) defer consumer.Close() } + +func TestBooleanSchema(t *testing.T) { + client := createClient() + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: "booleanTopic", + Schema: NewBooleanSchema(nil), + }) + assert.Nil(t, err) + ctx := context.Background() + if _, err := producer.Send(ctx, &ProducerMessage{ + Value: true, + }); err != nil { + log.Fatal(err) + } + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: "booleanTopic", + SubscriptionName: "sub-2", + Schema: NewBooleanSchema(nil), + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + var res bool + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + err = msg.GetSchemaValue(&res) + assert.Nil(t, err) + assert.Equal(t, res, true) + defer consumer.Close() +} From 82dc89f478d055a94082daac851720c72c4ea5de Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 21:37:47 +0800 Subject: [PATCH 2/4] Call Schema.Validate before decode --- pulsar/impl_message.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 89a709f14a..a2bac2445f 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -357,8 +357,14 @@ func (msg *message) GetSchemaValue(v interface{}) error { if err != nil { return err } + if err := schema.Validate(msg.payLoad); err != nil { + return err + } return schema.Decode(msg.payLoad, v) } + if err := msg.schema.Validate(msg.payLoad); err != nil { + return err + } return msg.schema.Decode(msg.payLoad, v) } From 4f0491426023cdb351d01c882dc6f1d3bfb16356 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 22:31:46 +0800 Subject: [PATCH 3/4] fix StringSchema.Validate --- pulsar/schema.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index a1aed53a27..69b52a49c3 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -448,7 +448,7 @@ func (ss *StringSchema) Decode(data []byte, v interface{}) error { } func (ss *StringSchema) Validate(message []byte) error { - return ss.Decode(message, nil) + return nil } func (ss *StringSchema) GetSchemaInfo() *SchemaInfo { From fe5f290af1751e9ae6bcbb850e5adc540c3a18b3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 21 Oct 2023 22:51:35 +0800 Subject: [PATCH 4/4] fix Schema.Validate --- pulsar/schema.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar/schema.go b/pulsar/schema.go index 69b52a49c3..1b1bb1c660 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -180,7 +180,7 @@ func (js *JSONSchema) Decode(data []byte, v interface{}) error { } func (js *JSONSchema) Validate(message []byte) error { - return js.Decode(message, nil) + return nil } func (js *JSONSchema) GetSchemaInfo() *SchemaInfo { @@ -229,7 +229,7 @@ func (ps *ProtoSchema) Decode(data []byte, v interface{}) error { } func (ps *ProtoSchema) Validate(message []byte) error { - return ps.Decode(message, nil) + return nil } func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo { @@ -307,7 +307,7 @@ func (ps *ProtoNativeSchema) Decode(data []byte, v interface{}) error { } func (ps *ProtoNativeSchema) Validate(message []byte) error { - return ps.Decode(message, nil) + return nil } func (ps *ProtoNativeSchema) GetSchemaInfo() *SchemaInfo { @@ -379,7 +379,7 @@ func (as *AvroSchema) Decode(data []byte, v interface{}) error { } func (as *AvroSchema) Validate(message []byte) error { - return as.Decode(message, nil) + return nil } func (as *AvroSchema) GetSchemaInfo() *SchemaInfo { @@ -478,7 +478,7 @@ func (bs *BytesSchema) Decode(data []byte, v interface{}) error { } func (bs *BytesSchema) Validate(message []byte) error { - return bs.Decode(message, nil) + return nil } func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {