|
13 | 13 | # See the License for the specific language governing permissions and |
14 | 14 | # limitations under the License. |
15 | 15 | # |
16 | | -import os |
17 | 16 |
|
18 | 17 | from fastapi.encoders import jsonable_encoder |
19 | 18 | from fastapi.responses import JSONResponse |
|
57 | 56 | Feed, |
58 | 57 | Gtfsrealtimefeed, |
59 | 58 | ) |
60 | | -from shared.helpers.pub_sub import get_execution_id, publish_messages |
| 59 | +from shared.helpers.pub_sub import get_execution_id, trigger_dataset_download |
61 | 60 | from shared.helpers.query_helper import ( |
62 | 61 | query_feed_by_stable_id, |
63 | 62 | get_feeds_query, |
|
74 | 73 | from .models.operation_gtfs_rt_feed_impl import OperationGtfsRtFeedImpl |
75 | 74 | from .request_validator import validate_request |
76 | 75 |
|
77 | | -pubsub_topic_name = os.getenv("DATASET_PROCESSING_TOPIC_NAME") |
78 | | -project_id = os.getenv("PROJECT_ID") |
79 | | - |
80 | 76 |
|
81 | 77 | class OperationsApiImpl(BaseOperationsApi): |
82 | 78 | """Implementation of the operations API.""" |
@@ -112,24 +108,24 @@ def assert_no_existing_feed_url(producer_url: str, db_session: Session): |
112 | 108 | detail=message, |
113 | 109 | ) |
114 | 110 |
|
115 | | - @staticmethod |
116 | | - def send_feed_process_event(feed: type[Gtfsfeed] | None, request=None): |
117 | | - """Send a message to Pub/Sub to process the feed.""" |
118 | | - message_payload = { |
119 | | - "execution_id": get_execution_id( |
120 | | - get_request_context(), "feed-created-process" |
121 | | - ), |
122 | | - "producer_url": feed.producer_url, |
123 | | - "feed_stable_id": feed.stable_id, |
124 | | - "feed_id": feed.id, |
125 | | - "dataset_stable_id": None, |
126 | | - "dataset_hash": None, |
127 | | - "authentication_type": feed.authentication_type, |
128 | | - "authentication_info_url": feed.authentication_info_url, |
129 | | - "api_key_parameter_name": feed.api_key_parameter_name, |
130 | | - } |
131 | | - publish_messages([message_payload], project_id, pubsub_topic_name) |
132 | | - logging.debug("Sent feed process event") |
| 111 | + # @staticmethod |
| 112 | + # def send_feed_process_event(feed: type[Gtfsfeed] | None, request=None): |
| 113 | + # """Send a message to Pub/Sub to process the feed.""" |
| 114 | + # message_payload = { |
| 115 | + # "execution_id": get_execution_id( |
| 116 | + # get_request_context(), "feed-created-process" |
| 117 | + # ), |
| 118 | + # "producer_url": feed.producer_url, |
| 119 | + # "feed_stable_id": feed.stable_id, |
| 120 | + # "feed_id": feed.id, |
| 121 | + # "dataset_stable_id": None, |
| 122 | + # "dataset_hash": None, |
| 123 | + # "authentication_type": feed.authentication_type, |
| 124 | + # "authentication_info_url": feed.authentication_info_url, |
| 125 | + # "api_key_parameter_name": feed.api_key_parameter_name, |
| 126 | + # } |
| 127 | + # publish_messages([message_payload], project_id, pubsub_topic_name) |
| 128 | + # logging.debug("Sent feed process event") |
133 | 129 |
|
134 | 130 | @with_db_session |
135 | 131 | async def get_feeds( |
@@ -391,7 +387,10 @@ async def create_gtfs_feed( |
391 | 387 | db_session.add(new_feed) |
392 | 388 | db_session.commit() |
393 | 389 | created_feed = db_session.get(Gtfsfeed, new_feed.id) |
394 | | - self.send_feed_process_event(created_feed) |
| 390 | + trigger_dataset_download( |
| 391 | + created_feed, |
| 392 | + get_execution_id(get_request_context(), "feed-created-process"), |
| 393 | + ) |
395 | 394 | logging.info("Created new GTFS feed with ID: %s", new_feed.stable_id) |
396 | 395 | payload = OperationGtfsFeedImpl.from_orm(created_feed).model_dump() |
397 | 396 | return JSONResponse(status_code=201, content=jsonable_encoder(payload)) |
|
0 commit comments