Skip to content

Commit 3d0c06f

Browse files
committed
[improve] Support http lookup getSchema() interfa
1 parent 55c71e6 commit 3d0c06f

File tree

5 files changed

+270
-39
lines changed

5 files changed

+270
-39
lines changed

pulsar/consumer_partition.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,22 +318,22 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
318318
return schema, nil
319319
}
320320

321-
pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
321+
// cache missed, try to use lookupService to find schema info
322+
lookupSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
322323
if err != nil {
323324
return nil, err
324325
}
325-
326-
if pbSchema == nil {
327-
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", s.topic, schemaVersion)
328-
return nil, err
329-
}
330-
331-
var properties = internal.ConvertToStringMap(pbSchema.Properties)
332-
333-
schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
326+
schema, err = NewSchema(
327+
// lookupSchema.SchemaType is internal package SchemaType type,
328+
// we need to cast it to pulsar.SchemaType as soon as we use it in current pulsar package
329+
SchemaType(lookupSchema.SchemaType),
330+
lookupSchema.Data,
331+
lookupSchema.Properties,
332+
)
334333
if err != nil {
335334
return nil, err
336335
}
336+
337337
s.add(key, schema)
338338
return schema, nil
339339
}

pulsar/internal/lookup_service.go

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package internal
1919

2020
import (
21+
"encoding/binary"
2122
"errors"
2223
"fmt"
2324
"net/url"
25+
"strings"
2426

2527
"google.golang.org/protobuf/proto"
2628

@@ -34,6 +36,13 @@ type LookupResult struct {
3436
PhysicalAddr *url.URL
3537
}
3638

39+
// LookupSchema return lookup schema result
40+
type LookupSchema struct {
41+
SchemaType SchemaType
42+
Data []byte
43+
Properties map[string]string
44+
}
45+
3746
// GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
3847
type GetTopicsOfNamespaceMode string
3948

@@ -62,7 +71,7 @@ type LookupService interface {
6271
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
6372

6473
// GetSchema returns schema for a given version.
65-
GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
74+
GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)
6675

6776
GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error)
6877

@@ -97,7 +106,7 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResol
97106
}
98107
}
99108

100-
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
109+
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
101110
id := ls.rpcClient.NewRequestID()
102111
req := &pb.CommandGetSchema{
103112
RequestId: proto.Uint64(id),
@@ -106,12 +115,23 @@ func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *
106115
}
107116
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
108117
if err != nil {
109-
return nil, err
118+
return &LookupSchema{}, err
110119
}
111120
if res.Response.Error != nil {
112-
return nil, errors.New(res.Response.GetError().String())
121+
return &LookupSchema{}, errors.New(res.Response.GetError().String())
122+
}
123+
124+
// deserialize pbSchema and convert it to LookupSchema struct
125+
pbSchema := res.Response.GetSchemaResponse.Schema
126+
if pbSchema == nil {
127+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
128+
return &LookupSchema{}, err
113129
}
114-
return res.Response.GetSchemaResponse.Schema, nil
130+
return &LookupSchema{
131+
SchemaType: SchemaType(int(*pbSchema.Type)),
132+
Data: pbSchema.SchemaData,
133+
Properties: ConvertToStringMap(pbSchema.Properties),
134+
}, nil
115135
}
116136

