Skip to content

Commit fb1ec4a

Browse files
committed
[improve] Support http lookup getSchema() interface
1 parent f673e4d commit fb1ec4a

File tree

5 files changed

+209
-17
lines changed

5 files changed

+209
-17
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ bin/golangci-lint:
5050
# use golangCi-lint docker to avoid local golang env issues
5151
# https://golangci-lint.run/welcome/install/
5252
lint-docker:
53-
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v
53+
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v
5454

5555
container:
5656
docker build -t ${IMAGE_NAME} \

pulsar/consumer_partition.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -318,22 +318,39 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
318318
return schema, nil
319319
}
320320

321-
pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
321+
// cache missed, try to use lookupService to find schema info
322+
lookupSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
322323
if err != nil {
323324
return nil, err
324325
}
325-
326-
if pbSchema == nil {
327-
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", s.topic, schemaVersion)
328-
return nil, err
326+
// http lookup service
327+
if lookupSchema.HTTPSchemaType != "" {
328+
schemaType, exists := HTTPSchemaTypeMap[strings.ToUpper(lookupSchema.HTTPSchemaType)]
329+
if !exists {
330+
err = fmt.Errorf("unsupported schema type [%s] for topic: [ %v ], schema version : [ %v ]",
331+
lookupSchema.HTTPSchemaType,
332+
s.topic,
333+
schemaVersion,
334+
)
335+
return nil, err
336+
}
337+
schema, err = NewSchema(
338+
schemaType,
339+
[]byte(lookupSchema.Data),
340+
lookupSchema.Properties,
341+
)
342+
} else {
343+
// pb cmd lookup service
344+
schema, err = NewSchema(
345+
SchemaType(lookupSchema.PbSchemaType),
346+
[]byte(lookupSchema.Data),
347+
lookupSchema.Properties,
348+
)
329349
}
330-
331-
var properties = internal.ConvertToStringMap(pbSchema.Properties)
332-
333-
schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
334350
if err != nil {
335351
return nil, err
336352
}
353+
337354
s.add(key, schema)
338355
return schema, nil
339356
}

pulsar/internal/lookup_service.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package internal
1919

2020
import (
21+
"encoding/binary"
2122
"errors"
2223
"fmt"
2324
"net/url"
25+
"strings"
2426

2527
"google.golang.org/protobuf/proto"
2628

@@ -34,6 +36,16 @@ type LookupResult struct {
3436
PhysicalAddr *url.URL
3537
}
3638

39+
// LookupSchema return lookup schema result
40+
// we need to define a LookupSchema struct as return type to avoid directly returning pulsar.Schema
41+
// in case we might meet importing cycle problem
42+
type LookupSchema struct {
43+
PbSchemaType int // pb binary cmd lookup return type schema index
44+
HTTPSchemaType string `json:"type"` // http lookup return type schema name
45+
Data string `json:"data"`
46+
Properties map[string]string `json:"properties"`
47+
}
48+
3749
// GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
3850
type GetTopicsOfNamespaceMode string
3951

@@ -62,7 +74,7 @@ type LookupService interface {
6274
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
6375

6476
// GetSchema returns schema for a given version.
65-
GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
77+
GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)
6678

6779
GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error)
6880

@@ -97,7 +109,7 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResol
97109
}
98110
}
99111

100-
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
112+
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
101113
id := ls.rpcClient.NewRequestID()
102114
req := &pb.CommandGetSchema{
103115
RequestId: proto.Uint64(id),
@@ -106,12 +118,23 @@ func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *
106118
}
107119
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
108120
if err != nil {
109-
return nil, err
121+
return &LookupSchema{}, err
110122
}
111123
if res.Response.Error != nil {
112-
return nil, errors.New(res.Response.GetError().String())
124+
return &LookupSchema{}, errors.New(res.Response.GetError().String())
113125
}
114-
return res.Response.GetSchemaResponse.Schema, nil
126+
127+
// deserialize pb.schema and convert it to LookupSchema struct
128+
pbSchema := res.Response.GetSchemaResponse.Schema
129+
if pbSchema == nil {
130+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
131+
return &LookupSchema{}, err
132+
}
133+
return &LookupSchema{
134+
PbSchemaType: int(*pbSchema.Type),
135+
Data: string(pbSchema.SchemaData),
136+
Properties: ConvertToStringMap(pbSchema.Properties),
137+
}, nil
115138
}
116139

