Skip to content
Closed

5.0.3 #930

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/rocketmq/v5/client/balancer/queue_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def simple_consumer_queue_selector(cls, topic_route: TopicRouteData):

def select_next_queue(self):
if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR:
raise IllegalArgumentException("error type for queue selector, type is NONE_TYPE_SELECTOR.")
raise IllegalArgumentException("select next queue raise exception, because selector type is NONE_TYPE_SELECTOR.")
if len(self.__message_queues) == 0:
raise IllegalArgumentException("select next queue raise exception, because queue's length is 0.")
return self.__message_queues[self.__index.get_and_increment() % len(self.__message_queues)]

def all_queues(self):
Expand Down
63 changes: 32 additions & 31 deletions python/rocketmq/v5/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def __schedule_update_topic_route_cache(self):
while True:
if self.__client_thread_task_enabled is True:
self.__topic_route_scheduler_threading_event.wait(30)
logger.debug(f"{self.__str__()} run scheduler for update topic route cache.")
logger.debug(f"{self.__str__()} run update topic route in scheduler.")
# update topic route for each topic in cache
topics = self.__topic_route_cache.keys()
for topic in topics:
Expand All @@ -185,7 +185,7 @@ def __schedule_update_topic_route_cache(self):
self.__update_topic_route_async(topic)
except Exception as e:
logger.error(
f"{self.__str__()} run scheduler for update topic:{topic} route cache exception: {e}")
f"{self.__str__()} scheduler update topic:{topic} route raise exception: {e}")
else:
break
logger.info(f"{self.__str__()} stop scheduler for update topic route cache success.")
Expand All @@ -195,14 +195,14 @@ def __schedule_heartbeat(self):
while True:
if self.__client_thread_task_enabled is True:
self.__heartbeat_scheduler_threading_event.wait(10)
logger.debug(f"{self.__str__()} run scheduler for heartbeat.")
logger.debug(f"{self.__str__()} run send heartbeat in scheduler.")
all_endpoints = self.__get_all_endpoints().values()
try:
for endpoints in all_endpoints:
if self.__client_thread_task_enabled is True:
self.__heartbeat_async(endpoints)
except Exception as e:
logger.error(f"{self.__str__()} run scheduler for heartbeat exception: {e}")
logger.error(f"{self.__str__()} scheduler send heartbeat raise exception: {e}")
else:
break
logger.info(f"{self.__str__()} stop scheduler for heartbeat success.")
Expand All @@ -212,16 +212,14 @@ def __schedule_update_setting(self):
while True:
if self.__client_thread_task_enabled is True:
self.__sync_setting_scheduler_threading_event.wait(5)
logger.debug(f"{self.__str__()} run scheduler for update setting.")
logger.debug(f"{self.__str__()} run update setting in scheduler.")
try:
all_endpoints = self.__get_all_endpoints().values()
for endpoints in all_endpoints:
if self.__client_thread_task_enabled is True:
# if stream_stream_call for grpc channel is none, create a new one, otherwise use the existing one
self.__retrieve_telemetry_stream_stream_call(endpoints)
self.__setting_write(endpoints)
except Exception as e:
logger.error(f"{self.__str__()} run scheduler for update setting exception: {e}")
logger.error(f"{self.__str__()} scheduler set setting raise exception: {e}")
else:
break
logger.info(f"{self.__str__()} stop scheduler for update setting success.")
Expand Down Expand Up @@ -282,10 +280,12 @@ def _retrieve_topic_route_data(self, topic):
return route
else:
route = self.__update_topic_route(topic)
logger.info(f"{self.__str__()} update topic:{topic} route success.")
if route is not None:
logger.info(f"{self.__str__()} update topic:{topic} route success.")
self.__topics.add(topic)
return route
return route
else:
raise Exception(f"failed to fetch topic:{topic} route.")

def _remove_unused_topic_route_data(self, topic):
self.__topic_route_cache.remove(topic)
Expand All @@ -306,16 +306,15 @@ def _rpc_channel_io_loop(self):
# topic route #

def __update_topic_route(self, topic):
try:
future = self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
self.__topic_route_req(topic), metadata=self._sign(),
timeout=self.__client_configuration.request_timeout)
res = future.result()
route = self.__handle_topic_route_res(res, topic)
return route
except Exception as e:
logger.error(f"update topic route error, topic:{topic}, {e}")
raise e
event = threading.Event()
callback = functools.partial(self.__query_topic_route_async_callback, topic=topic, event=event)
future = self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
self.__topic_route_req(topic),
metadata=self._sign(),
timeout=self.__client_configuration.request_timeout)
future.add_done_callback(callback)
event.wait()
return self.__topic_route_cache.get(topic)