117137
func (ls *lookupService) GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) {
@@ -273,6 +293,8 @@ const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
273293
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
274294
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
275295
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
296+
const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema"
297+
const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d"
276298

277299
type httpLookupData struct {
278300
BrokerURL string `json:"brokerUrl"`
@@ -289,6 +311,12 @@ type httpLookupService struct {
289311
metrics *Metrics
290312
}
291313

314+
type httpLookupSchema struct {
315+
HTTPSchemaType string `json:"type"`
316+
Data string `json:"data"`
317+
Properties map[string]string `json:"properties"`
318+
}
319+
292320
func (h *httpLookupService) GetBrokerAddress(brokerServiceURL string, _ bool) (*LookupResult, error) {
293321
logicalAddress, err := url.ParseRequestURI(brokerServiceURL)
294322
if err != nil {
@@ -371,8 +399,42 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
371399
return topics, nil
372400
}
373401

374-
func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, err error) {
375-
return nil, errors.New("GetSchema is not supported by httpLookupService")
402+
func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
403+
topicName, err := ParseTopicName(topic)
404+
if err != nil {
405+
return nil, err
406+
}
407+
topicRestPath := fmt.Sprintf("%s/%s", topicName.Namespace, topicName.Topic)
408+
var path string
409+
if schemaVersion != nil {
410+
path = fmt.Sprintf(HTTPSchemaWithVersionV2, topicRestPath, int64(binary.BigEndian.Uint64(schemaVersion)))
411+
} else {
412+
path = fmt.Sprintf(HTTPSchemaV2, topicRestPath)
413+
}
414+
lookupSchema := &httpLookupSchema{}
415+
if err := h.httpClient.Get(path, &lookupSchema, nil); err != nil {
416+
if strings.HasPrefix(err.Error(), "Code: 404") {
417+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
418+
}
419+
h.log.Errorf("schema [ %v ] request error, schema version : [ %v ]", topic, schemaVersion)
420+
return &LookupSchema{}, err
421+
}
422+
423+
// deserialize httpSchema and convert it to LookupSchema struct
424+
schemaType, exists := HTTPSchemaTypeMap[strings.ToUpper(lookupSchema.HTTPSchemaType)]
425+
if !exists {
426+
err = fmt.Errorf("unsupported schema type [%s] for topic: [ %v ], schema version : [ %v ]",
427+
lookupSchema.HTTPSchemaType,
428+
topic,
429+
schemaVersion,
430+
)
431+
return nil, err
432+
}
433+
return &LookupSchema{
434+
SchemaType: schemaType,
435+
Data: []byte(lookupSchema.Data),
436+
Properties: lookupSchema.Properties,
437+
}, nil
376438
}
377439

378440
func (h *httpLookupService) ServiceNameResolver() *ServiceNameResolver {

pulsar/internal/schema.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
// SchemaType We need to define a SchemaType in this internal package, to avoid directly returning pulsar.SchemaType.
21+
// In case we might encounter importing cycle problem.
22+
type SchemaType int
23+
24+
const (
25+
NONE SchemaType = iota //No schema defined
26+
STRING //Simple String encoding with UTF-8
27+
JSON //JSON object encoding and validation
28+
PROTOBUF //Protobuf message encoding and decoding
29+
AVRO //Serialize and deserialize via Avro
30+
BOOLEAN //
31+
INT8 //A 8-byte integer.
32+
INT16 //A 16-byte integer.
33+
INT32 //A 32-byte integer.
34+
INT64 //A 64-byte integer.
35+
FLOAT //A float number.
36+
DOUBLE //A double number
37+
_ //
38+
_ //
39+
_ //
40+
KeyValue //A Schema that contains Key Schema and Value Schema.
41+
BYTES = 0 //A bytes array.
42+
AUTO = -2 //
43+
AutoConsume = -3 //Auto Consume Type.
44+
AutoPublish = -4 //Auto Publish Type.
45+
ProtoNative = 20 //Protobuf native message encoding and decoding
46+
)
47+
48+
var HTTPSchemaTypeMap = map[string]SchemaType{
49+
"NONE": BYTES,
50+
"STRING": STRING,
51+
"JSON": JSON,
52+
"PROTOBUF": PROTOBUF,
53+
"AVRO": AVRO,
54+
"BOOLEAN": BOOLEAN,
55+
"INT8": INT8,
56+
"INT16": INT16,
57+
"INT32": INT32,
58+
"INT64": INT64,
59+
"FLOAT": FLOAT,
60+
"DOUBLE": DOUBLE,
61+
"KEYVALUE": KeyValue,
62+
"BYTES": BYTES,
63+
"AUTO": AUTO,
64+
"AUTOCONSUME": AutoConsume,
65+
"AUTOPUBLISH": AutoPublish,
66+
"PROTOBUF_NATIVE": ProtoNative,
67+
}

pulsar/schema.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sync"
2727
"unsafe"
2828

29+
"github.com/apache/pulsar-client-go/pulsar/internal"
2930
log "github.com/sirupsen/logrus"
3031

3132
"github.com/hamba/avro/v2"
@@ -35,30 +36,27 @@ import (
3536
"google.golang.org/protobuf/types/descriptorpb"
3637
)
3738

38-
type SchemaType int
39+
type SchemaType internal.SchemaType
3940

4041
const (
41-
NONE SchemaType = iota //No schema defined
42-
STRING //Simple String encoding with UTF-8
43-
JSON //JSON object encoding and validation
44-
PROTOBUF //Protobuf message encoding and decoding
45-
AVRO //Serialize and deserialize via Avro
46-
BOOLEAN //
47-
INT8 //A 8-byte integer.
48-
INT16 //A 16-byte integer.
49-
INT32 //A 32-byte integer.
50-
INT64 //A 64-byte integer.
51-
FLOAT //A float number.
52-
DOUBLE //A double number
53-
_ //
54-
_ //
55-
_ //
56-
KeyValue //A Schema that contains Key Schema and Value Schema.
57-
BYTES = 0 //A bytes array.
58-
AUTO = -2 //
59-
AutoConsume = -3 //Auto Consume Type.
60-
AutoPublish = -4 // Auto Publish Type.
61-
ProtoNative = 20 //Protobuf native message encoding and decoding
42+
NONE = SchemaType(internal.NONE)
43+
STRING = SchemaType(internal.STRING)
44+
JSON = SchemaType(internal.JSON)
45+
PROTOBUF = SchemaType(internal.PROTOBUF)
46+
AVRO = SchemaType(internal.AVRO)
47+
BOOLEAN = SchemaType(internal.BOOLEAN)
48+
INT8 = SchemaType(internal.INT8)
49+
INT16 = SchemaType(internal.INT16)
50+
INT32 = SchemaType(internal.INT32)
51+
INT64 = SchemaType(internal.INT64)
52+
FLOAT = SchemaType(internal.FLOAT)
53+
DOUBLE = SchemaType(internal.DOUBLE)
54+
KeyValue = SchemaType(internal.KeyValue)
55+
BYTES = SchemaType(internal.BYTES)
56+
AUTO = SchemaType(internal.AUTO)
57+
AutoConsume = SchemaType(internal.AutoConsume)
58+
AutoPublish = SchemaType(internal.AutoPublish)
59+
ProtoNative = SchemaType(internal.ProtoNative)
6260
)
6361

6462
// Encapsulates data around the schema definition

0 commit comments

Comments
 (0)