Skip to content

Commit 3888e73

Browse files
Split dag bundle tests into separate files (apache#46540)
1 parent 6d17ee9 commit 3888e73

File tree

3 files changed

+161
-105
lines changed

3 files changed

+161
-105
lines changed
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import fcntl
21+
import tempfile
22+
from pathlib import Path
23+
24+
import pytest
25+
26+
from airflow.dag_processing.bundles.base import BaseDagBundle
27+
from airflow.dag_processing.bundles.local import LocalDagBundle
28+
29+
from tests_common.test_utils.config import conf_vars
30+
31+
pytestmark = pytest.mark.db_test
32+
33+
34+
@pytest.fixture(autouse=True)
35+
def bundle_temp_dir(tmp_path):
36+
with conf_vars({("dag_processor", "dag_bundle_storage_path"): str(tmp_path)}):
37+
yield tmp_path
38+
39+
40+
def test_default_dag_storage_path():
41+
with conf_vars({("dag_processor", "dag_bundle_storage_path"): ""}):
42+
bundle = LocalDagBundle(name="test", path="/hello")
43+
assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles")
44+
45+
46+
class BasicBundle(BaseDagBundle):
47+
def refresh(self):
48+
pass
49+
50+
def get_current_version(self):
51+
pass
52+
53+
def path(self):
54+
pass
55+
56+
57+
def test_dag_bundle_root_storage_path():
58+
with conf_vars({("dag_processor", "dag_bundle_storage_path"): None}):
59+
bundle = BasicBundle(name="test")
60+
assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles")
61+
62+
63+
def test_lock_acquisition():
64+
"""Test that the lock context manager sets _locked and locks a lock file."""
65+
bundle = BasicBundle(name="locktest")
66+
lock_dir = bundle._dag_bundle_root_storage_path / "_locks"
67+
lock_file = lock_dir / f"{bundle.name}.lock"
68+
69+
assert not bundle._locked
70+
71+
with bundle.lock():
72+
assert bundle._locked
73+
assert lock_file.exists()
74+
75+
# Check lock file is now locked
76+
with open(lock_file, "w") as f:
77+
try:
78+
# Try to acquire an exclusive lock in non-blocking mode.
79+
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
80+
locked = False
81+
except OSError:
82+
locked = True
83+
assert locked
84+
85+
# After, _locked is False and file unlock has been called.
86+
assert bundle._locked is False
87+
with open(lock_file, "w") as f:
88+
try:
89+
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
90+
unlocked = True
91+
fcntl.flock(f, fcntl.LOCK_UN) # Release the lock immediately.
92+
except OSError:
93+
unlocked = False
94+
assert unlocked
95+
96+
97+
def test_lock_exception_handling():
98+
"""Test that exceptions within the lock context manager still release the lock."""
99+
bundle = BasicBundle(name="locktest")
100+
lock_dir = bundle._dag_bundle_root_storage_path / "_locks"
101+
lock_file = lock_dir / f"{bundle.name}.lock"
102+
103+
try:
104+
with bundle.lock():
105+
assert bundle._locked
106+
raise Exception("...")
107+
except Exception:
108+
pass
109+
110+
# lock file should be unlocked
111+
assert not bundle._locked
112+
with open(lock_file, "w") as f:
113+
try:
114+
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
115+
acquired = True
116+
fcntl.flock(f, fcntl.LOCK_UN)
117+
except OSError:
118+
acquired = False
119+
assert acquired
Lines changed: 0 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,15 @@
1717

1818
from __future__ import annotations
1919

20-
import fcntl
2120
import os
2221
import re
23-
import tempfile
24-
from pathlib import Path
2522
from unittest import mock
2623

2724
import pytest
2825
from git import Repo
2926
from git.exc import GitCommandError, NoSuchPathError
3027

31-
from airflow.dag_processing.bundles.base import BaseDagBundle
3228
from airflow.dag_processing.bundles.git import GitDagBundle, GitHook
33-
from airflow.dag_processing.bundles.local import LocalDagBundle
3429
from airflow.exceptions import AirflowException
3530
from airflow.models import Connection
3631
from airflow.utils import db
@@ -47,106 +42,6 @@ def bundle_temp_dir(tmp_path):
4742
yield tmp_path
4843

4944

50-
def test_default_dag_storage_path():
51-
with conf_vars({("dag_processor", "dag_bundle_storage_path"): ""}):
52-
bundle = LocalDagBundle(name="test", path="/hello")
53-
assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles")
54-
55-
56-
class BasicBundle(BaseDagBundle):
57-
def refresh(self):
58-
pass
59-
60-
def get_current_version(self):
61-
pass
62-
63-
def path(self):
64-
pass
65-
66-
67-
def test_dag_bundle_root_storage_path():
68-
with conf_vars({("dag_processor", "dag_bundle_storage_path"): None}):
69-
bundle = BasicBundle(name="test")
70-
assert bundle._dag_bundle_root_storage_path == Path(tempfile.gettempdir(), "airflow", "dag_bundles")
71-
72-
73-
def test_lock_acquisition():
74-
"""Test that the lock context manager sets _locked and locks a lock file."""
75-
bundle = BasicBundle(name="locktest")
76-
lock_dir = bundle._dag_bundle_root_storage_path / "_locks"
77-
lock_file = lock_dir / f"{bundle.name}.lock"
78-
79-
assert not bundle._locked
80-
81-
with bundle.lock():
82-
assert bundle._locked
83-
assert lock_file.exists()
84-
85-
# Check lock file is now locked
86-
with open(lock_file, "w") as f:
87-
try:
88-
# Try to acquire an exclusive lock in non-blocking mode.
89-
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
90-
locked = False
91-
except OSError:
92-
locked = True
93-
assert locked
94-
95-
# After, _locked is False and file unlock has been called.
96-
assert bundle._locked is False
97-
with open(lock_file, "w") as f:
98-
try:
99-
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
100-
unlocked = True
101-
fcntl.flock(f, fcntl.LOCK_UN) # Release the lock immediately.
102-
except OSError:
103-
unlocked = False
104-
assert unlocked
105-
106-
107-
def test_lock_exception_handling():
108-
"""Test that exceptions within the lock context manager still release the lock."""
109-
bundle = BasicBundle(name="locktest")
110-
lock_dir = bundle._dag_bundle_root_storage_path / "_locks"
111-
lock_file = lock_dir / f"{bundle.name}.lock"
112-
113-
try:
114-
with bundle.lock():
115-
assert bundle._locked
116-
raise Exception("...")
117-
except Exception:
118-
pass
119-
120-
# lock file should be unlocked
121-
assert not bundle._locked
122-
with open(lock_file, "w") as f:
123-
try:
124-
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
125-
acquired = True
126-
fcntl.flock(f, fcntl.LOCK_UN)
127-
except OSError:
128-
acquired = False
129-
assert acquired
130-
131-
132-
class TestLocalDagBundle:
133-
def test_path(self):
134-
bundle = LocalDagBundle(name="test", path="/hello")
135-
assert bundle.path == Path("/hello")
136-
137-
@conf_vars({("core", "dags_folder"): "/tmp/somewhere/dags"})
138-
def test_path_default(self):
139-
bundle = LocalDagBundle(name="test", refresh_interval=300)
140-
assert bundle.path == Path("/tmp/somewhere/dags")
141-
142-
def test_none_for_version(self):
143-
assert LocalDagBundle.supports_versioning is False
144-
145-
bundle = LocalDagBundle(name="test", path="/hello")
146-
147-
assert bundle.get_current_version() is None
148-
149-
15045
GIT_DEFAULT_BRANCH = "main"
15146

15247

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from pathlib import Path
21+
22+
from airflow.dag_processing.bundles.local import LocalDagBundle
23+
24+
from tests_common.test_utils.config import conf_vars
25+
26+
27+
class TestLocalDagBundle:
28+
def test_path(self):
29+
bundle = LocalDagBundle(name="test", path="/hello")
30+
assert bundle.path == Path("/hello")
31+
32+
@conf_vars({("core", "dags_folder"): "/tmp/somewhere/dags"})
33+
def test_path_default(self):
34+
bundle = LocalDagBundle(name="test", refresh_interval=300)
35+
assert bundle.path == Path("/tmp/somewhere/dags")
36+
37+
def test_none_for_version(self):
38+
assert LocalDagBundle.supports_versioning is False
39+
40+
bundle = LocalDagBundle(name="test", path="/hello")
41+
42+
assert bundle.get_current_version() is None

0 commit comments

Comments
 (0)