
Commit 8118923

Add indexing job manager (#133)

1 parent a0b0629 commit 8118923

File tree: 12 files changed, +345 -126 lines changed
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# NOTE: the location of this file is important as it gets referenced by the src/main.py script
# and depends on the relative path to this file when uvicorn is run

apiVersion: batch/v1
kind: CronJob
metadata:
  name: graphrag-index-manager
spec:
  schedule: "*/5 * * * *"
  jobTemplate:
    spec:
      ttlSecondsAfterFinished: 30
      template:
        metadata:
          labels:
            azure.workload.identity/use: "true"
        spec:
          serviceAccountName: PLACEHOLDER
          restartPolicy: OnFailure
          containers:
            - name: index-job-manager
              image: PLACEHOLDER
              imagePullPolicy: Always
              resources:
                requests:
                  cpu: "0.5"
                  memory: "0.5Gi"
                limits:
                  cpu: "1"
                  memory: "1Gi"
              envFrom:
                - configMapRef:
                    name: graphrag
              command:
                - python
                - "manage-indexing-jobs.py"
backend/indexing-job-template.yaml

Lines changed: 7 additions & 6 deletions
@@ -1,15 +1,16 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 
-# NOTE: the location of this file is important, as it is referenced by the api/index.py script and depends on the relative path to this file when uvicorn is run
-# To account for periods of time where an AOAI endpoint may have be getting hammered with too much work and rate-limiting will cause indexing jobs to fail, we set the backoffLimit to a high number (meaning the job will be retried 30 times before it is considered a failure) with exponential backoff
+# NOTE: the location of this file is important as it gets referenced by the manage-indexing-jobs.py script
+# and depends on the relative path to this file when uvicorn is run
+
 apiVersion: batch/v1
 kind: Job
 metadata:
   name: PLACEHOLDER
 spec:
-  ttlSecondsAfterFinished: 0
-  backoffLimit: 6
+  ttlSecondsAfterFinished: 30
+  backoffLimit: 3
   template:
     metadata:
       labels:
@@ -23,10 +24,10 @@ spec:
       imagePullPolicy: Always
       resources:
         requests:
-          cpu: "4"
+          cpu: "6"
           memory: "24Gi"
         limits:
-          cpu: "8"
+          cpu: "10"
           memory: "32Gi"
       envFrom:
         - configMapRef:
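Two behavioral changes ride along with the updated comment: finished indexing Jobs are now garbage-collected after 30 seconds instead of immediately (ttlSecondsAfterFinished: 0 → 30), and a failing Job is retried at most 3 times instead of 6 (backoffLimit). Kubernetes documents the per-retry back-off for failed Job pods as starting at 10s, doubling each time, and capping at six minutes; a quick illustrative sketch of the delays under the old and new limits (the 10s base and 360s cap are the documented defaults, assumed here):

# Illustrative only: approximate per-retry back-off delays for a k8s Job,
# assuming the documented 10s starting delay, doubling, capped at 6 minutes.
def backoff_delays(backoff_limit: int, base: int = 10, cap: int = 360) -> list[int]:
    return [min(base * 2**attempt, cap) for attempt in range(backoff_limit)]


print(backoff_delays(6))  # old setting -> [10, 20, 40, 80, 160, 320]
print(backoff_delays(3))  # new setting -> [10, 20, 40]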

backend/manage-indexing-jobs.py

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
A naive implementation of a job manager that leverages k8s CronJob and CosmosDB
to schedule graphrag indexing jobs in a first-come-first-serve manner (based on epoch time).
"""

import os

import pandas as pd
import yaml
from kubernetes import (
    client,
    config,
)
from src.api.azure_clients import AzureStorageClientManager
from src.api.common import sanitize_name
from src.models import PipelineJob
from src.reporting.reporter_singleton import ReporterSingleton
from src.typing.pipeline import PipelineJobState
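The PipelineJobState enum imported above is defined elsewhere in the repo and is not part of this diff; based solely on how it is used below (SCHEDULED, RUNNING, and FAILED members compared against a status string), it is assumed to look roughly like this sketch:

# Assumed shape of PipelineJobState (not in this diff); member names follow
# their usage in this script, and the string values are guesses.
from enum import Enum


class PipelineJobState(Enum):
    SCHEDULED = "scheduled"
    RUNNING = "running"
    FAILED = "failed"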
def schedule_indexing_job(index_name: str):
    """
    Schedule a k8s job to run graphrag indexing for a given index name.
    """
    try:
        config.load_incluster_config()
        # get container image name
        core_v1 = client.CoreV1Api()
        pod_name = os.environ["HOSTNAME"]
        pod = core_v1.read_namespaced_pod(
            name=pod_name, namespace=os.environ["AKS_NAMESPACE"]
        )
        # retrieve job manifest template and replace necessary values
        job_manifest = _generate_aks_job_manifest(
            docker_image_name=pod.spec.containers[0].image,
            index_name=index_name,
            service_account_name=pod.spec.service_account_name,
        )
        batch_v1 = client.BatchV1Api()
        batch_v1.create_namespaced_job(
            body=job_manifest, namespace=os.environ["AKS_NAMESPACE"]
        )
    except Exception:
        reporter = ReporterSingleton().get_instance()
        reporter.on_error(
            "Index job manager encountered error scheduling indexing job",
        )
        # In the event of a catastrophic scheduling failure, something in k8s or the job manifest is likely broken.
        # Set job status to failed to prevent an infinite loop of re-scheduling
        pipelinejob = PipelineJob()
        pipeline_job = pipelinejob.load_item(sanitize_name(index_name))
        pipeline_job["status"] = PipelineJobState.FAILED
def _generate_aks_job_manifest(
    docker_image_name: str,
    index_name: str,
    service_account_name: str,
) -> dict:
    """Generate an AKS Jobs manifest file with the specified parameters.

    The manifest must be valid YAML with certain values replaced by the provided arguments.
    """
    # NOTE: this file location is relative to the WORKDIR set in Dockerfile-backend
    with open("indexing-job-template.yaml", "r") as f:
        manifest = yaml.safe_load(f)
    manifest["metadata"]["name"] = f"indexing-job-{sanitize_name(index_name)}"
    manifest["spec"]["template"]["spec"]["serviceAccountName"] = service_account_name
    manifest["spec"]["template"]["spec"]["containers"][0]["image"] = docker_image_name
    manifest["spec"]["template"]["spec"]["containers"][0]["command"] = [
        "python",
        "run-indexing-job.py",
        f"-i={index_name}",
    ]
    return manifest
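A usage sketch for the helper above (illustrative, not part of the commit): assuming indexing-job-template.yaml matches the Job manifest shown earlier in this diff, the hypothetical call below renames the Job after the sanitized index name and points its container at the requested index.

# Hypothetical invocation of _generate_aks_job_manifest for illustration only.
manifest = _generate_aks_job_manifest(
    docker_image_name="myregistry.azurecr.io/graphrag-backend:latest",  # hypothetical
    index_name="My Index",
    service_account_name="graphrag-index-sa",  # hypothetical
)
# The Job name is derived from the sanitized index name...
assert manifest["metadata"]["name"] == f"indexing-job-{sanitize_name('My Index')}"
# ...while the container command carries the unsanitized index name as an argument.
assert manifest["spec"]["template"]["spec"]["containers"][0]["command"][-1] == "-i=My Index"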
def main():
    azure_storage_client_manager = AzureStorageClientManager()
    job_container_store_client = (
        azure_storage_client_manager.get_cosmos_container_client(
            database_name="graphrag", container_name="jobs"
        )
    )
    # retrieve status for all jobs that are either scheduled or running
    job_metadata = []
    for item in job_container_store_client.read_all_items():
        # exit if a job is running
        if item["status"] == PipelineJobState.RUNNING.value:
            print(
                f"Indexing job for '{item['human_readable_index_name']}' already running. Will not schedule another. Exiting..."
            )
            exit()
        if item["status"] == PipelineJobState.SCHEDULED.value:
            job_metadata.append(
                {
                    "human_readable_index_name": item["human_readable_index_name"],
                    "epoch_request_time": item["epoch_request_time"],
                    "status": item["status"],
                    "percent_complete": item["percent_complete"],
                }
            )
    # exit if no jobs found
    if not job_metadata:
        print("No jobs found")
        exit()
    # convert to dataframe for easy processing
    df = pd.DataFrame(job_metadata)
    # jobs are run in the order they were requested - sort by epoch_request_time
    df.sort_values(by="epoch_request_time", ascending=True, inplace=True)
    index_to_schedule = df.iloc[0]["human_readable_index_name"]
    print(f"Scheduling job for index: {index_to_schedule}")
    schedule_indexing_job(index_to_schedule)


if __name__ == "__main__":
    main()
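Because main() schedules at most one SCHEDULED job per CronJob tick, picking the oldest epoch_request_time first, the queue drains at most one indexing job every five minutes. The selection logic is easy to exercise in isolation; the job records below are fabricated for illustration:

# Stand-alone illustration of the first-come-first-serve pick in main();
# the job metadata below is fabricated for the example.
import pandas as pd

job_metadata = [
    {"human_readable_index_name": "idx-later", "epoch_request_time": 1700000200},
    {"human_readable_index_name": "idx-first", "epoch_request_time": 1700000100},
]
df = pd.DataFrame(job_metadata)
df.sort_values(by="epoch_request_time", ascending=True, inplace=True)
print(df.iloc[0]["human_readable_index_name"])  # -> idx-first (earliest request wins)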