117140
func (ls *lookupService) GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) {
@@ -273,6 +296,8 @@ const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
273296
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
274297
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
275298
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
299+
const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema"
300+
const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d"
276301

277302
type httpLookupData struct {
278303
BrokerURL string `json:"brokerUrl"`
@@ -371,8 +396,33 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
371396
return topics, nil
372397
}
373398

374-
func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, err error) {
375-
return nil, errors.New("GetSchema is not supported by httpLookupService")
399+
func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
400+
topicName, err := ParseTopicName(topic)
401+
if err != nil {
402+
return nil, err
403+
}
404+
topicRestPath := fmt.Sprintf("%s/%s", topicName.Namespace, topicName.Topic)
405+
var path string
406+
if schemaVersion != nil {
407+
path = fmt.Sprintf(HTTPSchemaWithVersionV2, topicRestPath, int64(binary.BigEndian.Uint64(schemaVersion)))
408+
} else {
409+
path = fmt.Sprintf(HTTPSchemaV2, topicRestPath)
410+
}
411+
httpSchema := &LookupSchema{}
412+
if err := h.httpClient.Get(path, &httpSchema, nil); err != nil {
413+
if strings.HasPrefix(err.Error(), "Code: 404") {
414+
// Unlike other language pulsar sdks,
415+
// golang sdk directly use pb.Schema{} binary command instead of lookup service
416+
// in consumer/producer schema initialization.
417+
//
418+
// Golang sdk only use this GetSchema() interface in tableView schema query and msg deserialize,
419+
// so that we don't need to take more care about 404 NotFound http exception.
420+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
421+
}
422+
h.log.Errorf("schema [ %v ] request error, schema version : [ %v ]", topic, schemaVersion)
423+
return &LookupSchema{}, err
424+
}
425+
return httpSchema, nil
376426
}
377427

