Skip to content

Commit 4873a32

Browse files
hsheth2 and treff7es authored
fix(ingest): emitter bug fixes (#8093)
Co-authored-by: Tamas Nemeth <[email protected]>
1 parent d98ebd3 commit 4873a32

File tree

8 files changed

+35
-16
lines changed

8 files changed

+35
-16
lines changed
Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
11
import os
2+
from typing import Dict, Type
3+
4+
import click
5+
from pydantic import BaseModel
6+
27
from datahub.api.entities.dataproduct.dataproduct import DataProduct
38
from datahub.ingestion.source.metadata.business_glossary import BusinessGlossaryConfig
4-
import click
9+
510

611
@click.command()
712
@click.option("--out-dir", type=str, required=True)
8-
def generate_specs(
9-
out_dir: str
10-
) -> None:
13+
def generate_specs(out_dir: str) -> None:
1114
print(out_dir)
1215
schemas_dir = f"{out_dir}/schemas"
1316
os.makedirs(schemas_dir, exist_ok=True)
14-
concept_class_map = {
17+
concept_class_map: Dict[str, Type[BaseModel]] = {
1518
"dataproduct": DataProduct,
16-
"businessglossary": BusinessGlossaryConfig
19+
"businessglossary": BusinessGlossaryConfig,
1720
}
1821
for concept, concept_class in concept_class_map.items():
1922
with open(f"{schemas_dir}/{concept}_schema.json", "w") as f:
2023
f.write(concept_class.schema_json(indent=2))
2124

2225

2326
if __name__ == "__main__":
24-
generate_specs()
27+
generate_specs()

metadata-ingestion/src/datahub/emitter/rest_emitter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,10 @@ def __repr__(self) -> str:
290290
f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}"
291291
)
292292

293+
def flush(self) -> None:
294+
# No-op, but present to keep the interface consistent with the Kafka emitter.
295+
pass
296+
293297
def close(self) -> None:
294298
self._session.close()
295299

metadata-ingestion/src/datahub/ingestion/graph/client.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
from requests.models import HTTPError
1414
from typing_extensions import Literal
1515

16-
from datahub.cli.cli_utils import get_boolean_env_variable, get_url_and_token
16+
from datahub.cli.cli_utils import get_url_and_token
1717
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
18+
from datahub.configuration.validate_field_removal import pydantic_removed_field
1819
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
1920
from datahub.emitter.mce_builder import Aspect, make_data_platform_urn
2021
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -49,9 +50,6 @@
4950
logger = logging.getLogger(__name__)
5051

5152

52-
telemetry_enabled = get_boolean_env_variable("DATAHUB_TELEMETRY_ENABLED", True)
53-
54-
5553
class DatahubClientConfig(ConfigModel):
5654
"""Configuration class for holding connectivity to datahub gms"""
5755

@@ -62,9 +60,12 @@ class DatahubClientConfig(ConfigModel):
6260
retry_max_times: Optional[int]
6361
extra_headers: Optional[Dict[str, str]]
6462
ca_certificate_path: Optional[str]
65-
max_threads: int = 15
6663
disable_ssl_verification: bool = False
6764

65+
_max_threads_moved_to_sink = pydantic_removed_field(
66+
"max_threads", print_warning=False
67+
)
68+
6869

6970
# Alias for backwards compatibility.
7071
# DEPRECATION: Remove in v0.10.2.
@@ -107,7 +108,11 @@ def __init__(self, config: DatahubClientConfig) -> None:
107108
disable_ssl_verification=self.config.disable_ssl_verification,
108109
)
109110
self.test_connection()
110-
if not telemetry_enabled:
111+
112+
# Cache the server id for telemetry.
113+
from datahub.telemetry.telemetry import telemetry_instance
114+
115+
if not telemetry_instance.enabled:
111116
self.server_id = "missing"
112117
return
113118
try:

metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ class SyncOrAsync(ConfigEnum):
3636

3737

3838
class DatahubRestSinkConfig(DatahubClientConfig):
39-
max_pending_requests: int = 1000
4039
mode: SyncOrAsync = SyncOrAsync.ASYNC
4140

41+
# These only apply in async mode.
42+
max_threads: int = 15
43+
max_pending_requests: int = 1000
44+
4245

4346
@dataclass
4447
class DataHubRestSinkReport(SinkReport):

metadata-ingestion/src/datahub_provider/_lineage_core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,5 @@ def send_lineage_to_datahub(
110110
end_timestamp_millis=int(datetime.utcnow().timestamp() * 1000),
111111
)
112112
operator.log.info(f"Emitted from Lineage: {dpi}")
113+
114+
emitter.flush()

metadata-ingestion/src/datahub_provider/_plugin.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ def datahub_task_status_callback(context, status):
190190
)
191191
task.log.info(f"Emitted Completed Data Process Instance: {dpi}")
192192

193+
emitter.flush()
194+
193195

194196
def datahub_pre_execution(context):
195197
ti = context["ti"]
@@ -240,6 +242,8 @@ def datahub_pre_execution(context):
240242

241243
task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
242244

245+
emitter.flush()
246+
243247

244248
def _wrap_pre_execution(pre_execution):
245249
def custom_pre_execution(context):

metadata-ingestion/tests/unit/graph/test_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from datahub.metadata.schema_classes import CorpUserEditableInfoClass
99

1010

11-
@patch("datahub.ingestion.graph.client.telemetry_enabled", False)
1211
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
1312
def test_get_aspect(mock_test_connection):
1413
mock_test_connection.return_value = {}

metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
token="dummy_test_tok",
5050
timeout_sec=10,
5151
extra_headers={},
52-
max_threads=15,
5352
),
5453
),
5554
False,

0 commit comments

Comments
 (0)