def __update_topic_route_async(self, topic):
callback = functools.partial(self.__query_topic_route_async_callback, topic=topic)
Expand All @@ -325,12 +324,15 @@ def __update_topic_route_async(self, topic):
timeout=self.__client_configuration.request_timeout)
future.add_done_callback(callback)

def __query_topic_route_async_callback(self, future, topic):
def __query_topic_route_async_callback(self, future, topic, event=None):
try:
res = future.result()
self.__handle_topic_route_res(res, topic)
except Exception as e:
raise e
finally:
if event is not None:
event.set()

def __topic_route_req(self, topic):
req = QueryRouteRequest()
Expand All @@ -344,7 +346,7 @@ def __handle_topic_route_res(self, res, topic):
MessagingResultChecker.check(res.status)
if res.status.code == Code.OK:
topic_route = TopicRouteData(res.message_queues)
logger.debug(f"{self.__str__()} update topic:{topic} route, route info: {topic_route.__str__()}")
logger.info(f"{self.__str__()} update topic:{topic} route, route info: {topic_route.__str__()}")
# if topic route has new endpoint, connect
self.__check_topic_route_endpoints_changed(topic, topic_route)
self.__topic_route_cache.put(topic, topic_route)
Expand Down Expand Up @@ -380,25 +382,24 @@ def __heartbeat_callback(self, future, endpoints):

# sync settings #

def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False):
try:
self.__rpc_client.telemetry_stream(endpoints, self, self._sign(), rebuild, timeout=60 * 60 * 24 * 365)
except Exception as e:
logger.error(
f"{self.__str__()} rebuild stream_steam_call to {endpoints.__str__()} exception: {e}" if rebuild else f"{self.__str__()} create stream_steam_call to {endpoints.__str__()} exception: {e}")

def __setting_write(self, endpoints):
req = self._sync_setting_req(endpoints)
callback = functools.partial(self.__setting_write_callback, endpoints=endpoints)
future = self.__rpc_client.telemetry_write_async(endpoints, req)
logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}")
future.add_done_callback(callback)

def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False):
try:
self.__rpc_client.telemetry_stream(endpoints, self, metadata=self._sign(), timeout=60 * 60 * 24 * 365,
rebuild=rebuild)
except Exception as e:
logger.error(
f"{self.__str__()} rebuild stream_steam_call to {endpoints.__str__()} exception: {e}" if rebuild else f"{self.__str__()} create stream_steam_call to {endpoints.__str__()} exception: {e}")

def __setting_write_callback(self, future, endpoints):
try:
future.result()
logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()} success.")
logger.info(f"{self.__str__()} send setting to {endpoints.__str__()} success.")
except InvalidStateError as e:
logger.warn(f"{self.__str__()} send setting to {endpoints.__str__()} occurred InvalidStateError: {e}")
self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True)
Expand Down
9 changes: 5 additions & 4 deletions python/rocketmq/v5/client/connection/rpc_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def start_stream_read(self):
if res.HasField("settings"):
# read a response for send setting result
if res is not None and res.status.code == Code.OK:
logger.debug(f"async setting success. response status code: {res.status.code}")
logger.debug(f"{ self.__handler.__str__()} sync setting success. response status code: {res.status.code}")
if res.settings is not None and res.settings.metric is not None:
# reset metrics if needed
self.__handler.reset_metric(res.settings.metric)
Expand All @@ -136,9 +136,9 @@ async def start_stream_read(self):
transaction_id)
except AioRpcError as e:
logger.warn(
f"stream read from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}")
f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}")
except Exception as e:
logger.error(f"stream read from endpoints {self.__endpoints.__str__()} exception, {e}")
logger.error(f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} exception, {e}")

async def stream_write(self, req):
if self.__stream_stream_call is not None:
Expand All @@ -164,6 +164,7 @@ def __init__(self, endpoints: RpcEndpoints, tls_enabled=False):

def create_channel(self, loop):
# create grpc channel with the given loop
# assert loop == RpcClient._io_loop
asyncio.set_event_loop(loop)
self.__create_aio_channel()

Expand Down Expand Up @@ -211,7 +212,7 @@ def __create_aio_channel(self):
except Exception as e:
logger.error(f"create_aio_channel to [{self.__endpoints.__str__()}] exception: {e}")
raise e

