Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f5e8f9c
adds optional pipeline activation history to context
rudolfix Jun 7, 2025
5bc2d48
allows to configure configs and pragmas for duckdb, improves sql_clie…
rudolfix Jun 7, 2025
7373ea2
allows query string for motherduck, tests WIP
rudolfix Jun 7, 2025
e1e5e33
mocks local_dir correctly to place local files, drop duckdb in pipeli…
rudolfix Jun 7, 2025
c1113f7
enables activation factory to drop datasets from all pipelines
rudolfix Jun 7, 2025
52779e4
uses correct fixture scope in test read interfaces
rudolfix Jun 7, 2025
53bd8bf
bumps duckdb and pyarrow
rudolfix Jun 7, 2025
19b1139
ignores some flake8 errors
rudolfix Jun 7, 2025
abab6b7
logs resolved traces thread-wise, clears log between pipeline runs
rudolfix Jun 8, 2025
04f34a7
improves duckdb tests and docs
rudolfix Jun 8, 2025
3e97e98
bumps arrow to v20 because duckdb 1.3 needs at least 19 for its types
rudolfix Jun 8, 2025
5c4fc88
fixes tests - mostly duckdb database locations
rudolfix Jun 8, 2025
1b8efb6
fixes lockfile
rudolfix Jun 8, 2025
6069829
fixes edge cases when passing setting to duckdb connection
rudolfix Jun 8, 2025
adf2288
disables iceberg abfss tests
rudolfix Jun 8, 2025
eadaa5d
refactors WithLocalFiles so they can be used independent from destina…
rudolfix Jun 8, 2025
69701ad
more local dir test fixes
rudolfix Jun 9, 2025
505081d
moves WithLocalFiles to common storages configuration
rudolfix Jun 9, 2025
82a0031
tests edge cases when setting configs on duckdb fails
rudolfix Jun 9, 2025
6363fa2
updates docs
rudolfix Jun 9, 2025
f7f45c3
reverts duckdb to 1.2.1 - last stable version
rudolfix Jun 9, 2025
d7632ff
more test fixes
rudolfix Jun 9, 2025
bfe6f88
moves create_secret to duckdb sqlclient
rudolfix Jun 10, 2025
5d98177
disables building of Dockerfile until we upgrade arrow
rudolfix Jun 10, 2025
2f2848f
skip gcs compat test for local clickhouse tests
sh-rp Jun 11, 2025
62dfaa7
Merge branch 'devel' into chores/fixes-leaking-datasets-tests
rudolfix Jun 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ test-build-images: build-library
# filter out libs that need native compilation
grep `cat compiled_packages.txt` _gen_requirements.txt > compiled_requirements.txt
docker build -f deploy/dlt/Dockerfile.airflow --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .
# enable when we upgrade arrow to 20.x
# docker build -f deploy/dlt/Dockerfile --build-arg=COMMIT_SHA="$(shell git log -1 --pretty=%h)" --build-arg=IMAGE_VERSION="$(shell poetry version -s)" .

preprocess-docs:
# run docs preprocessing to run a few checks and ensure examples can be parsed
Expand Down
9 changes: 5 additions & 4 deletions deploy/dlt/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ RUN apk update &&\
pip install --upgrade setuptools wheel pip &&\
rm -r /usr/lib/python*/ensurepip

# install arrow 17.0.0, usually we would need apache-arrow-dev=17.0.0 but it is not available in alpine 3.20
# install arrow 18.1.0
# adapt this version to the arrow version you need
# NOTE: it is impossible to build 18.1 because some sources are missing
RUN git clone --no-checkout https://github.com/apache/arrow.git /arrow \
&& cd /arrow \
&& git checkout tags/apache-arrow-17.0.0 \
&& git checkout tags/apache-arrow-18.1.0 \
&& cd cpp \
&& mkdir build \
&& cd build \
&& cmake -DARROW_CSV=ON -DARROW_JSON=ON -DARROW_FILESYSTEM=ON .. \
&& cmake -DARROW_PARQUET=ON -DARROW_CSV=ON -DARROW_JSON=ON -DARROW_FILESYSTEM=ON .. \
&& make -j$(nproc) \
&& make install

