Skip to content

Commit 99b805d

Browse files
authored
Merge pull request #116 from Point72/tkp/etl
Add end-to-end etl example
2 parents cc8f0ee + 012b016 commit 99b805d

File tree

21 files changed

+852
-15
lines changed

21 files changed

+852
-15
lines changed

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,11 @@ ccflow/labextension
157157

158158
# Rust
159159
target
160+
161+
# Examples
162+
outputs
163+
raw.html
164+
extracted.csv
165+
etl.db
166+
lobsters.html
167+
lobsters.csv

ccflow/callable.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import logging
1616
from functools import lru_cache, wraps
1717
from inspect import Signature, isclass, signature
18-
from typing import Any, ClassVar, Dict, Generic, List, Optional, Tuple, Type, TypeVar
18+
from typing import Any, ClassVar, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, get_args, get_origin
1919

2020
from pydantic import BaseModel as PydanticBaseModel, ConfigDict, Field, InstanceOf, PrivateAttr, TypeAdapter, field_validator, model_validator
2121
from typing_extensions import override
@@ -217,7 +217,9 @@ def get_evaluation_context(model: CallableModelType, context: ContextType, as_di
217217
def wrapper(model, context=Signature.empty, *, _options: Optional[FlowOptions] = None, **kwargs):
218218
if not isinstance(model, CallableModel):
219219
raise TypeError(f"Can only decorate methods on CallableModels (not {type(model)}) with the flow decorator.")
220-
if not isclass(model.context_type) or not issubclass(model.context_type, ContextBase):
220+
if (not isclass(model.context_type) or not issubclass(model.context_type, ContextBase)) and not (
221+
get_origin(model.context_type) is Union and type(None) in get_args(model.context_type)
222+
):
221223
raise TypeError(f"Context type {model.context_type} must be a subclass of ContextBase")
222224
if not isclass(model.result_type) or not issubclass(model.result_type, ResultBase):
223225
raise TypeError(f"Result type {model.result_type} must be a subclass of ResultBase")
@@ -237,7 +239,11 @@ def wrapper(model, context=Signature.empty, *, _options: Optional[FlowOptions] =
237239

238240
# Type coercion on input. We do this here (rather than relying on ModelEvaluationContext) as it produces a nicer traceback/error message
239241
if not isinstance(context, model.context_type):
240-
context = model.context_type.model_validate(context)
242+
if get_origin(model.context_type) is Union and type(None) in get_args(model.context_type):
243+
model_context_type = [t for t in get_args(model.context_type) if t is not type(None)][0]
244+
else:
245+
model_context_type = model.context_type
246+
context = model_context_type.model_validate(context)
241247

242248
if fn != getattr(model.__class__, fn.__name__).__wrapped__:
243249
# This happens when super().__call__ is used when implementing a CallableModel that derives from another one.
@@ -385,7 +391,8 @@ class ModelEvaluationContext(
385391
fn: str = Field("__call__", strict=True)
386392
options: Dict[str, Any] = Field(default_factory=dict)
387393
model: InstanceOf[_CallableModel]
388-
context: InstanceOf[ContextBase]
394+
context: Union[InstanceOf[ContextBase], None]
395+
389396
# Using InstanceOf instead of the actual type will limit Pydantic's validation of the field to instance checking
390397
# Otherwise, the validation will re-run fully despite the models already being validated on construction
391398
# TODO: Make the instance check compatible with the generic types instead of the base type
@@ -492,9 +499,15 @@ def context_type(self) -> Type[ContextType]:
492499
typ = _cached_signature(self.__class__.__call__).parameters["context"].annotation
493500
if typ is Signature.empty:
494501
raise TypeError("Must either define a type annotation for context on __call__ or implement 'context_type'")
495-
if not issubclass(typ, ContextBase):
496-
raise TypeError(f"Context type declared in signature of __call__ must be a subclass of ContextBase. Received {typ}.")
497502

503+
# If optional type, extract inner type
504+
if get_origin(typ) is Optional or (get_origin(typ) is Union and type(None) in get_args(typ)):
505+
typ_to_check = [t for t in get_args(typ) if t is not type(None)][0]
506+
else:
507+
typ_to_check = typ
508+
# Ensure subclass of ContextBase
509+
if not issubclass(typ_to_check, ContextBase):
510+
raise TypeError(f"Context type declared in signature of __call__ must be a subclass of ContextBase. Received {typ_to_check}.")
498511
return typ
499512

500513
@property

ccflow/examples/etl/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from pathlib import Path
2+
from typing import List, Optional
3+
4+
from ccflow import RootModelRegistry, load_config as load_config_base
5+
6+
__all__ = ("load_config",)
7+
8+
9+
def load_config(
10+
config_dir: str = "",
11+
config_name: str = "",
12+
overrides: Optional[List[str]] = None,
13+
*,
14+
overwrite: bool = True,
15+
basepath: str = "",
16+
) -> RootModelRegistry:
17+
return load_config_base(
18+
root_config_dir=str(Path(__file__).resolve().parent / "config"),
19+
root_config_name="base",
20+
config_dir=config_dir,
21+
config_name=config_name,
22+
overrides=overrides,
23+
overwrite=overwrite,
24+
basepath=basepath,
25+
)

ccflow/examples/etl/__main__.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import hydra
2+
3+
from ccflow.utils.hydra import cfg_run
4+
5+
__all__ = ("main",)
6+
7+
8+
@hydra.main(config_path="config", config_name="base", version_base=None)
9+
def main(cfg):
10+
cfg_run(cfg)
11+
12+
13+
# Extract step:
14+
# python -m ccflow.examples.etl +callable=extract +context=[]
15+
# Change url, as example of context override:
16+
# python -m ccflow.examples.etl +callable=extract +context=["http://lobste.rs"]
17+
# Change file name, as example of callable override:
18+
# python -m ccflow.examples.etl +callable=extract +context=["http://lobste.rs"] ++extract.publisher.name=lobsters
19+
20+
# Transform step:
21+
# python -m ccflow.examples.etl +callable=transform +context=[]
22+
# python -m ccflow.examples.etl +callable=transform +context=[] ++transform.model.file=lobsters.html ++transform.publisher.name=lobsters
23+
24+
# Load step:
25+
# python -m ccflow.examples.etl +callable=load +context=[]
26+
# python -m ccflow.examples.etl +callable=load +context=[] ++load.file=lobsters.csv ++load.db_file=":memory:"
27+
28+
# View SQLite DB:
29+
# sqlite3 etl.db
30+
# .tables
31+
# select * from links;
32+
# .quit
33+
34+
# [project.scripts]
35+
# etl = "ccflow.examples.etl:main"
36+
# etl-explain = "ccflow.examples.etl:explain"
37+
38+
if __name__ == "__main__":
39+
main()

ccflow/examples/etl/config/__init__.py

Whitespace-only changes.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
extract:
2+
_target_: ccflow.PublisherModel
3+
model:
4+
_target_: ccflow.examples.etl.models.RestModel
5+
publisher:
6+
_target_: ccflow.publishers.GenericFilePublisher
7+
name: raw
8+
suffix: .html
9+
field: value
10+
11+
transform:
12+
_target_: ccflow.PublisherModel
13+
model:
14+
_target_: ccflow.examples.etl.models.LinksModel
15+
file: ${extract.publisher.name}${extract.publisher.suffix}
16+
publisher:
17+
_target_: ccflow.publishers.GenericFilePublisher
18+
name: extracted
19+
suffix: .csv
20+
field: value
21+
22+
load:
23+
_target_: ccflow.examples.etl.models.DBModel
24+
file: ${transform.publisher.name}${transform.publisher.suffix}
25+
db_file: etl.db
26+
table: links
27+
28+
# Alternative multi-file approach
29+
# defaults:
30+
# - extract: rest
31+
# - transform: links
32+
# - load: db
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
_target_: ccflow.PublisherModel
2+
model:
3+
_target_: ccflow.examples.etl.models.RestModel
4+
publisher:
5+
_target_: ccflow.publishers.GenericFilePublisher
6+
name: raw
7+
suffix: .html
8+
field: value
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
_target_: ccflow.examples.etl.models.DBModel
2+
file: ${transform.publisher.name}${transform.publisher.suffix}
3+
db_file: etl.db
4+
table: links
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
_target_: ccflow.PublisherModel
2+
model:
3+
_target_: ccflow.examples.etl.models.LinksModel
4+
file: ${extract.publisher.name}${extract.publisher.suffix}
5+
publisher:
6+
_target_: ccflow.publishers.GenericFilePublisher
7+
name: extracted
8+
suffix: .csv
9+
field: value

ccflow/examples/etl/explain.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from ccflow.utils.hydra import cfg_explain_cli
2+
3+
from .__main__ import main
4+
5+
__all__ = ("explain",)
6+
7+
8+
def explain():
9+
cfg_explain_cli(config_path="config", config_name="base", hydra_main=main)
10+
11+
12+
if __name__ == "__main__":
13+
explain()

0 commit comments

Comments
 (0)