
Commit 3a37769

Fix clickhouse export (#3009)
* Hide all environment files
* remove prints
* fix asyncio issues
* improve doc for the hack
1 parent 9dcee8b · commit 3a37769

File tree

17 files changed: +130 −82 lines


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ data/
 .DS_Store
 *.pem
 *.env
+*.env.*
 .env*.local
 .env*.cloud
 *.log
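
The new `*.env.*` pattern picks up suffixed environment files such as `.env.production` (the leading `*` can match the empty string), closing the gap left by the existing `*.env`, `.env*.local`, and `.env*.cloud` rules.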

warehouse/metrics_tools/transfer/clickhouse.py

Lines changed: 6 additions & 1 deletion
@@ -48,7 +48,12 @@ async def import_table(
             loading_table_fqn,
             export_reference.columns.columns_as("clickhouse"),
         )
-        import_path = f"https://storage.googleapis.com/{gcs_bucket}/{gcs_blob_path}/*"
+        # We need the `2*` to match the files that are created by the export
+        # this is a bit of a hack due to some weird behavior in clickhouse that
+        # changed sometime around 2025-02-01. The `2*` is used because trino
+        # prefixes files with the date. So this will work for the next 975 years
+        # or so. Hopefully that's enough time.
+        import_path = f"https://storage.googleapis.com/{gcs_bucket}/{gcs_blob_path}/2*"
         self.logger.debug(f"Importing table {loading_table_fqn} from {gcs_path}")
         import_data(
             self.ch,
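
The `import_data` call above ultimately points ClickHouse at that globbed GCS URL. As a minimal sketch of the same idea, assuming a `clickhouse_connect` client and ClickHouse's S3-compatible access to GCS; the bucket, database, table, and format names below are placeholders, not the repository's own:

# Sketch of a glob-based GCS import into ClickHouse (names are hypothetical).
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost")

gcs_bucket = "example-bucket"
gcs_blob_path = "exports/my_table"
# Trino prefixes its export files with the date (e.g. 20250201_...), so a
# `2*` glob matches all of them while skipping stray objects under the prefix.
import_path = f"https://storage.googleapis.com/{gcs_bucket}/{gcs_blob_path}/2*"

client.command(
    f"INSERT INTO default.loading_table SELECT * FROM s3('{import_path}', 'Parquet')"
)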

warehouse/oso_dagster/assets/clickhouse_dbt_marts.py

Lines changed: 5 additions & 3 deletions
@@ -1,4 +1,5 @@
 import json
+import logging
 import os
 import uuid
 from pathlib import Path
@@ -16,6 +17,8 @@
 from ..utils.bq import BigQueryTableConfig
 from ..utils.common import SourceMode

+logger = logging.getLogger(__name__)
+
 MART_DIRECTORY = "marts"
 INTERMEDIATE_DIRECTORY = "intermediate"
 SYNC_KEY = "sync_to_db"
@@ -74,7 +77,7 @@ def all_clickhouse_dbt_mart_assets(
         table_name = n.get("name")
         # Only copy marts that are marked for sync
         if n.get("meta").get(SYNC_KEY, False):
-            print(f"Queuing {table_name}")
+            logger.debug(f"Queuing {table_name}")
             copied_mart_names.append(table_name)
             # Create an asset for each mart to copy
             result = result + create_bq2clickhouse_asset(
@@ -131,8 +134,7 @@ def all_clickhouse_dbt_mart_assets(
         # Track which marts were skipped
         else:
             skipped_mart_names.append(table_name)
-    print(
+    logger.debug(
         f"...queued {str(len(copied_mart_names))} marts, skipping {str(len(skipped_mart_names))}"
     )
-    # print(skipped_mart_names)
     return result
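
This print-to-logger swap recurs throughout the commit. A minimal sketch of the pattern (the function here is hypothetical):

import logging

# One logger per module, named after it, so levels and handlers can be
# configured centrally instead of writing unconditionally to stdout.
logger = logging.getLogger(__name__)

def queue_mart(table_name: str) -> None:
    # Emitted only when DEBUG is enabled for this module's logger.
    logger.debug(f"Queuing {table_name}")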

warehouse/oso_dagster/assets/dbt.py

Lines changed: 6 additions & 3 deletions
@@ -1,3 +1,4 @@
+import logging
 import os
 from pathlib import Path
 from typing import Any, Dict, List, Mapping, Optional, Sequence
@@ -8,6 +9,8 @@

 from ..factories import AssetFactoryResponse, early_resources_asset_factory

+logger = logging.getLogger(__name__)
+

 class CustomDagsterDbtTranslator(DagsterDbtTranslator):
     def __init__(
@@ -81,8 +84,8 @@ def generate_dbt_asset(
     AssetsDefinition
         a single Dagster dbt asset
     """
-    print(f"Target[{target}] using profiles_dir({dbt_profiles_dir})")
-    print(f"\tmanifest_path({manifest_path})")
+    logger.debug(f"Target[{target}] using profiles_dir({dbt_profiles_dir})")
+    logger.debug(f"\tmanifest_path({manifest_path})")
     translator = CustomDagsterDbtTranslator(target, ["dbt", target], internal_map)

     asset_name = f"{target}_dbt"
@@ -94,7 +97,7 @@ def generate_dbt_asset(
         op_tags=op_tags,
     )
     def _generated_dbt_assets(context: AssetExecutionContext, config: DBTConfig):
-        print(f"using profiles dir {dbt_profiles_dir}")
+        logger.debug(f"using profiles dir {dbt_profiles_dir}")
         dbt = DbtCliResource(
             project_dir=os.fspath(project_dir),
             target=target,

warehouse/oso_dagster/cbt/bq.py

Lines changed: 7 additions & 9 deletions
@@ -1,14 +1,18 @@
 # Query tools for bigquery tables
-from typing import List, cast, Optional, Dict, NotRequired, TypedDict
+import logging
 from functools import cache
+from typing import Dict, List, NotRequired, Optional, TypedDict, cast

 from google.cloud.bigquery import Client, Table, TableReference
 from google.cloud.bigquery.table import RowIterator
 from sqlglot import expressions as exp
-from .context import Connector, ColumnList
+
+from .context import ColumnList, Connector

 type ExtraVarType = str | int

+logger = logging.getLogger(__name__)
+

 class MetricQueryInput(TypedDict):
     ref: str
@@ -22,7 +26,7 @@ def __init__(self, bq: Client):
         self.bq = bq

     def __call__(self, table_ref: TableReference | Table | str):
-        print(table_ref)
+        logger.debug(table_ref)
         return BigQueryTableQueryHelper.load_by_table(self.bq, table_ref)


@@ -136,8 +140,6 @@ def update_columns_with(

         # If the other table is included then we can ensure only matching fields are contained
         if source_table is not None:
-            print("WHAT IS THIS")
-            print(source_table._table_ref)
             source_column_names = set(
                 map(lambda c: c.column_name, source_table.columns)
             )
@@ -147,10 +149,6 @@ def update_columns_with(
             additional_columns = (
                 set(source_column_names) - set(ordered_columns) - set(exclude or [])
             )
-            print("source")
-            print(list(source_column_names))
-            print("ordered")
-            print(list(ordered_columns))
             if len(additional_columns) > 0:
                 raise Exception(
                     f"more columns in the source table than the destination `{additional_columns}`"

warehouse/oso_dagster/cbt/cbt.py

Lines changed: 5 additions & 9 deletions
@@ -2,23 +2,20 @@
 #
 # A poor excuse for a dbt replacement when calling sql as a library
 import os
-import arrow
-from typing import List, Optional, Sequence
 from dataclasses import dataclass
 from enum import Enum
 from functools import cache
+from typing import List, Optional, Sequence

+import arrow
 from dagster import ConfigurableResource, DagsterLogManager
 from dagster_gcp import BigQueryResource
-from google.cloud.bigquery import (
-    TableReference,
-    Client,
-)
+from google.cloud.bigquery import Client, TableReference
 from google.cloud.exceptions import NotFound
 from jinja2 import Environment, FileSystemLoader, meta

-from .bq import BigQueryTableQueryHelper, BigQueryConnector
-from .context import DataContext, ContextQuery, Transformation
+from .bq import BigQueryConnector, BigQueryTableQueryHelper
+from .context import ContextQuery, DataContext, Transformation


 class UpdateStrategy(Enum):
@@ -311,7 +308,6 @@ def _transform_replace(
         select_query = context.transform_query(select_query, transformations).sql(
             dialect="bigquery"
         )
-        print(select_query)

         if time_partitioning:
             self.log.debug("creating table with a time partition")

warehouse/oso_dagster/cbt/context.py

Lines changed: 5 additions & 2 deletions
@@ -1,9 +1,12 @@
-from typing import Callable, TypeVar, List, Sequence, Tuple, Optional, cast
+import logging
 from collections import OrderedDict
+from typing import Callable, List, Optional, Sequence, Tuple, TypeVar, cast

 import sqlglot as sql
 from sqlglot import expressions as exp

+logger = logging.getLogger(__name__)
+

 T = TypeVar("T")
 ColumnList = List[Tuple[str, str]]
@@ -79,7 +82,7 @@ def execute_query(
         dialect: str = "bigquery",
     ) -> T:
         exp = self.transform_query(query, transformations, dialect=dialect)
-        print(exp.sql())
+        logger.debug(f"Executing query: {exp.sql()}")
         return self._connector.execute_expression(exp)

     # def examine_query(self, query: ContextQuery):
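
One aside on the new call, not a change made by this commit: an f-string argument is rendered, and `exp.sql()` executed, even when DEBUG is disabled. A level guard skips that work entirely:

import logging

# %s-style arguments defer only the string formatting; exp.sql() itself
# would still run eagerly, so gate on the effective level instead.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("Executing query: %s", exp.sql())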

warehouse/oso_dagster/config.py

Lines changed: 2 additions & 0 deletions
@@ -76,6 +76,8 @@ class DagsterConfig(BaseSettings):
     # `enable_k8s_executor` is used to enable k8s executor for dagster
     enable_k8s: bool = False

+    k8s_use_port_forward: bool = False
+
     trino_remote_url: str = "http://localhost:8080"
     trino_k8s_namespace: str = ""
     trino_k8s_service_name: str = ""
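
Because `DagsterConfig` extends pydantic's `BaseSettings`, the new flag should be togglable from the environment without code changes. A sketch under that assumption (the repo's exact settings base class and import may differ):

from pydantic_settings import BaseSettings

class DagsterConfig(BaseSettings):
    # Trimmed to the relevant fields; by default pydantic-settings maps each
    # field to a case-insensitive environment variable of the same name.
    enable_k8s: bool = False
    k8s_use_port_forward: bool = False

# With K8S_USE_PORT_FORWARD=true in the environment:
# DagsterConfig().k8s_use_port_forward == True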

warehouse/oso_dagster/definitions.py

Lines changed: 12 additions & 4 deletions
@@ -1,13 +1,14 @@
 import logging
 import os
+import warnings

-from dagster import Definitions
+from dagster import Definitions, ExperimentalWarning
 from dagster_dbt import DbtCliResource
 from dagster_embedded_elt.dlt import DagsterDltResource
 from dagster_gcp import BigQueryResource, GCSResource
 from dagster_k8s import k8s_job_executor
 from dagster_sqlmesh import SQLMeshContextConfig, SQLMeshResource
-from dotenv import load_dotenv
+from dotenv import find_dotenv, load_dotenv
 from metrics_tools.utils.logging import setup_module_logging
 from oso_dagster.resources.bq import BigQueryImporterResource
 from oso_dagster.resources.clickhouse import ClickhouseImporterResource
@@ -31,10 +32,10 @@
 from .resources import (
     BigQueryDataTransferResource,
     ClickhouseResource,
+    K8sApiResource,
     K8sResource,
     MCSK8sResource,
     MCSRemoteResource,
-    PodLocalK8sResource,
     PrefixedSQLMeshTranslator,
     Trino2BigQuerySQLMeshExporter,
     Trino2ClickhouseSQLMeshExporter,
@@ -54,10 +55,15 @@

 logger = logging.getLogger(__name__)

+if os.environ.get("ENV") == "local":
+    load_dotenv(find_dotenv(".env.local"))
+elif os.environ.get("ENV") == "production":
+    load_dotenv(find_dotenv(".env.production"))
 load_dotenv()


 def load_definitions():
+    warnings.filterwarnings("ignore", category=ExperimentalWarning)
     setup_module_logging("oso_dagster")
     # Load the configuration for the project
     global_config = DagsterConfig()  # type: ignore
@@ -127,19 +133,21 @@ def load_definitions():

     else:
         logger.info("Loading k8s resources")
-        k8s = PodLocalK8sResource()
+        k8s = K8sApiResource()
         trino = TrinoK8sResource(
             k8s=k8s,
             namespace=global_config.trino_k8s_namespace,
             service_name=global_config.trino_k8s_service_name,
             coordinator_deployment_name=global_config.trino_k8s_coordinator_deployment_name,
             worker_deployment_name=global_config.trino_k8s_worker_deployment_name,
+            use_port_forward=global_config.k8s_use_port_forward,
         )
         mcs = MCSK8sResource(
             k8s=k8s,
             namespace=global_config.mcs_k8s_namespace,
             service_name=global_config.mcs_k8s_service_name,
             deployment_name=global_config.mcs_k8s_deployment_name,
+            use_port_forward=global_config.k8s_use_port_forward,
         )
         sqlmesh_exporter = [
             Trino2ClickhouseSQLMeshExporter(
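
Worth noting about the load order above: `load_dotenv` does not override variables that are already set, so the environment-specific file, loaded first, wins over the fallback `.env`. A minimal sketch of that behavior (FOO is a placeholder):

import os
from dotenv import find_dotenv, load_dotenv

# Suppose .env.local sets FOO=local and .env sets FOO=default.
load_dotenv(find_dotenv(".env.local"))  # FOO becomes "local"
load_dotenv()                           # leaves FOO alone: already set
assert os.environ["FOO"] == "local"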

warehouse/oso_dagster/factories/dlt.py

Lines changed: 4 additions & 5 deletions
@@ -42,7 +42,6 @@ def pydantic_to_dlt_nullable_columns(b: t.Type[BaseModel]):
     table_schema_columns = pydantic_to_table_schema_columns(b)
     for column in table_schema_columns.values():
         column["nullable"] = True
-    print(table_schema_columns)
     return table_schema_columns


@@ -123,9 +122,7 @@ def _factory(
         # We need to ensure that both dlt and global_config are
         # available to the generated asset as they're used by the
         # generated function.
-        final_extra_resources = extra_resources.union(
-            {"dlt", "global_config"}
-        )
+        final_extra_resources = extra_resources.union({"dlt", "global_config"})

         @asset(
             name=asset_name,
@@ -159,7 +156,9 @@ def _dlt_asset(
             global_config = t.cast(
                 DagsterConfig, getattr(context.resources, "global_config")
             )
-            assert global_config, "global_config resource is not loading correctly"
+            assert (
+                global_config
+            ), "global_config resource is not loading correctly"
             if global_config.enable_bigquery:
                 context.log.debug("dlt pipeline setup to use staging")
                 pipeline = dltlib.pipeline(
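
For reference, `pydantic_to_dlt_nullable_columns` (whose stray print is removed above) builds a dlt column schema from a pydantic model and marks every column nullable. A hypothetical usage:

from pydantic import BaseModel

class Artifact(BaseModel):  # hypothetical model, for illustration only
    id: int
    name: str

columns = pydantic_to_dlt_nullable_columns(Artifact)
# The loop in the factory sets nullable=True on every derived column, so
# partially populated rows can load without schema violations.
assert all(col["nullable"] for col in columns.values())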