Expand All @@ -56,4 +57,4 @@ WORKDIR /
RUN rm -r /tmp/pydlt
# make sure dlt can be actually imported
# TODO: pendulum breaks alpine
# RUN python -c 'import dlt'
RUN python -c 'import dlt;import duckdb;import pyarrow;'
3 changes: 2 additions & 1 deletion dlt/common/configuration/providers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .provider import ConfigProvider
from .provider import ConfigProvider, EXPLICIT_VALUES_PROVIDER_NAME
from .environ import EnvironProvider
from .dictionary import DictionaryProvider
from .toml import (
Expand Down Expand Up @@ -29,4 +29,5 @@
"CustomLoaderDocProvider",
"VaultDocProvider",
"GoogleSecretsProvider",
"EXPLICIT_VALUES_PROVIDER_NAME",
]
4 changes: 3 additions & 1 deletion dlt/common/configuration/providers/provider.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import abc
from typing import Any, Sequence, Tuple, Type, Optional

from dlt.common.configuration.exceptions import ConfigurationException
from dlt.common.utils import simple_repr, without_none

EXPLICIT_VALUES_PROVIDER_NAME = "ExplicitValues"
"""Name of the virtual provider that represents explicit values found during config resolution"""


class ConfigProvider(abc.ABC):
@abc.abstractmethod
Expand Down
19 changes: 15 additions & 4 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from typing import Any, Dict, ContextManager, List, Optional, Sequence, Tuple, Type, TypeVar

