Skip to content

Commit 6eba103

Browse files
authored
adapt to localService (#973)
* update to 5.0.3 1、use grpc.use_local_subchannel_pool 2、adapt to localService 3、fifo、delay message example
1 parent b727270 commit 6eba103

13 files changed

+290
-172
lines changed

python/example/async_producer_example.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def handle_send_result(result_future):
3131
# if auth enable
3232
# credentials = Credentials("ak", "sk")
3333
config = ClientConfiguration(endpoints, credentials)
34+
# with namespace
35+
# config = ClientConfiguration(endpoints, credentials, "namespace")
3436
topic = "topic"
3537
producer = Producer(config, (topic,))
3638

@@ -39,10 +41,14 @@ def handle_send_result(result_future):
3941
try:
4042
for i in range(10):
4143
msg = Message()
44+
# topic for the current message
4245
msg.topic = topic
4346
msg.body = "hello, rocketmq.".encode('utf-8')
47+
# secondary classifier of message besides topic
4448
msg.tag = "rocketmq-send-message"
49+
# key(s) of the message, another way to mark message besides message id
4550
msg.keys = "send_async"
51+
# user property for the message
4652
msg.add_property("send", "async")
4753
send_result_future = producer.send_async(msg)
4854
send_result_future.add_done_callback(handle_send_result)

python/example/async_simple_consumer_example.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def receive_callback(receive_result_future, consumer):
3535
print(f"{consumer.__str__()} receive {len(messages)} messages.")
3636
for msg in messages:
3737
try:
38+
# consume message in other thread, don't block the async receive thread
3839
consume_executor.submit(consume_message, consumer=consumer, message=msg)
3940
except Exception as exception:
4041
print(f"receive message raise exception: {exception}")
@@ -46,8 +47,11 @@ def receive_callback(receive_result_future, consumer):
4647
# if auth enable
4748
# credentials = Credentials("ak", "sk")
4849
config = ClientConfiguration(endpoints, credentials)
50+
# with namespace
51+
# config = ClientConfiguration(endpoints, credentials, "namespace")
4952
topic = "topic"
50-
53+
# in most case, you don't need to create too many consumers, singleton pattern is recommended
54+
# close the simple consumer when you don't need it anymore
5155
simple_consumer = SimpleConsumer(config, "consumer-group")
5256
try:
5357
simple_consumer.startup()
@@ -58,6 +62,7 @@ def receive_callback(receive_result_future, consumer):
5862
while True:
5963
try:
6064
time.sleep(1)
65+
# max message num for each long polling and message invisible duration after it is received
6166
future = simple_consumer.receive_async(32, 15)
6267
future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
6368
except Exception as e:
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import time
16+
17+
from rocketmq import ClientConfiguration, Credentials, Message, Producer
18+
19+
if __name__ == '__main__':
20+
endpoints = "foobar.com:8080"
21+
credentials = Credentials()
22+
# if auth enable
23+
# credentials = Credentials("ak", "sk")
24+
config = ClientConfiguration(endpoints, credentials)
25+
# with namespace
26+
# config = ClientConfiguration(endpoints, credentials, "namespace")
27+
topic = "delay-topic"
28+
producer = Producer(config, (topic,))
29+
30+
try:
31+
producer.startup()
32+
try:
33+
msg = Message()
34+
# topic for the current message
35+
msg.topic = topic
36+
msg.body = "hello, rocketmq.".encode('utf-8')
37+
# secondary classifier of message besides topic
38+
msg.tag = "rocketmq-send-delay-message"
39+
# delay 10 seconds
40+
msg.delivery_timestamp = int(time.time()) + 10
41+
res = producer.send(msg)
42+
print(f"{producer.__str__()} send message success. {res}")
43+
producer.shutdown()
44+
print(f"{producer.__str__()} shutdown.")
45+
except Exception as e:
46+
print(f"normal producer example raise exception: {e}")
47+
producer.shutdown()
48+
except Exception as e:
49+
print(f"{producer.__str__()} startup raise exception: {e}")
50+
producer.shutdown()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from rocketmq import ClientConfiguration, Credentials, Message, Producer
17+
18+
if __name__ == '__main__':
19+
endpoints = "foobar.com:8080"
20+
credentials = Credentials()
21+
# if auth enable
22+
# credentials = Credentials("ak", "sk")
23+
config = ClientConfiguration(endpoints, credentials)
24+
# with namespace
25+
# config = ClientConfiguration(endpoints, credentials, "namespace")
26+
topic = "fifo-topic"
27+
producer = Producer(config, (topic,))
28+
29+
try:
30+
producer.startup()
31+
try:
32+
msg = Message()
33+
# topic for the current message
34+
msg.topic = topic
35+
msg.body = "hello, rocketmq.".encode('utf-8')
36+
# secondary classifier of message besides topic
37+
msg.tag = "rocketmq-send-fifo-message"
38+
# message group decides the message delivery order
39+
msg.message_group = "your-message-group0"
40+
res = producer.send(msg)
41+
print(f"{producer.__str__()} send message success. {res}")
42+
producer.shutdown()
43+
print(f"{producer.__str__()} shutdown.")
44+
except Exception as e:
45+
print(f"normal producer example raise exception: {e}")
46+
producer.shutdown()
47+
except Exception as e:
48+
print(f"{producer.__str__()} startup raise exception: {e}")
49+
producer.shutdown()

python/example/normal_producer_example.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,23 @@
2121
# if auth enable
2222
# credentials = Credentials("ak", "sk")
2323
config = ClientConfiguration(endpoints, credentials)
24+
# with namespace
25+
# config = ClientConfiguration(endpoints, credentials, "namespace")
2426
topic = "topic"
2527
producer = Producer(config, (topic,))
2628

2729
try:
2830
producer.startup()
2931
try:
3032
msg = Message()
33+
# topic for the current message
3134
msg.topic = topic
3235
msg.body = "hello, rocketmq.".encode('utf-8')
36+
# secondary classifier of message besides topic
3337
msg.tag = "rocketmq-send-message"
38+
# key(s) of the message, another way to mark message besides message id
3439
msg.keys = "send_sync"
40+
# user property for the message
3541
msg.add_property("send", "sync")
3642
res = producer.send(msg)
3743
print(f"{producer.__str__()} send message success. {res}")

python/example/simple_consumer_example.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
# if auth enable
2222
# credentials = Credentials("ak", "sk")
2323
config = ClientConfiguration(endpoints, credentials)
24+
# with namespace
25+
# config = ClientConfiguration(endpoints, credentials, "namespace")
2426
topic = "topic"
27+
# in most case, you don't need to create too many consumers, singleton pattern is recommended
28+
# close the simple consumer when you don't need it anymore
2529
simple_consumer = SimpleConsumer(config, "consumer-group")
2630
try:
2731
simple_consumer.startup()
@@ -31,6 +35,7 @@
3135
# simple_consumer.subscribe(topic, FilterExpression("tag"))
3236
while True:
3337
try:
38+
# max message num for each long polling and message invisible duration after it is received
3439
messages = simple_consumer.receive(32, 15)
3540
if messages is not None:
3641
print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")

python/example/transaction_producer_example.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ def check(self, message: Message) -> TransactionResolution:
3030
# if auth enable
3131
# credentials = Credentials("ak", "sk")
3232
config = ClientConfiguration(endpoints, credentials)
33+
# with namespace
34+
# config = ClientConfiguration(endpoints, credentials, "namespace")
3335
topic = "topic"
3436
check_from_server = True # commit message from server check
3537
producer = Producer(config, (topic,), checker=TestChecker())
@@ -42,18 +44,23 @@ def check(self, message: Message) -> TransactionResolution:
4244
try:
4345
transaction = producer.begin_transaction()
4446
msg = Message()
47+
# topic for the current message
4548
msg.topic = topic
4649
msg.body = "hello, rocketmq.".encode('utf-8')
50+
# secondary classifier of message besides topic
4751
msg.tag = "rocketmq-send-transaction-message"
52+
# key(s) of the message, another way to mark message besides message id
4853
msg.keys = "send_transaction"
54+
# user property for the message
4955
msg.add_property("send", "transaction")
5056
res = producer.send(msg, transaction)
5157
print(f"send message: {res}")
5258
if check_from_server:
59+
# wait for server check in TransactionChecker's check
5360
input("Please Enter to Stop the Application.\r\n")
5461
producer.shutdown()
5562
else:
56-
# producer directly commit or rollback
63+
# direct commit or rollback
5764
transaction.commit()
5865
print(f"producer commit message:{transaction.message_id}")
5966
# transaction.rollback()

python/rocketmq/v5/client/balancer/queue_selector.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import hashlib
1617
import random
1718

1819
from rocketmq.v5.exception import IllegalArgumentException
@@ -64,6 +65,12 @@ def select_next_queue(self):
6465
self.__index.get_and_increment() % len(self.__message_queues)
6566
]
6667

