Skip to content

Commit 0612938

Browse files
crossoverJienodece
andauthored
PIP-254: Support configuring client version (#1316)
* pip-254 * lint * lint * admin token * admin token * admin token * Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu <[email protected]> * Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu <[email protected]> * Update pulsar/consumer_test.go Co-authored-by: Zixuan Liu <[email protected]> * Update pulsar/consumer_test.go Co-authored-by: Zixuan Liu <[email protected]> * Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu <[email protected]> * fix lint --------- Co-authored-by: Zixuan Liu <[email protected]>
1 parent edea3eb commit 0612938

File tree

6 files changed

+126
-6
lines changed

6 files changed

+126
-6
lines changed

pulsar/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,15 @@ type ClientOptions struct {
171171
// The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
172172
// The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
173173
LookupProperties map[string]string
174+
175+
// Set the description.
176+
// By default, when the client connects to the broker, a version string like "Pulsar Go <version>" will be
177+
// carried and saved by the broker. The client version string could be queried from the topic stats.
178+
// This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
179+
// the description will be appended to the original client version string, with '-' as the separator.
180+
// For example, if the client version is 3.0.0, and the description is "forked", the final client version string
181+
// "Pulsar Go 3.0.0-forked".
182+
Description string
174183
}
175184

176185
// Client represents a pulsar client

pulsar/client_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func newClient(options ClientOptions) (Client, error) {
163163

164164
c := &client{
165165
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
166-
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
166+
maxConnectionsPerHost, logger, metrics, options.Description, connectionMaxIdleTime),
167167
log: logger,
168168
metrics: metrics,
169169
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),

pulsar/client_impl_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"testing"
3030
"time"
3131

32+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
33+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
34+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
35+
3236
"github.com/apache/pulsar-client-go/pulsar/auth"
3337
"github.com/apache/pulsar-client-go/pulsar/internal"
3438
"github.com/stretchr/testify/assert"
@@ -208,6 +212,43 @@ func TestTokenAuth(t *testing.T) {
208212

209213
client.Close()
210214
}
215+
func TestTokenAuthWithClientVersion(t *testing.T) {
216+
token, err := os.ReadFile(tokenFilePath)
217+
assert.NoError(t, err)
218+
219+
client, err := NewClient(ClientOptions{
220+
URL: serviceURL,
221+
Authentication: NewAuthenticationToken(string(token)),
222+
Description: "test-client",
223+
})
224+
assert.NoError(t, err)
225+
defer client.Close()
226+
227+
topic := newAuthTopicName()
228+
producer, err := client.CreateProducer(ProducerOptions{
229+
Topic: topic,
230+
})
231+
232+
assert.NoError(t, err)
233+
assert.NotNil(t, producer)
234+
235+
readFile, err := os.ReadFile("../integration-tests/tokens/admin-token")
236+
assert.NoError(t, err)
237+
cfg := &config.Config{
238+
Token: string(readFile),
239+
}
240+
admin, err := admin.New(cfg)
241+
assert.NoError(t, err)
242+
assert.NotNil(t, admin)
243+
244+
topicName, err := utils.GetTopicName(topic)
245+
assert.Nil(t, err)
246+
topicState, err := admin.Topics().GetStats(*topicName)
247+
assert.Nil(t, err)
248+
publisher := topicState.Publishers[0]
249+
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))
250+
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))
251+
}
211252

212253
func TestTokenAuthWithSupplier(t *testing.T) {
213254
client, err := NewClient(ClientOptions{

pulsar/consumer_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ import (
2626
"os"
2727
"regexp"
2828
"strconv"
29+
"strings"
2930
"sync"
3031
"sync/atomic"
3132
"testing"
3233
"time"
3334

35+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
36+
3437
"github.com/stretchr/testify/require"
3538
"github.com/testcontainers/testcontainers-go"
3639
"github.com/testcontainers/testcontainers-go/wait"
@@ -4985,3 +4988,53 @@ func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
49854988
return true
49864989
}, 30*time.Second, 1*time.Second)
49874990
}
4991+
4992+
func TestClientVersion(t *testing.T) {
4993+
client, err := NewClient(ClientOptions{
4994+
URL: lookupURL,
4995+
})
4996+
4997+
assert.Nil(t, err)
4998+
defer client.Close()
4999+
5000+
topic := newTopicName()
5001+
// create producer
5002+
producer, err := client.CreateProducer(ProducerOptions{
5003+
Topic: topic,
5004+
DisableBatching: false,
5005+
})
5006+
assert.Nil(t, err)
5007+
defer producer.Close()
5008+
5009+
cfg := &config.Config{}
5010+
admin, err := admin.New(cfg)
5011+
assert.NoError(t, err)
5012+
assert.NotNil(t, admin)
5013+
5014+
topicName, err := utils.GetTopicName(topic)
5015+
assert.Nil(t, err)
5016+
topicState, err := admin.Topics().GetStats(*topicName)
5017+
assert.Nil(t, err)
5018+
publisher := topicState.Publishers[0]
5019+
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))
5020+
5021+
topic = newTopicName()
5022+
client, err = NewClient(ClientOptions{
5023+
URL: lookupURL,
5024+
Description: "test-client",
5025+
})
5026+
assert.Nil(t, err)
5027+
producer, err = client.CreateProducer(ProducerOptions{
5028+
Topic: topic,
5029+
DisableBatching: false,
5030+
})
5031+
assert.Nil(t, err)
5032+
topicName, err = utils.GetTopicName(topic)
5033+
assert.Nil(t, err)
5034+
topicState, err = admin.Topics().GetStats(*topicName)
5035+
assert.Nil(t, err)
5036+
publisher = topicState.Publishers[0]
5037+
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))
5038+
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))
5039+
5040+
}

