Skip to content

Commit cd1340b

Browse files
authored
Use structlog for GitDagBundle (apache#47685)
When working on stale bundle cleanup I experimented with structlog and added some logging that used its bind feature. I later removed the structlog part of it because I was not sure it would make it into 3.0. To keep the debug key-values I added a helper. We can remove it now that structlog is expected to be in 3.0.
1 parent d5ea56a commit cd1340b

File tree

2 files changed

+29
-35
lines changed

2 files changed

+29
-35
lines changed

airflow/dag_processing/bundles/git.py

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
import contextlib
2121
import json
22-
import logging
2322
import os
2423
import tempfile
2524
from pathlib import Path
2625
from typing import Any
2726
from urllib.parse import urlparse
2827

28+
import structlog
2929
from git import Repo
3030
from git.exc import BadName, GitCommandError, NoSuchPathError
3131

@@ -34,9 +34,8 @@
3434
)
3535
from airflow.exceptions import AirflowException
3636
from airflow.hooks.base import BaseHook
37-
from airflow.utils.log.logging_mixin import LoggingMixin
3837

39-
log = logging.getLogger(__name__)
38+
log = structlog.get_logger()
4039

4140

4241
class GitHook(BaseHook):
@@ -119,7 +118,7 @@ def configure_hook_env(self):
119118
yield
120119

121120

122-
class GitDagBundle(BaseDagBundle, LoggingMixin):
121+
class GitDagBundle(BaseDagBundle):
123122
"""
124123
git DAG bundle - exposes a git repository as a DAG bundle.
125124
@@ -154,33 +153,23 @@ def __init__(
154153
self.git_conn_id = git_conn_id
155154
self.repo_url = repo_url
156155

157-
def log_debug(msg, **kwargs):
158-
if not log.isEnabledFor(logging.DEBUG):
159-
return
160-
# ugly; replace when structlog implemented
161-
context = dict(
162-
bundle_name=self.name,
163-
version=self.version,
164-
bare_repo_path=self.bare_repo_path,
165-
repo_path=self.repo_path,
166-
versions_path=self.versions_dir,
167-
git_conn_id=self.git_conn_id,
168-
repo_url=self.repo_url,
169-
)
170-
context.update(kwargs)
171-
172-
for k, v in context.items():
173-
msg += f" {k}='{v}'"
174-
log.debug(msg)
175-
176-
self._log_debug = log_debug
177-
log_debug("bundle configured")
156+
self._log = log.bind(
157+
bundle_name=self.name,
158+
version=self.version,
159+
bare_repo_path=self.bare_repo_path,
160+
repo_path=self.repo_path,
161+
versions_path=self.versions_dir,
162+
git_conn_id=self.git_conn_id,
163+
repo_url=self.repo_url,
164+
)
165+
166+
self._log.debug("bundle configured")
178167
try:
179168
self.hook = GitHook(git_conn_id=self.git_conn_id, repo_url=self.repo_url)
180169
self.repo_url = self.hook.repo_url
181-
log_debug("repo_url updated from hook", repo_url=self.repo_url)
170+
self._log.debug("repo_url updated from hook", repo_url=self.repo_url)
182171
except AirflowException as e:
183-
self.log.warning("Could not create GitHook for connection %s : %s", self.git_conn_id, e)
172+
self._log.warning("Could not create GitHook", conn_id=self.git_conn_id, exc=e)
184173

185174
def _initialize(self):
186175
with self.lock():
@@ -190,7 +179,7 @@ def _initialize(self):
190179

191180
self._clone_repo_if_required()
192181
self.repo.git.checkout(self.tracking_ref)
193-
self._log_debug("bundle initialize", version=self.version)
182+
self._log.debug("bundle initialize", version=self.version)
194183
if self.version:
195184
if not self._has_version(self.repo, self.version):
196185
self.repo.remotes.origin.fetch()
@@ -207,7 +196,7 @@ def initialize(self) -> None:
207196

208197
def _clone_repo_if_required(self) -> None:
209198
if not os.path.exists(self.repo_path):
210-
self.log.info("Cloning repository to %s from %s", self.repo_path, self.bare_repo_path)
199+
self._log.info("Cloning repository", repo_path=self.repo_path, bare_repo_path=self.bare_repo_path)
211200
try:
212201
Repo.clone_from(
213202
url=self.bare_repo_path,
@@ -217,14 +206,14 @@ def _clone_repo_if_required(self) -> None:
217206
# Protection should the bare repo be removed manually
218207
raise AirflowException("Repository path: %s not found", self.bare_repo_path) from e
219208
else:
220-
self._log_debug("repo exists", repo_path=self.repo_path)
209+
self._log.debug("repo exists", repo_path=self.repo_path)
221210
self.repo = Repo(self.repo_path)
222211

223212
def _clone_bare_repo_if_required(self) -> None:
224213
if not self.repo_url:
225214
raise AirflowException(f"Connection {self.git_conn_id} doesn't have a host url")
226215
if not os.path.exists(self.bare_repo_path):
227-
self.log.info("Cloning bare repository to %s", self.bare_repo_path)
216+
self._log.info("Cloning bare repository", bare_repo_path=self.bare_repo_path)
228217
try:
229218
Repo.clone_from(
230219
url=self.repo_url,

tests/dag_processing/bundles/test_git.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import os
2121
import re
2222
from unittest import mock
23+
from unittest.mock import patch
2324

2425
import pytest
2526
from git import Repo
@@ -564,16 +565,20 @@ def test_clone_repo_no_such_path_error(self, mock_githook):
564565

565566
assert "Repository path: %s not found" in str(exc_info.value)
566567

567-
@mock.patch("airflow.dag_processing.bundles.git.GitDagBundle.log")
568-
def test_repo_url_access_missing_connection_doesnt_error(self, mock_log):
568+
@patch.dict(os.environ, {"AIRFLOW_CONN_MY_TEST_GIT": '{"host": "something"}'})
569+
@pytest.mark.parametrize("conn_id, should_find", [("my_test_git", True), ("something-else", False)])
570+
def test_repo_url_access_missing_connection_doesnt_error(self, conn_id, should_find):
569571
bundle = GitDagBundle(
570572
name="testa",
571573
tracking_ref="main",
572-
git_conn_id="unknown",
574+
git_conn_id=conn_id,
573575
repo_url="some_repo_url",
574576
)
575577
assert bundle.repo_url == "some_repo_url"
576-
assert "Could not create GitHook for connection" in mock_log.warning.call_args[0][0]
578+
if should_find:
579+
assert isinstance(bundle.hook, GitHook)
580+
else:
581+
assert not hasattr(bundle, "hook")
577582

578583
@mock.patch("airflow.dag_processing.bundles.git.GitHook")
579584
def test_lock_used(self, mock_githook, git_repo):

0 commit comments

Comments
 (0)