from dlt.common import logger
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.providers.provider import (
ConfigProvider,
EXPLICIT_VALUES_PROVIDER_NAME,
)
from dlt.common.configuration.const import TYPE_EXAMPLES
from dlt.common.typing import (
AnyType,
Expand Down Expand Up @@ -58,6 +61,7 @@ def resolve_configuration(
# allows, for example, to store connection string or service.json in their native form in single env variable or under single vault key
# this happens only when explicit value for the configuration was not provided
# TODO: we can move it into _resolve_configuration and also remove similar code in _resolve_config_field
# TODO: also allow when explicit_value is dict so we can parse initial value and merge with it
if config.__section__ and explicit_value is None:
initial_hint = TSecretValue if isinstance(config, CredentialsConfiguration) else AnyType
initial_value, traces = _resolve_single_value(
Expand Down Expand Up @@ -241,7 +245,9 @@ def _set_field() -> None:
# do not resolve not resolvable, but allow for explicit values to be passed
if not explicit_none:
current_value = default_value if explicit_value is None else explicit_value
traces = [LookupTrace("ExplicitValues", None, key, current_value)]
traces = [
LookupTrace(EXPLICIT_VALUES_PROVIDER_NAME, embedded_sections, key, current_value)
]
_set_field()
continue

Expand Down Expand Up @@ -306,7 +312,7 @@ def _set_field() -> None:
)
else:
# set the trace for explicit none
traces = [LookupTrace("ExplicitValues", None, key, None)]
traces = [LookupTrace(EXPLICIT_VALUES_PROVIDER_NAME, embedded_sections, key, None)]

_set_field()

Expand Down Expand Up @@ -337,7 +343,11 @@ def _resolve_config_field(
inner_hint = extract_inner_hint(hint, preserve_literal=True)
if explicit_value is not None:
value = explicit_value
traces: List[LookupTrace] = []
# TODO: consider logging explicit values, currently initial values taken from configuration
# are passed as explicit values so that needs to be fixed first
traces: List[LookupTrace] = [
LookupTrace(EXPLICIT_VALUES_PROVIDER_NAME, embedded_sections, key, value)
]
else:
# resolve key value via active providers passing the original hint ie. to preserve TSecretValue
# NOTE: if inner_hint is an embedded config, it won't be resolved and value is None
Expand Down Expand Up @@ -367,6 +377,7 @@ def _resolve_config_field(
embedded_config = inner_hint()
# only config with sections may look for initial values
# TODO: all this code can be moved into _resolve_configuration
# TODO: also allow when explicit_value is dict so we can parse initial value and merge with it
if embedded_config.__section__ and explicit_value is None:
# config section becomes the key if the key does not start with, otherwise it keeps its original value
initial_key, initial_embedded = _apply_embedded_sections_to_config_sections(
Expand Down
74 changes: 55 additions & 19 deletions dlt/common/configuration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import (
Any,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Expand All @@ -13,18 +14,21 @@
Sequence,
Literal,
)
from collections.abc import Mapping as C_Mapping

import yaml

from dlt.common.configuration.container import Container
from dlt.common.configuration.providers.provider import EXPLICIT_VALUES_PROVIDER_NAME
from dlt.common.json import json
from dlt.common.typing import AnyType, DictStrAny, TAny, is_any_type, get_args, get_origin
from dlt.common.data_types import coerce_value, py_type_to_sc_type
from dlt.common.configuration.providers import EnvironProvider
from dlt.common.configuration.exceptions import ConfigValueCannotBeCoercedException, LookupTrace
from dlt.common.configuration.specs.base_configuration import (
BaseConfiguration,
ContainerInjectableContext,
is_base_configuration_inner_hint,
configspec,
)


Expand All @@ -37,8 +41,41 @@ class ResolvedValueTrace(NamedTuple):
provider_name: str
config: BaseConfiguration

def is_resolved(self) -> bool:
    """Tell whether this trace counts as resolved.

    A trace is resolved when it was produced by the explicit values provider
    (even if the value itself is None) or when it carries a value that is not None.
    """
    if self.provider_name == EXPLICIT_VALUES_PROVIDER_NAME:
        # explicit values, if present, are always treated as resolved
        return True
    return self.value is not None

_RESOLVED_TRACES: Dict[str, ResolvedValueTrace] = {} # stores all the resolved traces

@configspec
class TraceLogContext(ContainerInjectableContext):
    """Stores log of all config resolver traces, per thread so parallel pipelines may log to it"""

    resolved_traces: List[ResolvedValueTrace] = None
    """Traces with resolved values"""
    all_traces: List[ResolvedValueTrace] = None
    """All logged traces"""

    logging_enabled: bool = True
    """Collect logs by default"""

    def __init__(self) -> None:
        # start with empty logs
        self.clear()

    def clear(self) -> None:
        """Drop everything logged so far by replacing both logs with fresh empty lists."""
        self.resolved_traces = []
        self.all_traces = []

    def log(self, trace: ResolvedValueTrace) -> None:
        """Record `trace`, unless logging is disabled.

        Every trace goes to `all_traces`; traces reporting `is_resolved()` are
        additionally stored in `resolved_traces`.
        """
        if self.logging_enabled:
            if trace.is_resolved():
                self.resolved_traces.append(trace)
            self.all_traces.append(trace)

    @staticmethod
    def _get_log_as_dict(traces: List[ResolvedValueTrace]) -> Dict[str, ResolvedValueTrace]:
        """Index traces by their layout path: dot-joined sections followed by the key.

        When several traces share a path, the last one in `traces` wins.
        """
        by_path: Dict[str, ResolvedValueTrace] = {}
        for trace in traces:
            section_path = ".".join(trace.sections)
            by_path[f"{section_path}.{trace.key}"] = trace
        return by_path


def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny:
Expand Down Expand Up @@ -170,26 +207,25 @@ def log_traces(
# if logger.is_logging() and logger.log_level() == "DEBUG" and config:
# logger.debug(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
# print(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
# for tr in traces:
# # print(str(tr))
# logger.debug(str(tr))
# store all traces with resolved values
resolved_trace = next((trace for trace in traces if trace.value is not None), None)
if resolved_trace is not None:
path = f'{".".join(resolved_trace.sections)}.{key}'
_RESOLVED_TRACES[path] = ResolvedValueTrace(
key,
resolved_trace.value,
default_value,
hint,
resolved_trace.sections,
resolved_trace.provider,
config,
trace_logger = get_resolved_traces()
for trace in traces:
trace_logger.log(
ResolvedValueTrace(
key,
trace.value,
default_value,
hint,
trace.sections,
trace.provider,
config,
)
)


def get_resolved_traces() -> Dict[str, ResolvedValueTrace]:
return _RESOLVED_TRACES
def get_resolved_traces() -> TraceLogContext:
    """Get the trace logging context (per thread) from the container.

    Note: `TraceLogContext.logging_enabled` defaults to True, so a context
    created with default values collects traces.
    """
    container = Container()
    # looking the context up may create it with default values
    return container[TraceLogContext]


def add_config_to_env(config: BaseConfiguration, sections: Tuple[str, ...] = ()) -> None:
Expand Down
36 changes: 31 additions & 5 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,31 +587,48 @@ def __call__(

@configspec
class PipelineContext(ContainerInjectableContext):
_DEFERRED_PIPELINE: ClassVar[Callable[[], SupportsPipeline]] = None
_PIPELINE_FACTORY: ClassVar[Callable[[], SupportsPipeline]] = None
"""Maker of default pipeline, set at runtime"""
_pipeline: SupportsPipeline = dataclasses.field(
default=None, init=False, repr=False, compare=False
)
"""Active pipeline"""
_pipelines: Optional[List[SupportsPipeline]] = dataclasses.field(
default=None, init=False, repr=False, compare=False
)
"""History of previously activated pipelines, disabled by default"""
enable_activation_history: bool = False
"""When True, references to activated pipelines will be also stored"""

can_create_default: ClassVar[bool] = True

def pipeline(self) -> SupportsPipeline:
"""Creates or returns exiting pipeline"""
"""Creates or returns an active pipeline"""
if not self._pipeline:
# delayed pipeline creation
assert PipelineContext._DEFERRED_PIPELINE is not None, (
assert PipelineContext._PIPELINE_FACTORY is not None, (
"Deferred pipeline creation function not provided to PipelineContext. Are you"
" calling dlt.pipeline() from another thread?"
)
self.activate(PipelineContext._DEFERRED_PIPELINE())
self.activate(PipelineContext._PIPELINE_FACTORY())
return self._pipeline

def activate(self, pipeline: SupportsPipeline) -> None:
    """Make `pipeline` the active pipeline, deactivating the current one first.

    Nothing happens when `pipeline` is already the active pipeline. When
    `enable_activation_history` is set, the pipeline is also added to the
    activation history unless it is already present there.
    """
    already_active = pipeline == self._pipeline
    if already_active:
        return

    self.deactivate()
    pipeline._set_context(True)
    self._pipeline = pipeline

    if not self.enable_activation_history:
        return
    # NOTE: references kept in the history prevent pipelines from being garbage collected
    # NOTE: this context has thread affinity so each thread has own history
    if self._pipelines is None:
        self._pipelines = []
    if pipeline not in self._pipelines:
        self._pipelines.append(pipeline)

def is_active(self) -> bool:
    """Return True when this context currently holds an active pipeline."""
    return self._pipeline is not None
Expand All @@ -621,10 +638,19 @@ def deactivate(self) -> None:
self._pipeline._set_context(False)
self._pipeline = None

def clear_activation_history(self) -> None:
    """Reset the activation history to an empty list."""
    self._pipelines = []

def activation_history(self) -> List[SupportsPipeline]:
    """Get list of pipelines that were activated

    Returns an empty list when nothing was recorded yet.

    Raises:
        RuntimeError: when `enable_activation_history` is not set.
    """
    if self.enable_activation_history:
        return self._pipelines or []
    raise RuntimeError("Activation history is not enabled")

@classmethod
def cls__init__(self, deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None:
"""Initialize the context with a function returning the Pipeline object to allow creation on first use"""
self._DEFERRED_PIPELINE = deferred_pipeline
self._PIPELINE_FACTORY = deferred_pipeline


def current_pipeline() -> SupportsPipeline:
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/storages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
SchemaStorageConfiguration,
TSchemaFileFormat,
FilesystemConfiguration,
WithLocalFiles,
FilesystemConfigurationWithLocalFiles,
)
from .fsspec_filesystem import fsspec_from_config, fsspec_filesystem

Expand All @@ -36,6 +38,8 @@
"SchemaStorageConfiguration",
"TSchemaFileFormat",
"FilesystemConfiguration",
"WithLocalFiles",
"FilesystemConfigurationWithLocalFiles",
"ParsedLoadJobFileName",
"LoadJobInfo",
"LoadPackageInfo",
Expand Down
Loading
Loading