pulsar/internal/connection.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ type connection struct {
176176

177177
keepAliveInterval time.Duration
178178

179-
lastActive time.Time
179+
lastActive time.Time
180+
description string
180181
}
181182

182183
// connectionOptions defines configurations for creating connection.
@@ -189,6 +190,7 @@ type connectionOptions struct {
189190
logger log.Logger
190191
metrics *Metrics
191192
keepAliveInterval time.Duration
193+
description string
192194
}
193195

194196
func newConnection(opts connectionOptions) *connection {
@@ -218,6 +220,7 @@ func newConnection(opts connectionOptions) *connection {
218220
listeners: make(map[uint64]ConnectionListener),
219221
consumerHandlers: make(map[uint64]ConsumerHandler),
220222
metrics: opts.metrics,
223+
description: opts.description,
221224
}
222225
cnx.state.Store(int32(connectionInit))
223226
cnx.reader = newConnectionReader(cnx)
@@ -305,7 +308,7 @@ func (c *connection) doHandshake() bool {
305308
c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
306309
cmdConnect := &pb.CommandConnect{
307310
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
308-
ClientVersion: proto.String(ClientVersionString),
311+
ClientVersion: proto.String(c.getClientVersion()),
309312
AuthMethodName: proto.String(c.auth.Name()),
310313
AuthData: authData,
311314
FeatureFlags: &pb.FeatureFlags{
@@ -346,6 +349,16 @@ func (c *connection) doHandshake() bool {
346349
return true
347350
}
348351

352+
func (c *connection) getClientVersion() string {
353+
var clientVersion string
354+
if c.description == "" {
355+
clientVersion = ClientVersionString
356+
} else {
357+
clientVersion = fmt.Sprintf("%s-%s", ClientVersionString, c.description)
358+
}
359+
return clientVersion
360+
}
361+
349362
func (c *connection) IsProxied() bool {
350363
return c.logicalAddr.Host != c.physicalAddr.Host
351364
}
@@ -832,7 +845,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
832845

833846
cmdAuthResponse := &pb.CommandAuthResponse{
834847
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
835-
ClientVersion: proto.String(ClientVersionString),
848+
ClientVersion: proto.String(c.getClientVersion()),
836849
Response: &pb.AuthData{
837850
AuthMethodName: proto.String(c.auth.Name()),
838851
AuthData: authData,

pulsar/internal/connection_pool.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ type connectionPool struct {
5252
keepAliveInterval time.Duration
5353
closeCh chan struct{}
5454

55-
metrics *Metrics
56-
log log.Logger
55+
metrics *Metrics
56+
log log.Logger
57+
description string
5758
}
5859

5960
// NewConnectionPool init connection pool.
@@ -65,6 +66,7 @@ func NewConnectionPool(
6566
maxConnectionsPerHost int,
6667
logger log.Logger,
6768
metrics *Metrics,
69+
description string,
6870
connectionMaxIdleTime time.Duration) ConnectionPool {
6971
p := &connectionPool{
7072
connections: make(map[string]*connection),
@@ -76,6 +78,7 @@ func NewConnectionPool(
7678
log: logger,
7779
metrics: metrics,
7880
closeCh: make(chan struct{}),
81+
description: description,
7982
}
8083
go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
8184
return p
@@ -113,6 +116,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
113116
keepAliveInterval: p.keepAliveInterval,
114117
logger: p.log,
115118
metrics: p.metrics,
119+
description: p.description,
116120
})
117121
p.connections[key] = conn
118122
p.Unlock()

0 commit comments

Comments
 (0)