Skip to content

Commit 5ec12ff

Browse files
committed
Mark OCI charts as succeed when OCI is disabled.
Add ability to track hanging background tasks.
1 parent 6a333eb commit 5ec12ff

File tree

22 files changed

+573
-63
lines changed

22 files changed

+573
-63
lines changed

flux_local/helm_controller/controller.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ def _added_listener(self, resource_id: NamedResource, obj: BaseManifest) -> None
9191
if resource_id.kind == "HelmRelease":
9292
self._tasks.append(
9393
self._task_service.create_task(
94-
self.on_helm_release_added(resource_id, obj)
94+
self.on_helm_release_added(resource_id, obj),
95+
f"helm-controller-{resource_id}",
9596
)
9697
)
9798

@@ -386,9 +387,7 @@ async def _resolve_helm_chart(self, helm_release: HelmRelease) -> None:
386387
# Create a new chart from the HelmChartSource instead of mutating the existing one
387388
helm_release.chart = HelmChart.from_helm_chart_source(helm_chart)
388389
else:
389-
_LOGGER.debug(
390-
"HelmChart %s has no version specified", chart_resource
391-
)
390+
_LOGGER.debug("HelmChart %s has no version specified", chart_resource)
392391

393392
async def _apply(self, manifests: list[dict[str, Any]]) -> None:
394393
"""Apply the manifests to the cluster."""

flux_local/image.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
"Alertmanager", # monitoring.coreos.com/v1
2626
"Prometheus", # monitoring.coreos.com/v1
2727
"AutoscalingRunnerSet", # actions.github.com/v1alpha1
28-
"EnvoyProxy", # gateway.envoyproxy.io/v1alpha1
29-
"Plan", # upgrade.cattle.io/v1
30-
"Grafana", # grafana.integreatly.org/v1beta1
28+
"EnvoyProxy", # gateway.envoyproxy.io/v1alpha1
29+
"Plan", # upgrade.cattle.io/v1
30+
"Grafana", # grafana.integreatly.org/v1beta1
3131
]
3232

3333
# Default image key for most object types.
@@ -45,7 +45,9 @@ def _extract_images(kind: str, doc: dict[str, Any]) -> set[str]:
4545
for key, value in doc.items():
4646
if key == image_key:
4747
if isinstance(value, dict) and "reference" in value:
48-
value = value.get("reference") # https://kubernetes.io/blog/2024/08/16/kubernetes-1-31-image-volume-source/
48+
value = value.get(
49+
"reference"
50+
) # https://kubernetes.io/blog/2024/08/16/kubernetes-1-31-image-volume-source/
4951
if not isinstance(value, str):
5052
raise ValueError(
5153
f"Expected string for image key '{image_key}', got type {type(value).__name__}: {value}"

flux_local/kustomize_controller/controller.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ async def _watch_kustomizations(self) -> None:
118118
)
119119
continue
120120
self._tasks.append(
121-
self._task_service.create_task(self.reconcile(resource_id, obj))
121+
self._task_service.create_task(
122+
self.reconcile(resource_id, obj),
123+
f"kustomize-controller-{resource_id}",
124+
)
122125
)
123126
_LOGGER.info("Stopped watching for Kustomization objects")
124127

