33import threading
44import uuid
55from typing import List
6+ from concurrent import futures
67
78from google .auth import default
89from google .cloud import pubsub_v1
@@ -48,11 +49,12 @@ def publish_callback(future: Future, stable_id: str, topic_path: str):
4849 logger .info (f"Published stable_id = { stable_id } ." )
4950
5051
51- def publish (feed : Feed , topic_path : str ):
52+ def publish (feed : Feed , topic_path : str ) -> Future :
5253 """
5354 Publishes a feed to the Pub/Sub topic.
5455 :param feed: The feed to publish
5556 :param topic_path: The path to the Pub/Sub topic
57+ :return: The Future object representing the result of the publishing operation
5658 """
5759 payload = {
5860 "execution_id" : f"batch-uuid-{ uuid .uuid4 ()} " ,
@@ -68,8 +70,7 @@ def publish(feed: Feed, topic_path: str):
6870 data_bytes = json .dumps (payload ).encode ("utf-8" )
6971 future = get_pubsub_client ().publish (topic_path , data = data_bytes )
7072 future .add_done_callback (lambda _ : publish_callback (future , feed .stable_id , topic_path ))
71- # Block until the message is published
72- future .result () # This will wait until the publishing is confirmed
73+ return future
7374
7475
7576def publish_all (feeds : List [Feed ]):
@@ -82,7 +83,10 @@ def publish_all(feeds: List[Feed]):
8283 credentials , project = default ()
8384 logger .info (f"Authenticated project: { project } " )
8485 logger .info (f"Service Account Email: { credentials .service_account_email } " )
86+ publish_futures = []
8587 for feed in feeds :
8688 logger .info (f"Publishing feed { feed .stable_id } to Pub/Sub topic { topic_path } ..." )
87- publish (feed , topic_path )
89+ future = publish (feed , topic_path )
90+ publish_futures .append (future )
91+ futures .wait (publish_futures , return_when = futures .ALL_COMPLETED )
8892 logger .info (f"Published { len (feeds )} feeds to Pub/Sub topic { topic_path } ." )
0 commit comments