68+
def select_queue_by_hash_key(self, key):
69+
hash_object = hashlib.sha256(key.encode('utf-8'))
70+
hash_code = int.from_bytes(hash_object.digest(), byteorder='big')
71+
print(f"hashcode: {hash_code}")
72+
return self.__message_queues[hash_code % len(self.__message_queues)]
73+
6774
def all_queues(self):
6875
index = self.__index.get_and_increment() % len(self.__message_queues)
6976
return self.__message_queues[index:] + self.__message_queues[:index]

python/rocketmq/v5/client/connection/rpc_channel.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def __facade(self):
8888
or len(self.__addresses) == 0
8989
or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
9090
):
91-
return ""
91+
return "", ""
9292

9393
prefix = "dns:"
9494
if self.__scheme == AddressScheme.IPv4:
@@ -227,6 +227,7 @@ def __create_aio_channel(self):
227227
("grpc.enable_retries", 0),
228228
("grpc.max_send_message_length", -1),
229229
("grpc.max_receive_message_length", -1),
230+
("grpc.use_local_subchannel_pool", 1),
230231
]
231232
if self.__tls_enabled:
232233
self.__async_channel = aio.secure_channel(
@@ -237,7 +238,7 @@ def __create_aio_channel(self):
237238
self.__endpoints.facade, options
238239
)
239240
self.__async_stub = MessagingServiceStub(self.__async_channel)
240-
logger.debug(
241+
logger.info(
241242
f"create_aio_channel to [{self.__endpoints.__str__()}] success. channel state:{self.__async_channel.get_state()}"
242243
)
243244
except Exception as e:

0 commit comments

Comments
 (0)