flux_local/orchestrator/orchestrator.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def __init__(
110110
self.store = store
111111
self.config = config or OrchestratorConfig()
112112
self.controllers: dict[str, Any] = {}
113+
self._tasks: list[asyncio.Task[None]] = []
113114

114115
def _create_controllers(self) -> None:
115116
"""Create and initialize all controllers."""
@@ -141,8 +142,12 @@ async def start(self) -> None:
141142
if self.controllers:
142143
return
143144

144-
_LOGGER.info("Starting orchestrator")
145145
self._create_controllers()
146+
self._tasks.append(
147+
get_task_service().create_background_task(
148+
self._log_active_tasks(), "log-active-tasks"
149+
)
150+
)
146151

147152
async def stop(self) -> None:
148153
"""Stop the orchestrator and all controllers."""
@@ -152,9 +157,15 @@ async def stop(self) -> None:
152157
_LOGGER.info("Stopping orchestrator")
153158

154159
# Stop controllers in reverse order
160+
close_tasks = []
155161
for name, controller in reversed(self.controllers.items()):
156162
_LOGGER.debug("Stopping controller: %s", name)
157-
await controller.close()
163+
close_tasks.append(controller.close())
164+
await asyncio.gather(*close_tasks, return_exceptions=True)
165+
166+
for task in self._tasks:
167+
task.cancel()
168+
await asyncio.gather(*self._tasks, return_exceptions=True)
158169

159170
# Wait for all tasks to complete
160171
await get_task_service().block_till_done()
@@ -332,3 +343,14 @@ async def run(self) -> None:
332343
raise FluxException(f"Uncaught error in orchestrator: {err}") from err
333344
finally:
334345
await self.stop()
346+
347+
async def _log_active_tasks(self) -> None:
348+
"""Log active tasks periodically."""
349+
task_service = get_task_service()
350+
while True:
351+
await asyncio.sleep(5)
352+
active_tasks = task_service.get_active_tasks()
353+
if active_tasks:
354+
_LOGGER.debug("Active tasks (%d):", len(active_tasks))
355+
for task in active_tasks:
356+
_LOGGER.debug(" %s", task.get_name())

flux_local/source_controller/controller.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ def listener(resource_id: NamedResource, obj: BaseManifest) -> None:
9090
if resource_id.kind in self.SUPPORTED_KINDS:
9191
self._tasks.append(
9292
self._task_service.create_task(
93-
self.on_source_added(resource_id, obj)
93+
self.on_source_added(resource_id, obj),
94+
f"source-controller-{resource_id}",
9495
)
9596
)
9697

@@ -131,6 +132,9 @@ async def on_source_added(
131132
return
132133
if isinstance(obj, OCIRepository) and not self._config.enable_oci:
133134
_LOGGER.info("OCI support is disabled, skipping %s", resource_id)
135+
self._store.update_status(
136+
resource_id, Status.READY, "OCI support is disabled"
137+
)
134138
return
135139
if isinstance(obj, GitRepository) and self._store.get_artifact(
136140
resource_id, GitArtifact

flux_local/store/store.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
V = TypeVar("V", bound=BaseManifest | StatusInfo | Artifact)
1717

1818

19-
SUPPORTS_STATUS: set[str] = set({
19+
SUPPORTS_STATUS: set[str] = set(
20+
{
2021
"Kustomization",
2122
"GitRepository",
2223
"HelmRelease",
2324
"HelmRepository",
2425
"Bucket",
2526
"OCIRepository",
2627
"ImageRepository",
27-
})
28+
}
29+
)
2830

2931

3032
class StoreEvent(str, Enum):

flux_local/store/watcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,8 @@ async def watch(self) -> AsyncGenerator[DependencyResolutionEvent, None]:
258258
for dep_id in list(self._dependencies.keys()):
259259
if self._dependencies[dep_id] is None:
260260
self._dependencies[dep_id] = self._task_service.create_task(
261-
self._watch_single_dependency(dep_id)
261+
self._watch_single_dependency(dep_id),
262+
f"dependency-watcher-{dep_id}",
262263
)
263264

264265
active_watches = len(self._dependencies)
@@ -292,7 +293,9 @@ async def watch(self) -> AsyncGenerator[DependencyResolutionEvent, None]:
292293
"DependencyWaiter.watch for %s cancelled. Cleaning up.",
293294
self._parent_resource_id,
294295
)
295-
await self.cancel_pending_watches() # Ensure all sub-tasks are cancelled
296+
await (
297+
self.cancel_pending_watches()
298+
) # Ensure all sub-tasks are cancelled
296299
# Yield any remaining events that might have been queued before cancellation
297300
while not self._event_queue.empty():
298301
try:

flux_local/task/service.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ async def wait_for_task(self, task: asyncio.Task[None]) -> None:
6767
def get_num_active_tasks(self) -> int:
6868
"""Get the number of active non-background tasks."""
6969

70+
@abstractmethod
71+
def get_active_tasks(self) -> list[asyncio.Task[Any]]:
72+
"""Get the active non-background tasks."""
73+
7074

7175
class TaskServiceImpl(TaskService):
7276
"""Service for tracking and waiting for asynchronous tasks.
@@ -107,6 +111,7 @@ def create_background_task(
107111
The created task
108112
"""
109113
task = asyncio.create_task(coro, name=name)
114+
_LOGGER.debug("Created background task: %s", name)
110115
self._background_tasks.add(task)
111116
task.add_done_callback(partial(self._task_done, self._background_tasks))
112117
return task
@@ -170,3 +175,7 @@ async def wait_for_task(self, task: asyncio.Task[None]) -> None:
170175
def get_num_active_tasks(self) -> int:
171176
"""Get the number of active tasks."""
172177
return len(self._active_tasks)
178+
179+
def get_active_tasks(self) -> list[asyncio.Task[Any]]:
180+
"""Get the active tasks."""
181+
return list(self._active_tasks)

flux_local/tool/shell/action.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ class ShellAction:
2121

2222
@classmethod
2323
def register(
24-
cls, subparsers: SubParsersAction # type: ignore[type-arg]
24+
cls,
25+
subparsers: SubParsersAction, # type: ignore[type-arg]
2526
) -> ArgumentParser:
2627
"""Register the shell subcommand."""
2728
parser = cast(
@@ -54,7 +55,9 @@ async def run(self, **kwargs: Any) -> None:
5455

5556
try:
5657
# Bootstrap the system with the specified path
57-
task = asyncio.create_task(orchestrator.bootstrap(bootstrap_options))
58+
task = asyncio.create_task(
59+
orchestrator.bootstrap(bootstrap_options), name="bootstrap"
60+
)
5861
_LOGGER.debug("Orchestrator started, bootstrapping with path: %s", path)
5962

6063
# Create the shell with the bootstrapped store

flux_local/values.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ def _update_helmrelease_values(
199199
"""Expand value references in the HelmRelease."""
200200

201201
if ref.target_path:
202-
203202
raw_parts = re.split(r"(?<!\\)\.", ref.target_path)
204203
parts = [re.sub(r"\\(.)", r"\1", raw_part) for raw_part in raw_parts]
205204

0 commit comments

Comments
 (0)