#
""" property """

@property
Expand Down
57 changes: 32 additions & 25 deletions python/rocketmq/v5/client/connection/rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,17 @@ def __init__(self, tls_enable=False):
def retrieve_or_create_channel(self, endpoints: RpcEndpoints):
if self.__enable_retrieve_channel is False:
raise Exception("RpcClient is not running.")

try:
# get or create a new grpc channel
with RpcClient._channel_lock:
channel = self.__get_channel(endpoints)
if channel is not None:
channel.update_time = int(time.time())
else:
channel = self.__get_channel(endpoints)
if channel is not None:
channel.update_time = int(time.time())
else:
with RpcClient._channel_lock:
channel = RpcChannel(endpoints, self.__tls_enable)
channel.create_channel(RpcClient.get_channel_io_loop())
self.__put_channel(endpoints, channel)
return channel
return channel
except Exception as e:
logger.error(f"retrieve or create channel exception: {e}")
raise e
Expand All @@ -80,11 +79,12 @@ def clear_idle_rpc_channels(self):
for endpoints, channel in items:
if now - channel.update_time > RpcClient.RPC_CLIENT_MAX_IDLE_SECONDS:
idle_endpoints.append(endpoints)
with RpcClient._channel_lock:
for endpoints in idle_endpoints:
logger.info(f"remove idle channel {endpoints.__str__()}")
self.__close_rpc_channel(endpoints)
self.channels.remove(endpoints)
if len(idle_endpoints) > 0:
with RpcClient._channel_lock:
for endpoints in idle_endpoints:
logger.info(f"remove idle channel {endpoints.__str__()}")
self.__close_rpc_channel(endpoints)
self.channels.remove(endpoints)

def stop(self):
with RpcClient._channel_lock:
Expand Down Expand Up @@ -137,28 +137,32 @@ def notify_client_termination(self, endpoints: RpcEndpoints, req: NotifyClientTe
return RpcClient.__run_message_service_async(
self.__notify_client_termination_0(endpoints, req, metadata=metadata, timeout=timeout))

def telemetry_stream(self, endpoints: RpcEndpoints, client, metadata, timeout=3000, rebuild=False):
try:
channel = self.retrieve_or_create_channel(endpoints)
if channel.telemetry_stream_stream_call is None or rebuild is True:
stream = channel.async_stub.Telemetry(metadata=metadata, timeout=timeout)
channel.register_telemetry_stream_stream_call(stream, client)
asyncio.run_coroutine_threadsafe(channel.telemetry_stream_stream_call.start_stream_read(),
RpcClient.get_channel_io_loop())
logger.info(
f"{client.__str__()} rebuild stream_steam_call to {endpoints.__str__()} success." if rebuild else f"{client.__str__()} create stream_steam_call to {endpoints.__str__()} success.")
except Exception as e:
raise e

def end_transaction_for_server_check(self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata,
timeout=3):
# assert asyncio.get_running_loop() == RpcClient._io_loop
try:
return self.__end_transaction_0(endpoints, req, metadata=metadata, timeout=timeout)
except Exception as e:
logger.error(
f"end transaction exception, topic:{req.topic.name}, message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}")
raise e

""" build stream_stream_call """

def telemetry_stream(self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000):
# assert asyncio.get_running_loop() == RpcClient._io_loop
try:
channel = self.retrieve_or_create_channel(endpoints)
stream = channel.async_stub.Telemetry(metadata=metadata, timeout=timeout, wait_for_ready=True)
channel.register_telemetry_stream_stream_call(stream, client)
asyncio.run_coroutine_threadsafe(channel.telemetry_stream_stream_call.start_stream_read(),
RpcClient.get_channel_io_loop())
logger.info(
f"{client.__str__()} rebuild stream_steam_call to {endpoints.__str__()}." if rebuild else f"{client.__str__()} create stream_steam_call to {endpoints.__str__()}.")
return channel
except Exception as e:
raise e

""" MessageService.stub impl """

async def __query_route_async_0(self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3):
Expand Down Expand Up @@ -197,6 +201,9 @@ async def __notify_client_termination_0(self, endpoints: RpcEndpoints, req: Noti
metadata=metadata,
timeout=timeout)

async def __create_channel_async(self, endpoints: RpcEndpoints):
return self.retrieve_or_create_channel(endpoints)

""" private """

def __get_channel(self, endpoints: RpcEndpoints) -> RpcChannel:
Expand Down
Loading
Loading