Skip to content
This repository was archived by the owner on May 27, 2025. It is now read-only.

Commit 2d37680

Browse files
authored
Improve error handling of index job management (#166)
1 parent 35e567b commit 2d37680

File tree

1 file changed

+46
-10
lines changed

1 file changed

+46
-10
lines changed

backend/manage-indexing-jobs.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
# Licensed under the MIT License.
33

44
"""
5+
Note: This script is intended to be executed as a cron job on kubernetes.
6+
57
A naive implementation of a job manager that leverages k8s CronJob and CosmosDB
6-
to schedule graphrag indexing jobs in a first-come-first-serve manner (based on epoch time).
8+
to schedule graphrag indexing jobs on a first-come-first-serve basis (based on epoch time).
79
"""
810

911
import os
@@ -78,36 +80,70 @@ def _generate_aks_job_manifest(
7880
return manifest
7981

8082

83+
def list_k8s_jobs(namespace: str) -> list[str]:
    """Return the names of all k8s jobs in the given namespace.

    NOTE: uses ``config.load_incluster_config()``, so this function only
    works when executed inside a kubernetes cluster (it reads the pod's
    service-account credentials) — consistent with this script being run
    as a k8s CronJob.

    Args:
        namespace: the kubernetes namespace to list jobs from.

    Returns:
        A list of job names (possibly empty if no jobs exist).
    """
    config.load_incluster_config()
    batch_v1 = client.BatchV1Api()
    jobs = batch_v1.list_namespaced_job(namespace=namespace)
    # Collect job names with a comprehension instead of a manual append loop.
    return [job.metadata.name for job in jobs.items]
92+
93+
8194
def main():
95+
"""
96+
There are two places to check to determine if an indexing job should be executed:
97+
* Kubernetes: check if there are any active k8s jobs running in the cluster
98+
* CosmosDB: check if there are any indexing jobs in a scheduled state
99+
100+
Ideally if an indexing job has finished or failed, the job status will be reflected in cosmosdb.
101+
However, if an indexing job failed due to OOM, the job status will not have been updated in cosmosdb.
102+
103+
To avoid a catastrophic failure scenario where all indexing jobs are stuck in a scheduled state,
104+
both checks are necessary.
105+
"""
106+
kubernetes_jobs = list_k8s_jobs(os.environ["AKS_NAMESPACE"])
107+
82108
azure_storage_client_manager = AzureStorageClientManager()
83109
job_container_store_client = (
84110
azure_storage_client_manager.get_cosmos_container_client(
85111
database_name="graphrag", container_name="jobs"
86112
)
87113
)
88-
# retrieve status for all jobs that are either scheduled or running
114+
# retrieve status of all index jobs that are scheduled or running
89115
job_metadata = []
90116
for item in job_container_store_client.read_all_items():
91-
# exit if a job is running
92117
if item["status"] == PipelineJobState.RUNNING.value:
93-
print(
94-
f"Indexing job for '{item['human_readable_index_name']}' already running. Will not schedule another. Exiting..."
95-
)
96-
exit()
118+
# if index job has running state but no associated k8s job, a catastrophic
119+
# failure (OOM for example) occurred. Set job status to failed.
120+
if len(kubernetes_jobs) == 0:
121+
print(
122+
f"Indexing job for '{item['human_readable_index_name']}' in 'running' state but no associated k8s job found. Updating to failed state."
123+
)
124+
pipelinejob = PipelineJob()
125+
pipeline_job = pipelinejob.load_item(item["sanitized_index_name"])
126+
pipeline_job["status"] = PipelineJobState.FAILED.value
127+
else:
128+
print(
129+
f"Indexing job for '{item['human_readable_index_name']}' already running. Will not schedule another. Exiting..."
130+
)
131+
exit()
97132
if item["status"] == PipelineJobState.SCHEDULED.value:
98133
job_metadata.append({
99134
"human_readable_index_name": item["human_readable_index_name"],
100135
"epoch_request_time": item["epoch_request_time"],
101136
"status": item["status"],
102137
"percent_complete": item["percent_complete"],
103138
})
104-
# exit if no jobs found
139+
140+
# exit if no 'scheduled' jobs were found
105141
if not job_metadata:
106142
print("No jobs found")
107143
exit()
108-
# convert to dataframe for easy processing
144+
# convert to dataframe for easier processing
109145
df = pd.DataFrame(job_metadata)
110-
# jobs are run in the order they were requested - sort by epoch_request_time
146+
# jobs should be run in the order they were requested - sort by epoch_request_time
111147
df.sort_values(by="epoch_request_time", ascending=True, inplace=True)
112148
index_to_schedule = df.iloc[0]["human_readable_index_name"]
113149
print(f"Scheduling job for index: {index_to_schedule}")

0 commit comments

Comments
 (0)