Skip to content

Conversation

@geniusjoe
Copy link
Contributor

Master Issue: https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1

Related pr #611

Motivation

Currently pulsar go sdk has supported multi-version schema in above pr, but the pr does not support getSchema() method with http lookup service. So that we will encounter error when we call msg.GetSchemaValue(v interface{}) error function with http serviceUrl. Demo below:

func createClient() Client {
	// create client
	//lookupURL := "pulsar://localhost:6650"
	lookupURL := "http://localhost:8080"   // change to http protocol serviceUrl
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	if err != nil {
		log.Fatal(err)
	}
	return client
}

func TestBytesSchema(t *testing.T) {
	client := createClient()
	defer client.Close()

	topic := newTopicName()

	properties := make(map[string]string)
	properties["pulsar"] = "hello"
	producerSchemaBytes := NewBytesSchema(properties)
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:  topic,
		Schema: producerSchemaBytes,
	})
	assert.NoError(t, err)

	_, err = producer.Send(context.Background(), &ProducerMessage{
		Value: []byte(`{"key": "value"}`),
	})
	require.NoError(t, err)
	producer.Close()

	// Create consumer
	consumerSchemaBytes := NewBytesSchema(nil)
	assert.NotNil(t, consumerSchemaBytes)
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "sub-1",
		Schema:                      consumerSchemaBytes,
		SubscriptionInitialPosition: SubscriptionPositionEarliest,
	})
	assert.Nil(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Receive first message
	var out1 []byte
	msg1, err := consumer.Receive(ctx)
	assert.NoError(t, err)
	err = msg1.GetSchemaValue(&out1)
	assert.NoError(t, err)
	assert.Equal(t, []byte(`{"key": "value"}`), out1)
	consumer.Ack(msg1)
	require.NoError(t, err)
}

Test output:

/root/go/pkg/mod/golang.org/[email protected]/bin/go tool test2json -t /root/.cache/JetBrains/GoLand2024.1/tmp/GoLand/___TestBytesSchema_in_github_com_apache_pulsar_client_go_pulsar.test -test.v -test.paniconexit0 -test.run ^\QTestBytesSchema\E$
=== RUN   TestBytesSchema
time="2025-05-14T16:32:45+08:00" level=info msg="Connecting to broker" remote_addr="pulsar://localhost:6650"
time="2025-05-14T16:32:45+08:00" level=info msg="TCP connection established" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650"
time="2025-05-14T16:32:45+08:00" level=info msg="Connection is ready" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650"
time="2025-05-14T16:32:45+08:00" level=info msg="Connected producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" epoch=0 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Created producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Closing producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Closed producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Connected consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Created consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803"
    schema_test.go:101: 
        	Error Trace:	/data/code/dev/pulsar-client-go/pulsar/schema_test.go:101
        	Error:      	Received unexpected error:
        	            	GetSchema is not supported by httpLookupService
        	Test:       	TestBytesSchema
    schema_test.go:102: 
        	Error Trace:	/data/code/dev/pulsar-client-go/pulsar/schema_test.go:102
        	Error:      	Not equal: 
        	            	expected: []byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d}
        	            	actual  : []byte(nil)
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -1,4 +1,2 @@
        	            	-([]uint8) (len=16) {
        	            	- 00000000  7b 22 6b 65 79 22 3a 20  22 76 61 6c 75 65 22 7d  |{"key": "value"}|
        	            	-}
        	            	+([]uint8) <nil>
        	            	 
        	Test:       	TestBytesSchema
    schema_test.go:104: 
        	Error Trace:	/data/code/dev/pulsar-client-go/pulsar/schema_test.go:104
        	Error:      	Received unexpected error:
        	            	GetSchema is not supported by httpLookupService
        	Test:       	TestBytesSchema
time="2025-05-14T16:32:45+08:00" level=info msg="Closing consumer=1" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803"
time="2025-05-14T16:32:45+08:00" level=info msg="Closed consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803"
--- FAIL: TestBytesSchema (0.12s)

Expected :[]byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d}
Actual   :[]byte(nil)

Modifications

  • Update LookupService interface return type from GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) to GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) to support http lookup protocol in lookup_service.go
  • Support HTTPLookupService GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) function in lookup_service.go
  • Add http lookup GetSchema() related test cases in schema_test.go

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
pulsar/schema_test.go/TestHTTPLookupJsonSchema
pulsar/schema_test.go/TestHTTPLookupAvroSchema

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@geniusjoe geniusjoe force-pushed the dev/http-lookup-schema branch from 117ab5f to 4939b05 Compare May 15, 2025 07:14
@geniusjoe geniusjoe force-pushed the dev/http-lookup-schema branch from 4939b05 to fb1ec4a Compare June 5, 2025 12:04
@RobertIndie RobertIndie requested a review from Copilot June 6, 2025 10:43
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for the HTTP lookup GetSchema interface in the Pulsar Go SDK. Key changes include updating the LookupService interface and its implementations, adding new test cases for HTTP lookup with JSON/Avro schemas, and updating tooling in the Makefile.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
pulsar/schema_test.go Added tests and a helper to create a client for HTTP lookup
pulsar/schema.go Introduced HTTPSchemaTypeMap to map HTTP schema types
pulsar/internal/lookup_service.go Modified GetSchema to return LookupSchema and implemented HTTP lookup
pulsar/consumer_partition.go Updated schema caching logic to support both HTTP and pb lookup methods
Makefile Upgraded golangci-lint version
Comments suppressed due to low confidence (1)

pulsar/schema_test.go:202

  • [nitpick] Consider using an assertion such as require.NoError(t, err) instead of log.Fatal(err) to keep test failure handling consistent with other tests.
if err != nil { log.Fatal(err) }

@geniusjoe geniusjoe force-pushed the dev/http-lookup-schema branch 5 times, most recently from 3d0c06f to 745c0e2 Compare July 2, 2025 04:43
Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@geniusjoe geniusjoe force-pushed the dev/http-lookup-schema branch from 745c0e2 to 86ef81b Compare July 2, 2025 09:43
@RobertIndie RobertIndie merged commit 12b966d into apache:master Jul 2, 2025
7 checks passed
@RobertIndie RobertIndie added this to the v0.16.0 milestone Jul 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants