Skip to content

Commit d21a85b

Browse files
committed
Expose NumPartitions() method on producer interface and GetHashingFunction() method
Signed-off-by: Chen Liu <[email protected]>
1 parent f17deac commit d21a85b

File tree

4 files changed

+38
-12
lines changed

4 files changed

+38
-12
lines changed

pulsar/producer.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pulsar
1919

2020
import (
2121
"context"
22+
"github.com/apache/pulsar-client-go/pulsar/internal"
2223
"time"
2324
)
2425

@@ -182,6 +183,9 @@ type Producer interface {
182183
// the eventual error in publishing
183184
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
184185

186+
// NumPartitions return the number of partitions that the topic has
187+
NumPartitions() uint32
188+
185189
// LastSequenceID get the last sequence id that was published by this producer.
186190
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
187191
// was published and acknowledged by the broker.
@@ -199,3 +203,15 @@ type Producer interface {
199203
// of errors, pending writes will not be retried.
200204
Close()
201205
}
206+
207+
// GetHashingFunction return the corresponding hashing function for the hashing scheme
208+
func GetHashingFunction(s HashingScheme) func(string) uint32 {
209+
switch s {
210+
case JavaStringHash:
211+
return internal.JavaStringHash
212+
case Murmur3_32Hash:
213+
return internal.Murmur3_32Hash
214+
default:
215+
return internal.JavaStringHash
216+
}
217+
}

pulsar/producer_impl.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,6 @@ type producer struct {
5959

6060
var partitionsAutoDiscoveryInterval = 1 * time.Minute
6161

62-
func getHashingFunction(s HashingScheme) func(string) uint32 {
63-
switch s {
64-
case JavaStringHash:
65-
return internal.JavaStringHash
66-
case Murmur3_32Hash:
67-
return internal.Murmur3_32Hash
68-
default:
69-
return internal.JavaStringHash
70-
}
71-
}
72-
7362
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
7463
if options.Topic == "" {
7564
return nil, newError(InvalidTopicName, "Topic name is required for producer")
@@ -102,7 +91,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
10291

10392
if options.MessageRouter == nil {
10493
internalRouter := NewDefaultRouter(
105-
getHashingFunction(options.HashingScheme),
94+
GetHashingFunction(options.HashingScheme),
10695
options.BatchingMaxMessages,
10796
options.BatchingMaxSize,
10897
options.BatchingMaxPublishDelay,

pulsar/producer_partition.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,10 @@ func (p *partitionProducer) Name() string {
324324
return p.producerName
325325
}
326326

327+
func (p *partitionProducer) NumPartitions() uint32 {
328+
return 1
329+
}
330+
327331
func (p *partitionProducer) internalSend(request *sendRequest) {
328332
p.log.Debug("Received send request: ", *request)
329333

pulsar/producer_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"fmt"
2323
"net/http"
24+
"reflect"
25+
"runtime"
2426
"strconv"
2527
"sync"
2628
"sync/atomic"
@@ -41,6 +43,12 @@ func TestInvalidURL(t *testing.T) {
4143
}
4244
}
4345

46+
func TestGetHashingFunction(t *testing.T) {
47+
assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(-1))
48+
assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(JavaStringHash))
49+
assertHashingFunctionEqual(t, internal.Murmur3_32Hash, GetHashingFunction(Murmur3_32Hash))
50+
}
51+
4452
func TestProducerConnectError(t *testing.T) {
4553
client, err := NewClient(ClientOptions{
4654
URL: "pulsar://invalid-hostname:6650",
@@ -94,6 +102,7 @@ func TestSimpleProducer(t *testing.T) {
94102

95103
assert.NoError(t, err)
96104
assert.NotNil(t, producer)
105+
assert.Equal(t, uint32(1), producer.NumPartitions())
97106
defer producer.Close()
98107

99108
for i := 0; i < 10; i++ {
@@ -398,6 +407,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
398407
BatchingMaxPublishDelay: time.Second * 10,
399408
})
400409
assert.Nil(t, err)
410+
assert.Equal(t, uint32(numberOfPartitions), producer.NumPartitions())
401411
defer producer.Close()
402412

403413
// send 5 messages
@@ -473,6 +483,7 @@ func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
473483
DisableBatching: true,
474484
})
475485
assert.Nil(t, err)
486+
assert.Equal(t, uint32(numberOfPartitions), producer.NumPartitions())
476487
defer producer.Close()
477488

478489
// send 5 messages
@@ -1031,3 +1042,9 @@ func TestProducerWithInterceptors(t *testing.T) {
10311042
assert.Equal(t, 10, metric.sendn)
10321043
assert.Equal(t, 10, metric.ackn)
10331044
}
1045+
1046+
func assertHashingFunctionEqual(t *testing.T, func1, func2 func(string) uint32) {
1047+
funcName1 := runtime.FuncForPC(reflect.ValueOf(func1).Pointer()).Name()
1048+
funcName2 := runtime.FuncForPC(reflect.ValueOf(func2).Pointer()).Name()
1049+
assert.Equal(t, funcName1, funcName2)
1050+
}

0 commit comments

Comments
 (0)