378428
func (h *httpLookupService) ServiceNameResolver() *ServiceNameResolver {

pulsar/schema.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,27 @@ const (
6161
ProtoNative = 20 //Protobuf native message encoding and decoding
6262
)
6363

64+
var HTTPSchemaTypeMap = map[string]SchemaType{
65+
"NONE": BYTES,
66+
"STRING": STRING,
67+
"JSON": JSON,
68+
"PROTOBUF": PROTOBUF,
69+
"AVRO": AVRO,
70+
"BOOLEAN": BOOLEAN,
71+
"INT8": INT8,
72+
"INT16": INT16,
73+
"INT32": INT32,
74+
"INT64": INT64,
75+
"FLOAT": FLOAT,
76+
"DOUBLE": DOUBLE,
77+
"KEYVALUE": KeyValue,
78+
"BYTES": BYTES,
79+
"AUTO": AUTO,
80+
"AUTOCONSUME": AutoConsume,
81+
"AUTOPUBLISH": AutoPublish,
82+
"PROTOBUF_NATIVE": ProtoNative,
83+
}
84+
6485
// Encapsulates data around the schema definition
6586
type SchemaInfo struct {
6687
Name string

pulsar/schema_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ func createClient() Client {
5858
return client
5959
}
6060

61+
func createHTTPLookupClient() Client {
62+
// create client
63+
lookupURL := "http://localhost:8080"
64+
client, err := NewClient(ClientOptions{
65+
URL: lookupURL,
66+
})
67+
if err != nil {
68+
log.Fatal(err)
69+
}
70+
return client
71+
}
72+
6173
func TestBytesSchema(t *testing.T) {
6274
client := createClient()
6375
defer client.Close()
@@ -167,6 +179,54 @@ func TestJsonSchema(t *testing.T) {
167179
defer consumer.Close()
168180
}
169181

182+
func TestHTTPLookupJsonSchema(t *testing.T) {
183+
client := createHTTPLookupClient()
184+
defer client.Close()
185+
186+
properties := make(map[string]string)
187+
properties["pulsar"] = "hello"
188+
jsonSchemaWithProperties := NewJSONSchema(exampleSchemaDef, properties)
189+
producer1, err := client.CreateProducer(ProducerOptions{
190+
Topic: "httpLookupJsonTopic",
191+
Schema: jsonSchemaWithProperties,
192+
})
193+
assert.Nil(t, err)
194+
195+
_, err = producer1.Send(context.Background(), &ProducerMessage{
196+
Value: &testJSON{
197+
ID: 100,
198+
Name: "pulsar",
199+
},
200+
})
201+
if err != nil {
202+
log.Fatal(err)
203+
}
204+
producer1.Close()
205+
206+
//create consumer
207+
var s testJSON
208+
209+
consumerJS, err := NewJSONSchemaWithValidation(exampleSchemaDef, nil)
210+
assert.Nil(t, err)
211+
assert.NotNil(t, consumerJS)
212+
consumerJS = NewJSONSchema(exampleSchemaDef, nil) // test this legacy function
213+
consumer, err := client.Subscribe(ConsumerOptions{
214+
Topic: "httpLookupJsonTopic",
215+
SubscriptionName: "sub-1",
216+
Schema: consumerJS,
217+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
218+
})
219+
assert.Nil(t, err)
220+
msg, err := consumer.Receive(context.Background())
221+
assert.Nil(t, err)
222+
err = msg.GetSchemaValue(&s)
223+
assert.Nil(t, err)
224+
assert.Equal(t, s.ID, 100)
225+
assert.Equal(t, s.Name, "pulsar")
226+
227+
defer consumer.Close()
228+
}
229+
170230
func TestProtoSchema(t *testing.T) {
171231
client := createClient()
172232
defer client.Close()
@@ -318,6 +378,50 @@ func TestAvroSchema(t *testing.T) {
318378
defer consumer.Close()
319379
}
320380

381+
func TestHTTPLookupAvroSchema(t *testing.T) {
382+
client := createHTTPLookupClient()
383+
defer client.Close()
384+
385+
// create producer
386+
asProducer, err := NewAvroSchemaWithValidation(exampleSchemaDef, nil)
387+
assert.Nil(t, err)
388+
assert.NotNil(t, asProducer)
389+
asProducer = NewAvroSchema(exampleSchemaDef, nil)
390+
producer, err := client.CreateProducer(ProducerOptions{
391+
Topic: "httpLookup-avro-topic",
392+
Schema: asProducer,
393+
})
394+
assert.Nil(t, err)
395+
if _, err := producer.Send(context.Background(), &ProducerMessage{
396+
Value: testAvro{
397+
ID: 100,
398+
Name: "pulsar",
399+
},
400+
}); err != nil {
401+
log.Fatal(err)
402+
}
403+
404+
//create consumer
405+
unobj := testAvro{}
406+
407+
asConsumer := NewAvroSchema(exampleSchemaDef, nil)
408+
consumer, err := client.Subscribe(ConsumerOptions{
409+
Topic: "httpLookup-avro-topic",
410+
SubscriptionName: "sub-1",
411+
Schema: asConsumer,
412+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
413+
})
414+
assert.Nil(t, err)
415+
416+
msg, err := consumer.Receive(context.Background())
417+
assert.Nil(t, err)
418+
err = msg.GetSchemaValue(&unobj)
419+
assert.Nil(t, err)
420+
assert.Equal(t, unobj.ID, 100)
421+
assert.Equal(t, unobj.Name, "pulsar")
422+
defer consumer.Close()
423+
}
424+
321425
func TestStringSchema(t *testing.T) {
322426
client := createClient()
323427
defer client.Close()

0 commit comments

Comments
 (0)