Skip to content

Commit d355724

Browse files
guyinyouguyinyouRongtongJin
authored
Golang: add push consumer (#990)
* push consumer * fix test * fix golang demo * remove unused func in pushconsumer * golang: optimization code * golang: fix nil subscription in telemetry * golang: update runtime verison 1.24 * golang: update unit test * golang: fix ut * golang: fix ut * golang: fix build.yaml --------- Co-authored-by: guyinyou <[email protected]> Co-authored-by: rongtong <[email protected]>
1 parent 20757cc commit d355724

33 files changed

+5078
-2904
lines changed

.github/workflows/golang_build.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ jobs:
88
strategy:
99
fail-fast: false
1010
matrix:
11-
os: [ ubuntu-22.04, windows-2022 ]
12-
go: [1.17]
11+
os: [ubuntu-22.04, windows-2022]
12+
go: [1.24]
1313
steps:
1414
- name: Checkout
1515
uses: actions/checkout@v2

golang/client.go

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ import (
3131
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
3232
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
3333
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
34-
"github.com/golang/protobuf/proto"
3534
"github.com/google/uuid"
3635
"go.uber.org/atomic"
3736
"go.uber.org/zap"
3837
"google.golang.org/grpc/metadata"
38+
"google.golang.org/protobuf/proto"
3939
)
4040

4141
type Client interface {
@@ -117,7 +117,7 @@ func (cs *defaultClientSession) startUp() {
117117
if err != nil {
118118
// we are recovering
119119
if !cs.recovering {
120-
cs.cli.log.Info("Encountered error while receiving TelemetryCommand, trying to recover")
120+
cs.cli.log.Infof("Encountered error while receiving TelemetryCommand, trying to recover, err=%v", err)
121121
// we wait five seconds to give time for the transmission error to be resolved externally before we attempt to read the message again.
122122
time.Sleep(cs.recoveryWaitTime)
123123
cs.recovering = true
@@ -176,13 +176,19 @@ func (cs *defaultClientSession) release() {
176176
}
177177
func (cs *defaultClientSession) publish(ctx context.Context, common *v2.TelemetryCommand) error {
178178
var err error
179-
cs.observerLock.RLock()
180-
if cs.observer != nil {
181-
err = cs.observer.Send(common)
182-
cs.observerLock.RUnlock()
179+
180+
f0 := func() (bool, error) {
181+
cs.observerLock.RLock()
182+
defer cs.observerLock.RUnlock()
183+
if cs.observer != nil {
184+
return true, cs.observer.Send(common)
185+
}
186+
return false, nil
187+
}
188+
over, err := f0()
189+
if over {
183190
return err
184191
}
185-
cs.observerLock.RUnlock()
186192

187193
cs.observerLock.Lock()
188194
defer cs.observerLock.Unlock()
@@ -218,8 +224,8 @@ type defaultClient struct {
218224
endpointsTelemetryClientTable map[string]*defaultClientSession
219225
endpointsTelemetryClientsLock sync.RWMutex
220226
on atomic.Bool
221-
222-
clientImpl isClient
227+
inited atomic.Bool
228+
clientImpl isClient
223229
}
224230

225231
var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
@@ -235,6 +241,7 @@ var NewClient = func(config *Config, opts ...ClientOption) (Client, error) {
235241
messageInterceptors: make([]MessageInterceptor, 0),
236242
endpointsTelemetryClientTable: make(map[string]*defaultClientSession),
237243
on: *atomic.NewBool(true),
244+
inited: *atomic.NewBool(false),
238245
}
239246
cli.log = sugarBaseLogger.With("client_id", cli.clientID)
240247
for _, opt := range opts {
@@ -299,7 +306,7 @@ func (cli *defaultClient) registerMessageInterceptor(messageInterceptor MessageI
299306
cli.messageInterceptors = append(cli.messageInterceptors, messageInterceptor)
300307
}
301308

302-
func (cli *defaultClient) doBefore(hookPoint MessageHookPoints, messageCommons []*MessageCommon) {
309+
func (cli *defaultClient) doBefore(hookPoint MessageHookPoints, messageCommons []*MessageCommon) error {
303310
cli.messageInterceptorsLock.RLocker().Lock()
304311
defer cli.messageInterceptorsLock.RLocker().Unlock()
305312

@@ -309,9 +316,10 @@ func (cli *defaultClient) doBefore(hookPoint MessageHookPoints, messageCommons [
309316
cli.log.Errorf("exception raised while intercepting message, hookPoint=%v, err=%v", hookPoint, err)
310317
}
311318
}
319+
return nil
312320
}
313321

314-
func (cli *defaultClient) doAfter(hookPoint MessageHookPoints, messageCommons []*MessageCommon, duration time.Duration, status MessageHookPointsStatus) {
322+
func (cli *defaultClient) doAfter(hookPoint MessageHookPoints, messageCommons []*MessageCommon, duration time.Duration, status MessageHookPointsStatus) error {
315323
cli.messageInterceptorsLock.RLocker().Lock()
316324
defer cli.messageInterceptorsLock.RLocker().Unlock()
317325

@@ -321,6 +329,7 @@ func (cli *defaultClient) doAfter(hookPoint MessageHookPoints, messageCommons []
321329
cli.log.Errorf("exception raised while intercepting message, hookPoint=%v, err=%v", hookPoint, err)
322330
}
323331
}
332+
return nil
324333
}
325334

326335
func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([]*v2.MessageQueue, error) {
@@ -417,6 +426,36 @@ func (cli *defaultClient) getSettingsCommand() *v2.TelemetryCommand {
417426
}
418427
}
419428

429+
func (cli *defaultClient) queryAssignments(ctx context.Context, topic string, group string, duration time.Duration) (*[]*v2.Assignment, error) {
430+
ctx = cli.Sign(ctx)
431+
response, err := cli.clientManager.QueryAssignments(ctx, cli.accessPoint, cli.getQueryAssignmentRequest(topic, group), duration)
432+
if err != nil {
433+
return nil, err
434+
}
435+
if response.GetStatus().GetCode() != v2.Code_OK {
436+
return nil, &ErrRpcStatus{
437+
Code: int32(response.Status.GetCode()),
438+
Message: response.GetStatus().GetMessage(),
439+
}
440+
}
441+
ret := response.GetAssignments()
442+
return &ret, nil
443+
}
444+
445+
func (cli *defaultClient) getQueryAssignmentRequest(topic string, group string) *v2.QueryAssignmentRequest {
446+
return &v2.QueryAssignmentRequest{
447+
Topic: &v2.Resource{
448+
Name: topic,
449+
ResourceNamespace: cli.config.NameSpace,
450+
},
451+
Group: &v2.Resource{
452+
Name: group,
453+
ResourceNamespace: cli.config.NameSpace,
454+
},
455+
Endpoints: cli.accessPoint,
456+
}
457+
}
458+
420459
func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatRequest) error {
421460
ctx := cli.Sign(context.Background())
422461
endpoints, err := utils.ParseTarget(target)
@@ -428,9 +467,6 @@ func (cli *defaultClient) doHeartbeat(target string, request *v2.HeartbeatReques
428467
return fmt.Errorf("failed to send heartbeat, endpoints=%v, err=%v, requestId=%s", endpoints, err, utils.GetRequestID(ctx))
429468
}
430469
if resp.Status.GetCode() != v2.Code_OK {
431-
if resp.Status.GetCode() == v2.Code_UNRECOGNIZED_CLIENT_TYPE {
432-
go cli.trySyncSettings()
433-
}
434470
cli.log.Errorf("failed to send heartbeat, code=%v, status message=[%s], endpoints=%v, requestId=%s", resp.Status.GetCode(), resp.Status.GetMessage(), endpoints, utils.GetRequestID(ctx))
435471
return &ErrRpcStatus{
436472
Code: int32(resp.Status.GetCode()),
@@ -496,6 +532,7 @@ func (cli *defaultClient) startUp() error {
496532
cm.startUp()
497533
cm.RegisterClient(cli)
498534
cli.clientManager = cm
535+
499536
for _, topic := range cli.initTopics {
500537
_, err := cli.getMessageQueues(context.Background(), topic)
501538
if err != nil {
@@ -546,6 +583,13 @@ func (cli *defaultClient) startUp() error {
546583
})
547584
}
548585
ticker.Tick(f, time.Second*30, cli.done)
586+
587+
// wait syncSettings finish
588+
for !cli.inited.Load() {
589+
sugarBaseLogger.Infoln("wait for sync settings finish")
590+
time.Sleep(time.Second)
591+
}
592+
sugarBaseLogger.Infoln("sync settings finished")
549593
return nil
550594
}
551595

@@ -587,6 +631,10 @@ func (cli *defaultClient) GracefulStop() error {
587631
return nil
588632
}
589633

634+
func (cli *defaultClient) isRunning() bool {
635+
return cli.on.Load()
636+
}
637+
590638
func (cli *defaultClient) Sign(ctx context.Context) context.Context {
591639
now := time.Now().Format("20060102T150405Z")
592640
return metadata.AppendToOutgoingContext(ctx,
@@ -629,7 +677,9 @@ func (cli *defaultClient) onSettingsCommand(endpoints *v2.Endpoints, settings *v
629677
if metric != nil {
630678
cli.clientMeterProvider.Reset(metric)
631679
}
632-
return cli.settings.applySettingsCommand(settings)
680+
err := cli.settings.applySettingsCommand(settings)
681+
cli.inited.Store(true)
682+
return err
633683
}
634684

635685
func (cli *defaultClient) onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) {

golang/client_manager.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type ClientManager interface {
3333
RegisterClient(client Client)
3434
UnRegisterClient(client Client)
3535
QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
36+
QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryAssignmentRequest, duration time.Duration) (*v2.QueryAssignmentResponse, error)
3637
HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
3738
SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
3839
Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
@@ -41,6 +42,7 @@ type ClientManager interface {
4142
ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
4243
AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
4344
ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
45+
ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, request *v2.ForwardMessageToDeadLetterQueueRequest, duration time.Duration) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
4446
}
4547

4648
type clientManagerOptions struct {
@@ -232,6 +234,17 @@ func (cm *defaultClientManager) QueryRoute(ctx context.Context, endpoints *v2.En
232234
return ret, err
233235
}
234236

237+
func (cm *defaultClientManager) QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryAssignmentRequest, duration time.Duration) (*v2.QueryAssignmentResponse, error) {
238+
ctx, _ = context.WithTimeout(ctx, duration)
239+
rpcClient, err := cm.getRpcClient(endpoints)
240+
if err != nil {
241+
return nil, err
242+
}
243+
ret, err := rpcClient.QueryAssignments(ctx, request)
244+
cm.handleGrpcError(rpcClient, err)
245+
return ret, err
246+
}
247+
235248
func (cm *defaultClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error) {
236249
ctx, _ = context.WithTimeout(ctx, duration)
237250
rpcClient, err := cm.getRpcClient(endpoints)
@@ -319,3 +332,14 @@ func (cm *defaultClientManager) ChangeInvisibleDuration(ctx context.Context, end
319332
cm.handleGrpcError(rpcClient, err)
320333
return ret, err
321334
}
335+
336+
func (cm *defaultClientManager) ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, request *v2.ForwardMessageToDeadLetterQueueRequest, duration time.Duration) (*v2.ForwardMessageToDeadLetterQueueResponse, error) {
337+
ctx, _ = context.WithTimeout(ctx, duration)
338+
rpcClient, err := cm.getRpcClient(endpoints)
339+
if err != nil {
340+
return nil, err
341+
}
342+
ret, err := rpcClient.ForwardMessageToDeadLetterQueue(ctx, request)
343+
cm.handleGrpcError(rpcClient, err)
344+
return ret, err
345+
}

golang/client_manager_mock.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request,
9494
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EndTransaction", reflect.TypeOf((*MockClientManager)(nil).EndTransaction), ctx, endpoints, request, duration)
9595
}
9696

97+
// ForwardMessageToDeadLetterQueue mocks base method.
98+
func (m *MockClientManager) ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, request *v2.ForwardMessageToDeadLetterQueueRequest, duration time.Duration) (*v2.ForwardMessageToDeadLetterQueueResponse, error) {
99+
m.ctrl.T.Helper()
100+
ret := m.ctrl.Call(m, "ForwardMessageToDeadLetterQueue", ctx, endpoints, request, duration)
101+
ret0, _ := ret[0].(*v2.ForwardMessageToDeadLetterQueueResponse)
102+
ret1, _ := ret[1].(error)
103+
return ret0, ret1
104+
}
105+
106+
// ForwardMessageToDeadLetterQueue indicates an expected call of ForwardMessageToDeadLetterQueue.
107+
func (mr *MockClientManagerMockRecorder) ForwardMessageToDeadLetterQueue(ctx, endpoints, request, duration interface{}) *gomock.Call {
108+
mr.mock.ctrl.T.Helper()
109+
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardMessageToDeadLetterQueue", reflect.TypeOf((*MockClientManager)(nil).ForwardMessageToDeadLetterQueue), ctx, endpoints, request, duration)
110+
}
111+
97112
// HeartBeat mocks base method.
98113
func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error) {
99114
m.ctrl.T.Helper()
@@ -124,6 +139,21 @@ func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints,
124139
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyClientTermination", reflect.TypeOf((*MockClientManager)(nil).NotifyClientTermination), ctx, endpoints, request, duration)
125140
}
126141

142+
// QueryAssignments mocks base method.
143+
func (m *MockClientManager) QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryAssignmentRequest, duration time.Duration) (*v2.QueryAssignmentResponse, error) {
144+
m.ctrl.T.Helper()
145+
ret := m.ctrl.Call(m, "QueryAssignments", ctx, endpoints, request, duration)
146+
ret0, _ := ret[0].(*v2.QueryAssignmentResponse)
147+
ret1, _ := ret[1].(error)
148+
return ret0, ret1
149+
}
150+
151+
// QueryAssignments indicates an expected call of QueryAssignments.
152+
func (mr *MockClientManagerMockRecorder) QueryAssignments(ctx, endpoints, request, duration interface{}) *gomock.Call {
153+
mr.mock.ctrl.T.Helper()
154+
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryAssignments", reflect.TypeOf((*MockClientManager)(nil).QueryAssignments), ctx, endpoints, request, duration)
155+
}
156+
127157
// QueryRoute mocks base method.
128158
func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error) {
129159
m.ctrl.T.Helper()

golang/client_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func TestCMClearIdleRpcClients(t *testing.T) {
338338
})
339339
defer stubs.Reset()
340340

341-
MOCK_RPC_CLIENT.EXPECT().idleDuration().Return(time.Hour * 24 * 365)
341+
MOCK_RPC_CLIENT.EXPECT().idleDuration().Return(time.Hour * 24 * 365).AnyTimes()
342342
cm := NewDefaultClientManager()
343343
cm.startUp()
344344
cm.RegisterClient(MOCK_CLIENT)

golang/client_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func BuildCLient(t *testing.T) *defaultClient {
7474
if err != nil {
7575
t.Error(err)
7676
}
77+
cli.inited.Store(true)
7778
err = cli.startUp()
7879
if err != nil {
7980
t.Error(err)
@@ -140,6 +141,7 @@ func TestCLINewClient(t *testing.T) {
140141
if err != nil {
141142
t.Error(err)
142143
}
144+
cli.(*defaultClient).inited.Store(true)
143145
sugarBaseLogger.Info(cli)
144146
err = cli.(*defaultClient).startUp()
145147
if err != nil {
@@ -223,7 +225,6 @@ func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
223225
if err != nil {
224226
t.Error(err)
225227
}
226-
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
227228
observedLogs := PrepareTestLogger(cli)
228229
default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
229230
recv_error_count: 0,
@@ -232,6 +233,8 @@ func TestRestoreDefaultClientSessionZeroErrors(t *testing.T) {
232233
default_cli_session.recoveryWaitTime = time.Second
233234
cli.settings = &simpleConsumerSettings{}
234235

236+
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
237+
235238
// when
236239
time.Sleep(3 * time.Second)
237240

@@ -249,7 +252,6 @@ func TestRestoreDefaultClientSessionOneError(t *testing.T) {
249252
if err != nil {
250253
t.Error(err)
251254
}
252-
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
253255
observedLogs := PrepareTestLogger(cli)
254256
default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
255257
recv_error_count: 1,
@@ -258,13 +260,14 @@ func TestRestoreDefaultClientSessionOneError(t *testing.T) {
258260
default_cli_session.recoveryWaitTime = time.Second
259261
cli.settings = &simpleConsumerSettings{}
260262

263+
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
261264
// when
262-
time.Sleep(3 * time.Second)
265+
time.Sleep(4 * time.Second)
263266

264267
// then
265268
sugarBaseLogger.Info(observedLogs.All())
266269
commandExecutionLog := observedLogs.All()[:3]
267-
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
270+
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover, err=EOF", commandExecutionLog[0].Message)
268271
assert.Equal(t, "Managed to recover, executing message", commandExecutionLog[1].Message)
269272
assert.Equal(t, "Executed command successfully", commandExecutionLog[2].Message)
270273
}
@@ -276,7 +279,6 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
276279
if err != nil {
277280
t.Error(err)
278281
}
279-
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
280282
observedLogs := PrepareTestLogger(cli)
281283
default_cli_session.observer = &MOCK_MessagingService_TelemetryClient{
282284
recv_error_count: 2,
@@ -285,13 +287,14 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
285287
default_cli_session.recoveryWaitTime = time.Second
286288
cli.settings = &simpleConsumerSettings{}
287289

290+
default_cli_session.publish(context.TODO(), &v2.TelemetryCommand{})
288291
// when
289-
time.Sleep(3 * time.Second)
292+
time.Sleep(4 * time.Second)
290293

291294
// then
292295
sugarBaseLogger.Info(observedLogs.All())
293296
commandExecutionLog := observedLogs.All()[:2]
294-
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
297+
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover, err=EOF", commandExecutionLog[0].Message)
295298
assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
296299
}
297300

golang/consumer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ import (
2626
type Consumer interface {
2727
GetGroupName() string
2828
wrapReceiveMessageRequest(batchSize int, messageQueue *v2.MessageQueue, filterExpression *FilterExpression, invisibleDuration time.Duration) *v2.ReceiveMessageRequest
29+
isClient
2930
}

0 commit comments

Comments
 (0)