|
1 | 1 | import os |
2 | 2 |
|
3 | | -import docker |
4 | 3 | import pytest |
5 | 4 | from celery import Celery |
6 | 5 |
|
@@ -46,54 +45,3 @@ def default_worker_app(default_worker_app: Celery) -> Celery: |
46 | 45 | if app.conf.broker_url and app.conf.broker_url.startswith("sqs"): |
47 | 46 | app.conf.broker_transport_options["region"] = LOCALSTACK_CREDS["AWS_DEFAULT_REGION"] |
48 | 47 | return app |
49 | | - |
50 | | - |
51 | | -@pytest.fixture(scope="module", autouse=True) |
52 | | -def auto_clean_docker_resources(): |
53 | | - """Clean up Docker resources after each test module.""" |
54 | | - # Used for debugging |
55 | | - verbose = False |
56 | | - |
57 | | - def log(message): |
58 | | - if verbose: |
59 | | - print(message) |
60 | | - |
61 | | - def cleanup_docker_resources(): |
62 | | - """Function to clean up Docker containers, networks, and volumes based |
63 | | - on labels.""" |
64 | | - docker_client = docker.from_env() |
65 | | - |
66 | | - try: |
67 | | - # Clean up containers with the label 'creator=pytest-docker-tools' |
68 | | - containers = docker_client.containers.list(all=True, filters={"label": "creator=pytest-docker-tools"}) |
69 | | - for con in containers: |
70 | | - con.reload() # Ensure we have the latest status |
71 | | - if con.status != "running": # Only remove non-running containers |
72 | | - log(f"Removing container {con.name}") |
73 | | - con.remove(force=True) |
74 | | - else: |
75 | | - log(f"Skipping running container {con.name}") |
76 | | - |
77 | | - # Clean up networks with names starting with 'pytest-' |
78 | | - networks = docker_client.networks.list(names=["pytest-*"]) |
79 | | - for network in networks: |
80 | | - if not network.containers: # Check if the network is in use |
81 | | - log(f"Removing network {network.name}") |
82 | | - network.remove() |
83 | | - else: |
84 | | - log(f"Skipping network {network.name}, still in use") |
85 | | - |
86 | | - # Clean up volumes with names starting with 'pytest-*' |
87 | | - volumes = docker_client.volumes.list(filters={"name": "pytest-*"}) |
88 | | - for volume in volumes: |
89 | | - if not volume.attrs.get("UsageData", {}).get("RefCount", 0): # Check if volume is not in use |
90 | | - log(f"Removing volume {volume.name}") |
91 | | - volume.remove() |
92 | | - else: |
93 | | - log(f"Skipping volume {volume.name}, still in use") |
94 | | - |
95 | | - except Exception as e: |
96 | | - log(f"Error occurred while cleaning up Docker resources: {e}") |
97 | | - |
98 | | - log("--- Running Docker resource cleanup ---") |
99 | | - cleanup_docker_resources() |
0 commit comments