Skip to content

Commit 55e7dd2

Browse files
authored
[ISSUE #928] Fix C++ simple consumer error code and close function (#931)
1 parent eecb3a6 commit 55e7dd2

File tree

8 files changed

+287
-163
lines changed

8 files changed

+287
-163
lines changed

cpp/examples/ExampleSimpleConsumer.cpp

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <iostream>
1919

2020
#include "gflags/gflags.h"
21+
#include "rocketmq/ErrorCode.h"
2122
#include "rocketmq/Logger.h"
2223
#include "rocketmq/SimpleConsumer.h"
2324

@@ -42,10 +43,11 @@ int main(int argc, char* argv[]) {
4243

4344
CredentialsProviderPtr credentials_provider;
4445
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
45-
credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
46+
credentials_provider = std::make_shared<StaticCredentialsProvider>(
47+
FLAGS_access_key, FLAGS_access_secret);
4648
}
4749

48-
// In most case, you don't need to create too many consumers, singletion pattern is recommended.
50+
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
4951
auto simple_consumer = SimpleConsumer::newBuilder()
5052
.withGroup(FLAGS_group)
5153
.withConfiguration(Configuration::newBuilder()
@@ -54,32 +56,36 @@ int main(int argc, char* argv[]) {
5456
.withSsl(FLAGS_tls)
5557
.build())
5658
.subscribe(FLAGS_topic, tag)
59+
.withAwaitDuration(std::chrono::seconds(10))
5760
.build();
58-
std::vector<MessageConstSharedPtr> messages;
59-
std::error_code ec;
60-
simple_consumer.receive(4, std::chrono::seconds(3), ec, messages);
6161

62-
if (ec) {
63-
std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
64-
return EXIT_FAILURE;
65-
}
62+
for (int j = 0; j < 30; j++) {
63+
std::vector<MessageConstSharedPtr> messages;
64+
std::error_code ec;
65+
simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
66+
if (ec) {
67+
std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
68+
}
6669

67-
std::cout << "Received " << messages.size() << " messages" << std::endl;
68-
std::size_t i = 0;
69-
for (const auto& message : messages) {
70-
std::cout << "Received a message[topic=" << message->topic() << ", message-id=" << message->id()
71-
<< ", receipt-handle='" << message->extension().receipt_handle << "']" << std::endl;
70+
std::cout << "Received " << messages.size() << " messages" << std::endl;
71+
std::size_t i = 0;
7272

73-
std::error_code ec;
74-
if (++i % 2 == 0) {
75-
simple_consumer.ack(*message, ec);
76-
if (ec) {
77-
std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
78-
}
79-
} else {
80-
simple_consumer.changeInvisibleDuration(*message, std::chrono::milliseconds(100), ec);
81-
if (ec) {
82-
std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
73+
for (const auto& message : messages) {
74+
std::cout << "Received a message[topic=" << message->topic()
75+
<< ", message-id=" << message->id()
76+
<< ", receipt-handle='" << message->extension().receipt_handle
77+
<< "']" << std::endl;
78+
79+
if (++i % 2 == 0) {
80+
simple_consumer.ack(*message, ec);
81+
if (ec) {
82+
std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
83+
}
84+
} else {
85+
simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec);
86+
if (ec) {
87+
std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
88+
}
8389
}
8490
}
8591
}

cpp/proto/apache/rocketmq/v2/definition.proto

Lines changed: 124 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,6 @@ enum DigestType {
175175
// 1) Standard messages should be negatively acknowledged instantly, causing
176176
// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
177177
// previously acquired messages batch;
178-
//
179-
// Message consumption model also affects how invalid digest are handled. When
180-
// messages are consumed in broadcasting way,
181-
// TODO: define semantics of invalid-digest-when-broadcasting.
182178
message Digest {
183179
DigestType type = 1;
184180
string checksum = 2;
@@ -189,6 +185,7 @@ enum ClientType {
189185
PRODUCER = 1;
190186
PUSH_CONSUMER = 2;
191187
SIMPLE_CONSUMER = 3;
188+
PULL_CONSUMER = 4;
192189
}
193190

194191
enum Encoding {
@@ -270,9 +267,20 @@ message SystemProperties {
270267
// orphan. Servers that manages orphan messages would pick up
271268
// a capable publisher to resolve
272269
optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
270+
271+
// Information to identify whether this message is from dead letter queue.
272+
optional DeadLetterQueue dead_letter_queue = 20;
273+
}
274+
275+
message DeadLetterQueue {
276+
// Original topic for this DLQ message.
277+
string topic = 1;
278+
// Original message id for this DLQ message.
279+
string message_id = 2;
273280
}
274281

275282
message Message {
283+
276284
Resource topic = 1;
277285

278286
// User defined key-value pairs.
@@ -336,6 +344,10 @@ enum Code {
336344
MESSAGE_CORRUPTED = 40016;
337345
// Request is rejected due to missing of x-mq-client-id header.
338346
CLIENT_ID_REQUIRED = 40017;
347+
// Polling time is illegal.
348+
ILLEGAL_POLLING_TIME = 40018;
349+
// Offset is illegal.
350+
ILLEGAL_OFFSET = 40019;
339351

340352
// Generic code indicates that the client request lacks valid authentication
341353
// credentials for the requested resource.
@@ -355,6 +367,8 @@ enum Code {
355367
TOPIC_NOT_FOUND = 40402;
356368
// Consumer group resource does not exist.
357369
CONSUMER_GROUP_NOT_FOUND = 40403;
370+
// Offset not found from server.
371+
OFFSET_NOT_FOUND = 40404;
358372

359373
// Generic code representing client side timeout when connecting to, reading data from, or write data to server.
360374
REQUEST_TIMEOUT = 40800;
@@ -363,6 +377,8 @@ enum Code {
363377
PAYLOAD_TOO_LARGE = 41300;
364378
// Message body size exceeds the threshold.
365379
MESSAGE_BODY_TOO_LARGE = 41301;
380+
// Message body is empty.
381+
MESSAGE_BODY_EMPTY = 41302;
366382

367383
// Generic code for use cases where pre-conditions are not met.
368384
// For example, if a producer instance is used to publish messages without prior start() invocation,
@@ -432,6 +448,13 @@ enum Language {
432448
DOT_NET = 3;
433449
GOLANG = 4;
434450
RUST = 5;
451+
PYTHON = 6;
452+
PHP = 7;
453+
NODE_JS = 8;
454+
RUBY = 9;
455+
OBJECTIVE_C = 10;
456+
DART = 11;
457+
KOTLIN = 12;
435458
}
436459

437460
// User Agent
@@ -447,4 +470,101 @@ message UA {
447470

448471
// Hostname of the node
449472
string hostname = 4;
473+
}
474+
475+
message Settings {
476+
// Configurations for all clients.
477+
optional ClientType client_type = 1;
478+
479+
optional Endpoints access_point = 2;
480+
481+
// If publishing of messages encounters throttling or server internal errors,
482+
// publishers should implement automatic retries after progressive longer
483+
// back-offs for consecutive errors.
484+
//
485+
// When processing message fails, `backoff_policy` describes an interval
486+
// after which the message should be available to consume again.
487+
//
488+
// For FIFO messages, the interval should be relatively small because
489+
// messages of the same message group would not be readily available until
490+
// the prior one depletes its lifecycle.
491+
optional RetryPolicy backoff_policy = 3;
492+
493+
// Request timeout for RPCs excluding long-polling.
494+
optional google.protobuf.Duration request_timeout = 4;
495+
496+
oneof pub_sub {
497+
Publishing publishing = 5;
498+
499+
Subscription subscription = 6;
500+
}
501+
502+
// User agent details
503+
UA user_agent = 7;
504+
505+
Metric metric = 8;
506+
}
507+
508+
message Publishing {
509+
// Publishing settings below here is appointed by client, thus it is
510+
// unnecessary for server to push at present.
511+
//
512+
// List of topics to which messages will publish to.
513+
repeated Resource topics = 1;
514+
515+
// If the message body size exceeds `max_body_size`, broker servers would
516+
// reject the request. As a result, it is advisable that Producer performs
517+
// client-side check validation.
518+
int32 max_body_size = 2;
519+
520+
// When `validate_message_type` flag set `false`, no need to validate message's type
521+
// with messageQueue's `accept_message_types` before publishing.
522+
bool validate_message_type = 3;
523+
}
524+
525+
message Subscription {
526+
// Subscription settings below here is appointed by client, thus it is
527+
// unnecessary for server to push at present.
528+
//
529+
// Consumer group.
530+
optional Resource group = 1;
531+
532+
// Subscription for consumer.
533+
repeated SubscriptionEntry subscriptions = 2;
534+
535+
// Subscription settings below here are from server, it is essential for
536+
// server to push.
537+
//
538+
// When FIFO flag is `true`, messages of the same message group are processed
539+
// in first-in-first-out manner.
540+
//
541+
// Brokers will not deliver further messages of the same group until prior
542+
// ones are completely acknowledged.
543+
optional bool fifo = 3;
544+
545+
// Message receive batch size here is essential for push consumer.
546+
optional int32 receive_batch_size = 4;
547+
548+
// Long-polling timeout for `ReceiveMessageRequest`, which is essential for
549+
// push consumer.
550+
optional google.protobuf.Duration long_polling_timeout = 5;
551+
}
552+
553+
message Metric {
554+
// Indicates that if client should export local metrics to server.
555+
bool on = 1;
556+
557+
// The endpoint that client metrics should be exported to, which is required if the switch is on.
558+
optional Endpoints endpoints = 2;
559+
}
560+
561+
enum QueryOffsetPolicy {
562+
// Use this option if client wishes to playback all existing messages.
563+
BEGINNING = 0;
564+
565+
// Use this option if client wishes to skip all existing messages.
566+
END = 1;
567+
568+
// Use this option if time-based seek is targeted.
569+
TIMESTAMP = 2;
450570
}

0 commit comments

Comments
 (0)