Skip to content

Commit 9a04b7b

Browse files
feat: Use EDK v0.4.0 (#52)
1 parent 378f193 commit 9a04b7b

File tree

9 files changed

+3631
-708
lines changed

9 files changed

+3631
-708
lines changed

.flake8

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
[flake8]
2-
ignore = W503, C901, ANN101
3-
max-line-length = 88
2+
ignore = DAR
3+
max-line-length = 120
44
exclude = files_airflow_ext,cookiecutter
5-
max-complexity = 10
65
docstring-convention = google
7-
allow-star-arg-any = true

.github/dependabot.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
version: 2
2+
updates:
3+
- package-ecosystem: "pip"
4+
directory: "/"
5+
schedule:
6+
interval: daily
7+
time: "12:00"
8+
timezone: "UTC"
9+
reviewers: [meltano/engineering]

.pre-commit-config.yaml

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
ci:
2-
autofix_prs: false
2+
autofix_prs: true
33
autoupdate_schedule: weekly
44
autoupdate_commit_msg: 'chore: pre-commit autoupdate'
55

@@ -9,38 +9,26 @@ repos:
99
hooks:
1010
- id: check-json
1111
- id: check-toml
12-
- id: check-yaml
12+
exclude: |
13+
(?x)^(
14+
copier_template/.*/pyproject.toml
15+
)$
1316
- id: end-of-file-fixer
14-
exclude: (docs/.*|samples/.*\.json)
17+
exclude: (copier_template/.*|docs/.*|samples/.*\.json)
1518
- id: trailing-whitespace
1619

17-
- repo: https://github.com/psf/black
18-
rev: 23.11.0
19-
hooks:
20-
- id: black
21-
22-
- repo: https://github.com/pycqa/isort
23-
rev: 5.12.0
20+
- repo: https://github.com/astral-sh/ruff-pre-commit
21+
rev: v0.1.6
2422
hooks:
25-
- id: isort
23+
- id: ruff
24+
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
25+
- id: ruff-format
2626

2727
- repo: https://github.com/pycqa/flake8
2828
rev: 6.1.0
2929
hooks:
3030
- id: flake8
3131
additional_dependencies:
3232
- darglint==1.8.1
33-
- flake8-annotations==2.9.0
34-
- flake8-docstrings==1.6.0
3533
files: 'airflow_ext/.*'
3634
exclude: 'files_airflow_ext/orchestrate/meltano.py'
37-
38-
- repo: https://github.com/asottile/pyupgrade
39-
rev: v3.15.0
40-
hooks:
41-
- id: pyupgrade
42-
args: [--py37-plus]
43-
exclude: |
44-
(?x)^(
45-
files_airflow_ext/orchestrate/meltano.py
46-
)$

airflow_ext/main.py

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,11 @@ def initialize(ctx: typer.Context, force: bool = False) -> None:
3434
try:
3535
ext.initialize(force)
3636
except Exception:
37-
log.exception(
38-
"initialize failed with uncaught exception, please report to maintainer"
39-
)
37+
log.exception("initialize failed with uncaught exception, please report to maintainer")
4038
sys.exit(1)
4139

4240

43-
@app.command(
44-
context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
45-
)
41+
@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
4642
def invoke(ctx: typer.Context, command_args: List[str]) -> None:
4743
"""Invoke the underlying wrapped cli.
4844
@@ -55,17 +51,13 @@ def invoke(ctx: typer.Context, command_args: List[str]) -> None:
5551
command_args: The command args to invoke
5652
"""
5753
command_name, command_args = command_args[0], command_args[1:]
58-
log.debug(
59-
"called", command_name=command_name, command_args=command_args, env=os.environ
60-
)
54+
log.debug("called", command_name=command_name, command_args=command_args, env=os.environ)
6155
ext.pass_through_invoker(log, command_name, *command_args)
6256

6357

6458
@app.command()
6559
def describe(
66-
output_format: DescribeFormat = typer.Option(
67-
DescribeFormat.text, "--format", help="Output format"
68-
)
60+
output_format: DescribeFormat = typer.Option(DescribeFormat.text, "--format", help="Output format"),
6961
) -> None:
7062
"""Describe the available commands of this extension.
7163
@@ -75,22 +67,16 @@ def describe(
7567
try:
7668
typer.echo(ext.describe_formatted(output_format))
7769
except Exception:
78-
log.exception(
79-
"describe failed with uncaught exception, please report to maintainer"
80-
)
70+
log.exception("describe failed with uncaught exception, please report to maintainer")
8171
sys.exit(1)
8272

8373

8474
@app.callback(invoke_without_command=True)
8575
def main(
8676
ctx: typer.Context,
8777
log_level: str = typer.Option("INFO", envvar="LOG_LEVEL"),
88-
log_timestamps: bool = typer.Option(
89-
False, envvar="LOG_TIMESTAMPS", help="Show timestamp in logs"
90-
),
91-
log_levels: bool = typer.Option(
92-
False, "--log-levels", envvar="LOG_LEVELS", help="Show log levels"
93-
),
78+
log_timestamps: bool = typer.Option(False, envvar="LOG_TIMESTAMPS", help="Show timestamp in logs"),
79+
log_levels: bool = typer.Option(False, "--log-levels", envvar="LOG_LEVELS", help="Show log levels"),
9480
meltano_log_json: bool = typer.Option(
9581
False,
9682
"--meltano-log-json",

airflow_ext/pass_through.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ def pass_through_cli() -> None:
1313
ext = Airflow()
1414
ext.pass_through_invoker(
1515
structlog.getLogger("airflow_invoker"),
16-
*sys.argv[1:] if len(sys.argv) > 1 else []
16+
*sys.argv[1:] if len(sys.argv) > 1 else [],
1717
)

airflow_ext/wrapper.py

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,13 @@ def __init__(self) -> None:
2525
self.airflow_bin = "airflow"
2626
self.airflow_invoker = Invoker(self.airflow_bin)
2727

28-
self.airflow_home = os.environ.get("AIRFLOW_HOME") or os.environ.get(
29-
f"{self.app_name}_AIRFLOW_HOME"
30-
)
28+
self.airflow_home = os.environ.get("AIRFLOW_HOME") or os.environ.get(f"{self.app_name}_AIRFLOW_HOME")
3129
if not self.airflow_home:
3230
log.debug("env dump", env=os.environ)
33-
log.error(
34-
"AIRFLOW_HOME not found in environment, unable to function without it"
35-
)
31+
log.error("AIRFLOW_HOME not found in environment, unable to function without it")
3632
sys.exit(1)
3733

38-
self.airflow_cfg_path = Path(
39-
os.environ.get("AIRFLOW_CONFIG", f"{self.airflow_home}/airflow.cfg")
40-
)
34+
self.airflow_cfg_path = Path(os.environ.get("AIRFLOW_CONFIG", f"{self.airflow_home}/airflow.cfg"))
4135
self.airflow_core_dags_path = Path(
4236
os.path.expandvars(
4337
os.environ.get(
@@ -75,19 +69,15 @@ def initialize(self, force: bool = False) -> None:
7569
"meltano dag generator not found, will be auto-generated",
7670
dag_generator_path=dag_generator_path,
7771
)
78-
dag_generator_path.write_bytes(
79-
pkgutil.get_data("files_airflow_ext", "orchestrate/meltano.py")
80-
)
72+
dag_generator_path.write_bytes(pkgutil.get_data("files_airflow_ext", "orchestrate/meltano.py"))
8173

8274
readme_path = self.airflow_core_dags_path / "README.md"
8375
if not readme_path.exists():
8476
log.debug(
8577
"meltano dag generator README not found, will be auto-generated",
8678
readme_path=readme_path,
8779
)
88-
readme_path.write_bytes(
89-
pkgutil.get_data("files_airflow_ext", "orchestrate/README.md")
90-
)
80+
readme_path.write_bytes(pkgutil.get_data("files_airflow_ext", "orchestrate/README.md"))
9181

9282
def invoke(self, command_name: str | None, *command_args: Any) -> None:
9383
"""Invoke the airflow command.
@@ -101,9 +91,7 @@ def invoke(self, command_name: str | None, *command_args: Any) -> None:
10191
try:
10292
self.airflow_invoker.run_and_log(command_name, *command_args)
10393
except subprocess.CalledProcessError as err:
104-
log_subprocess_error(
105-
f"airflow {command_name}", err, "airflow invocation failed"
106-
)
94+
log_subprocess_error(f"airflow {command_name}", err, "airflow invocation failed")
10795
sys.exit(err.returncode)
10896

10997
def describe(self) -> models.Describe:
@@ -115,12 +103,8 @@ def describe(self) -> models.Describe:
115103
# TODO: could we auto-generate all or portions of this from typer instead?
116104
return models.Describe(
117105
commands=[
118-
models.ExtensionCommand(
119-
name="airflow_extension", description="airflow extension commands"
120-
),
121-
models.InvokerCommand(
122-
name="airflow_invoker", description="airflow pass through invoker"
123-
),
106+
models.ExtensionCommand(name="airflow_extension", description="airflow extension commands"),
107+
models.InvokerCommand(name="airflow_invoker", description="airflow pass through invoker"),
124108
]
125109
)
126110

@@ -131,9 +115,7 @@ def _create_config(self) -> None:
131115
try:
132116
self.airflow_invoker.run("--help", stdout=subprocess.DEVNULL)
133117
except subprocess.CalledProcessError as err:
134-
log_subprocess_error(
135-
"airflow --help", err, "initial airflow invocation failed"
136-
)
118+
log_subprocess_error("airflow --help", err, "initial airflow invocation failed")
137119
sys.exit(err.returncode)
138120

139121
def _initdb(self) -> None:

files_airflow_ext/orchestrate/meltano.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
"""Meltano DAG generator."""
2+
13
# If you want to define a custom DAG, create
24
# a new file under orchestrate/dags/ and Airflow
35
# will pick it up automatically.
46

7+
from __future__ import annotations
8+
59
import json
610
import logging
711
import os
@@ -37,13 +41,14 @@
3741

3842
if not Path(PROJECT_ROOT).joinpath(MELTANO_BIN).exists():
3943
logger.warning(
40-
f"A symlink to the 'meltano' executable could not be found at '{MELTANO_BIN}'. Falling back on expecting it "
41-
f"to be in the PATH instead. "
44+
"A symlink to the 'meltano' executable could not be found at '%s'. "
45+
"Falling back on expecting it to be in the PATH instead. ",
46+
MELTANO_BIN,
4247
)
4348
MELTANO_BIN = "meltano"
4449

4550

46-
def _meltano_elt_generator(schedules):
51+
def _meltano_elt_generator(schedules: list) -> None:
4752
"""Generate singular dag's for each legacy Meltano elt task.
4853
4954
Args:
@@ -53,7 +58,8 @@ def _meltano_elt_generator(schedules):
5358
logger.info(f"Considering schedule '{schedule['name']}': {schedule}")
5459
if not schedule["cron_interval"]:
5560
logger.info(
56-
f"No DAG created for schedule '{schedule['name']}' because its interval is set to `@once`.",
61+
"No DAG created for schedule '%s' because its interval is set to " "`@once`.",
62+
schedule["name"],
5763
)
5864
continue
5965

@@ -89,7 +95,7 @@ def _meltano_elt_generator(schedules):
8995
max_active_runs=1,
9096
)
9197

92-
elt = BashOperator(
98+
elt = BashOperator( # noqa: F841
9399
task_id="extract_load",
94100
bash_command=f"cd {PROJECT_ROOT}; {MELTANO_BIN} schedule run {schedule['name']}",
95101
dag=dag,
@@ -100,7 +106,7 @@ def _meltano_elt_generator(schedules):
100106
logger.info(f"DAG created for schedule '{schedule['name']}'")
101107

102108

103-
def _meltano_job_generator(schedules):
109+
def _meltano_job_generator(schedules: list) -> None:
104110
"""Generate dag's for each task within a Meltano scheduled job.
105111
106112
Args:
@@ -109,12 +115,14 @@ def _meltano_job_generator(schedules):
109115
for schedule in schedules:
110116
if not schedule.get("job"):
111117
logger.info(
112-
f"No DAG's created for schedule '{schedule['name']}'. It was passed to job generator but has no job."
118+
"No DAG's created for schedule '%s'. It was passed to job generator but has no job.",
119+
schedule["name"],
113120
)
114121
continue
115122
if not schedule["cron_interval"]:
116123
logger.info(
117-
f"No DAG created for schedule '{schedule['name']}' because its interval is set to `@once`."
124+
"No DAG created for schedule '%s' because its interval is set to `@once`.",
125+
schedule["name"],
118126
)
119127
continue
120128

@@ -137,7 +145,10 @@ def _meltano_job_generator(schedules):
137145
previous_task = None
138146
for idx, task in enumerate(schedule["job"]["tasks"]):
139147
logger.info(
140-
f"Considering task '{task}' of schedule '{schedule['name']}': {schedule}"
148+
"Considering task '%s' of schedule '%s': %s",
149+
task,
150+
schedule["name"],
151+
schedule,
141152
)
142153

143154
task_id = f"{base_id}_task{idx}"
@@ -155,21 +166,19 @@ def _meltano_job_generator(schedules):
155166
if previous_task:
156167
task.set_upstream(previous_task)
157168
previous_task = task
158-
logger.info(
159-
f"Spun off task '{task}' of schedule '{schedule['name']}': {schedule}"
160-
)
169+
logger.info("Spun off task '%s' of schedule '%s': %s", task, schedule["name"], schedule)
161170

162171
globals()[base_id] = dag
163172
logger.info(f"DAG created for schedule '{schedule['name']}', task='{run_args}'")
164173

165174

166-
def create_dags():
175+
def create_dags() -> None:
167176
"""Create DAGs for Meltano schedules."""
168177
list_result = subprocess.run(
169178
[MELTANO_BIN, "schedule", "list", "--format=json"],
170179
cwd=PROJECT_ROOT,
171180
stdout=subprocess.PIPE,
172-
universal_newlines=True,
181+
text=True,
173182
check=True,
174183
)
175184
schedule_export = json.loads(list_result.stdout)

0 commit comments

Comments (0)