|
1 | | -from typing import Any, List, Mapping |
| 1 | +import logging |
| 2 | +from typing import Any, Iterable, List, Mapping, Tuple |
| 3 | + |
| 4 | +from airbyte_protocol_dataclasses.models import ( |
| 5 | + AirbyteCatalog, |
| 6 | + AirbyteConnectionStatus, |
| 7 | + AirbyteMessage, |
| 8 | + Status, |
| 9 | + TraceType, |
| 10 | +) |
| 11 | +from airbyte_protocol_dataclasses.models import Type as AirbyteMessageType |
2 | 12 |
|
3 | 13 | from airbyte_cdk.connector_builder.models import StreamRead |
4 | 14 | from airbyte_cdk.connector_builder.test_reader import TestReader |
| 15 | +from airbyte_cdk.entrypoint import AirbyteEntrypoint |
5 | 16 | from airbyte_cdk.models.airbyte_protocol import ( |
6 | 17 | AirbyteStateMessage, |
7 | 18 | ConfiguredAirbyteCatalog, |
|
13 | 24 |
|
14 | 25 | class ManifestRunner: |
15 | 26 | _source: ManifestDeclarativeSource |
| 27 | + _logger = logging.getLogger("airbyte.manifest-runner") |
16 | 28 |
|
17 | 29 | def __init__(self, source: ManifestDeclarativeSource) -> None: |
18 | 30 | self._source = source |
@@ -44,3 +56,85 @@ def test_read( |
44 | 56 | ) |
45 | 57 |
|
46 | 58 | return stream_read |
| 59 | + |
| 60 | + def check_connection( |
| 61 | + self, |
| 62 | + config: Mapping[str, Any], |
| 63 | + ) -> Tuple[bool, str]: |
| 64 | + """ |
| 65 | + Check the connection to the source. |
| 66 | + """ |
| 67 | + |
| 68 | + spec = self._source.spec(self._logger) |
| 69 | + messages = AirbyteEntrypoint(source=self._source).check(spec, config) |
| 70 | + messages_by_type = self._get_messages_by_type(messages) |
| 71 | + self._raise_on_trace_message(messages_by_type) |
| 72 | + connection_status = self._get_connection_status(messages_by_type) |
| 73 | + |
| 74 | + if connection_status: |
| 75 | + return connection_status.status == Status.SUCCEEDED, connection_status.message |
| 76 | + return False, "Connection check failed" |
| 77 | + |
| 78 | + def discover( |
| 79 | + self, |
| 80 | + config: Mapping[str, Any], |
| 81 | + ) -> AirbyteCatalog | None: |
| 82 | + """ |
| 83 | + Discover the catalog from the source. |
| 84 | + """ |
| 85 | + spec = self._source.spec(self._logger) |
| 86 | + messages = AirbyteEntrypoint(source=self._source).discover(spec, config) |
| 87 | + messages_by_type = self._get_messages_by_type(messages) |
| 88 | + self._raise_on_trace_message(messages_by_type) |
| 89 | + return self._get_catalog(messages_by_type) |
| 90 | + |
| 91 | + def _get_messages_by_type( |
| 92 | + self, |
| 93 | + messages: Iterable[AirbyteMessage], |
| 94 | + ) -> Mapping[str, Iterable[AirbyteMessage]]: |
| 95 | + """ |
| 96 | + Group messages by type. |
| 97 | + """ |
| 98 | + grouped = {} |
| 99 | + for message in messages: |
| 100 | + msg_type = message.type |
| 101 | + if msg_type not in grouped: |
| 102 | + grouped[msg_type] = [] |
| 103 | + grouped[msg_type].append(message) |
| 104 | + return grouped |
| 105 | + |
| 106 | + def _get_connection_status( |
| 107 | + self, |
| 108 | + messages_by_type: Mapping[str, List[AirbyteMessage]], |
| 109 | + ) -> AirbyteConnectionStatus | None: |
| 110 | + """ |
| 111 | + Get the connection status from the messages. |
| 112 | + """ |
| 113 | + messages = messages_by_type.get(AirbyteMessageType.CONNECTION_STATUS) |
| 114 | + return messages[-1].connectionStatus if messages else None |
| 115 | + |
| 116 | + def _get_catalog( |
| 117 | + self, |
| 118 | + messages_by_type: Mapping[str, List[AirbyteMessage]], |
| 119 | + ) -> AirbyteCatalog: |
| 120 | + """ |
| 121 | + Get the catalog from the messages. |
| 122 | + """ |
| 123 | + messages = messages_by_type.get(AirbyteMessageType.CATALOG) |
| 124 | + return messages[-1].catalog if messages else None |
| 125 | + |
| 126 | + def _raise_on_trace_message( |
| 127 | + self, |
| 128 | + messages_by_type: Mapping[str, List[AirbyteMessage]], |
| 129 | + ) -> None: |
| 130 | + """ |
| 131 | + Raise an exception if a trace message is found. |
| 132 | + """ |
| 133 | + messages = [ |
| 134 | + message |
| 135 | + for message in messages_by_type.get(AirbyteMessageType.TRACE, []) |
| 136 | + if message.trace.type == TraceType.ERROR |
| 137 | + ] |
| 138 | + if messages: |
| 139 | + # TODO: raise a better exception |
| 140 | + raise Exception(messages[-1].trace.error.message) |
0 commit comments