Skip to content

Commit 4220caa

Browse files
committed
Add support for configurable pod scheduling options when transformer workers are launched
1 parent c817134 commit 4220caa

File tree

5 files changed

+186
-1
lines changed

5 files changed

+186
-1
lines changed

docs/deployment/reference.md

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,10 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
132132
| `transformer.autoscaler.minReplicas` | Minimum number of transformer pods per request | 1 |
133133
| `transformer.autoscaler.maxReplicas` | Maximum number of transformer pods per request | 20 |
134134
| `transformer.priorityClassName` | priorityClassName for transformer pods (Not setting it means getting global default) | Not Set |
135+
| `transformer.nodeSelector` | Kubernetes nodeSelector for transformer pod scheduling | `{}` |
136+
| `transformer.tolerations` | Kubernetes tolerations for transformer pod scheduling | `[]` |
137+
| `transformer.affinity` | Kubernetes affinity rules for transformer pod scheduling | `{}` |
138+
| `transformer.podAnnotations` | Additional annotations to add to transformer pods | `{}` |
135139
| `transformer.cpuLimit` | Set CPU resource limit for pod in number of cores | 1 |
136140
| `transformer.memoryLimit` | Set memory resource limit for pod (use Kubernetes units, e.g. [the Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)) | 2Gi |
137141
| `transformer.cpuRequest` | Set CPU resource request for pod in number of cores | 500m |

helm/servicex/templates/app/configmap.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -129,6 +129,11 @@ data:
129129
TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM = "{{ .Values.transformer.persistence.existingClaim }}"
130130
TRANSFORMER_PERSISTENCE_SUBDIR = "{{ .Values.transformer.persistence.subdir}}"
131131
132+
TRANSFORMER_NODE_SELECTOR = {{ .Values.transformer.nodeSelector | toJson }}
133+
TRANSFORMER_TOLERATIONS = {{ .Values.transformer.tolerations | toJson }}
134+
TRANSFORMER_AFFINITY = {{ .Values.transformer.affinity | toJson }}
135+
TRANSFORMER_POD_ANNOTATIONS = {{ .Values.transformer.podAnnotations | toJson }}
136+
132137
133138
{{ if .Values.objectStore.enabled }}
134139
OBJECT_STORE_ENABLED = True

helm/servicex/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -387,6 +387,10 @@ transformer:
387387
existingClaim: null
388388
subdir: null
389389
priorityClassName: null
390+
nodeSelector: {}
391+
tolerations: []
392+
affinity: {}
393+
podAnnotations: {}
390394
x509Secrets:
391395
image: sslhep/x509-secrets
392396
initImage: alpine:3.6

servicex_app/servicex_app/transformer_manager.py

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -339,14 +339,35 @@ def create_job_object(
339339
)
340340

341341
# Create and Configure a spec section
342+
pod_annotations = current_app.config.get("TRANSFORMER_POD_ANNOTATIONS", {})
343+
node_selector = current_app.config.get("TRANSFORMER_NODE_SELECTOR", {})
344+
tolerations_config = current_app.config.get("TRANSFORMER_TOLERATIONS", [])
345+
affinity_config = current_app.config.get("TRANSFORMER_AFFINITY", {})
346+
347+
# Convert tolerations from config dicts to V1Toleration objects
348+
tolerations = None
349+
if tolerations_config:
350+
tolerations = [client.V1Toleration(**t) for t in tolerations_config]
351+
352+
# Convert affinity from config dict to V1Affinity object
353+
affinity = None
354+
if affinity_config:
355+
affinity = client.V1Affinity(**affinity_config)
356+
342357
template = client.V1PodTemplateSpec(
343-
metadata=client.V1ObjectMeta(labels={"app": "transformer-" + request_id}),
358+
metadata=client.V1ObjectMeta(
359+
labels={"app": "transformer-" + request_id},
360+
annotations=pod_annotations if pod_annotations else None,
361+
),
344362
spec=client.V1PodSpec(
345363
restart_policy="Always",
346364
termination_grace_period_seconds=TransformerManager.POD_TERMINATION_GRACE_PERIOD,
347365
priority_class_name=current_app.config.get(
348366
"TRANSFORMER_PRIORITY_CLASS", None
349367
),
368+
node_selector=node_selector if node_selector else None,
369+
tolerations=tolerations,
370+
affinity=affinity,
350371
containers=[
351372
sidecar,
352373
science_container,

servicex_app/servicex_app_test/test_transformer_manager.py

Lines changed: 151 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1095,3 +1095,154 @@ def test_get_all_hpas(self, mocker, mock_kubernetes):
10951095
with client.application.app_context():
10961096
hpas = transformer_manager.get_all_transformer_hpas()
10971097
assert hpas == [mock_hpa]
1098+
1099+
def test_launch_transformer_with_pod_scheduling_options(self, mocker):
1100+
import kubernetes
1101+
1102+
mocker.patch.object(kubernetes.config, "load_kube_config")
1103+
mock_kubernetes = mocker.patch.object(kubernetes.client, "AppsV1Api")
1104+
1105+
mock_autoscaling = mocker.Mock()
1106+
mocker.patch.object(
1107+
kubernetes.client, "AutoscalingV1Api", return_value=mock_autoscaling
1108+
)
1109+
1110+
transformer = TransformerManager("external-kubernetes")
1111+
transformer.persistent_volume_claim_exists = mocker.Mock(return_value=True)
1112+
1113+
node_selector = {"disktype": "ssd", "region": "us-west"}
1114+
tolerations = [
1115+
{
1116+
"key": "dedicated",
1117+
"operator": "Equal",
1118+
"value": "servicex",
1119+
"effect": "NoSchedule",
1120+
},
1121+
{
1122+
"key": "gpu",
1123+
"operator": "Exists",
1124+
"effect": "NoExecute",
1125+
"toleration_seconds": 3600,
1126+
},
1127+
]
1128+
affinity = {
1129+
"node_affinity": {
1130+
"required_during_scheduling_ignored_during_execution": {
1131+
"node_selector_terms": [
1132+
{
1133+
"match_expressions": [
1134+
{
1135+
"key": "topology.kubernetes.io/zone",
1136+
"operator": "In",
1137+
"values": ["us-west-1a", "us-west-1b"],
1138+
}
1139+
]
1140+
}
1141+
]
1142+
}
1143+
}
1144+
}
1145+
pod_annotations = {
1146+
"prometheus.io/scrape": "true",
1147+
"prometheus.io/port": "8080",
1148+
}
1149+
1150+
client = self._test_client(
1151+
extra_config=make_config(
1152+
TRANSFORMER_AUTOSCALE_ENABLED=False,
1153+
TRANSFORMER_NODE_SELECTOR=node_selector,
1154+
TRANSFORMER_TOLERATIONS=tolerations,
1155+
TRANSFORMER_AFFINITY=affinity,
1156+
TRANSFORMER_POD_ANNOTATIONS=pod_annotations,
1157+
),
1158+
transformation_manager=transformer,
1159+
)
1160+
1161+
with client.application.app_context():
1162+
transformer.launch_transformer_jobs(
1163+
image="sslhep/servicex-transformer:pytest",
1164+
request_id="1234",
1165+
workers=17,
1166+
max_workers=17,
1167+
rabbitmq_uri="ampq://test.com",
1168+
namespace="my-ns",
1169+
result_destination="object-store",
1170+
result_format="arrow",
1171+
x509_secret="x509",
1172+
generated_code_cm=None,
1173+
transformer_language="scala",
1174+
transformer_command="echo",
1175+
)
1176+
called_deployment = mock_kubernetes.mock_calls[1][2]["body"]
1177+
template = called_deployment.spec.template
1178+
1179+
# Verify pod annotations
1180+
assert template.metadata.annotations == pod_annotations
1181+
1182+
# Verify node selector
1183+
assert template.spec.node_selector == node_selector
1184+
1185+
# Verify tolerations
1186+
assert len(template.spec.tolerations) == 2
1187+
assert template.spec.tolerations[0].key == "dedicated"
1188+
assert template.spec.tolerations[0].operator == "Equal"
1189+
assert template.spec.tolerations[0].value == "servicex"
1190+
assert template.spec.tolerations[0].effect == "NoSchedule"
1191+
assert template.spec.tolerations[1].key == "gpu"
1192+
assert template.spec.tolerations[1].operator == "Exists"
1193+
assert template.spec.tolerations[1].effect == "NoExecute"
1194+
assert template.spec.tolerations[1].toleration_seconds == 3600
1195+
1196+
# Verify affinity
1197+
assert template.spec.affinity is not None
1198+
assert template.spec.affinity.node_affinity is not None
1199+
1200+
def test_launch_transformer_with_empty_pod_scheduling_options(self, mocker):
1201+
import kubernetes
1202+
1203+
mocker.patch.object(kubernetes.config, "load_kube_config")
1204+
mock_kubernetes = mocker.patch.object(kubernetes.client, "AppsV1Api")
1205+
1206+
mock_autoscaling = mocker.Mock()
1207+
mocker.patch.object(
1208+
kubernetes.client, "AutoscalingV1Api", return_value=mock_autoscaling
1209+
)
1210+
1211+
transformer = TransformerManager("external-kubernetes")
1212+
transformer.persistent_volume_claim_exists = mocker.Mock(return_value=True)
1213+
1214+
# Test with empty/default values
1215+
client = self._test_client(
1216+
extra_config=make_config(
1217+
TRANSFORMER_AUTOSCALE_ENABLED=False,
1218+
TRANSFORMER_NODE_SELECTOR={},
1219+
TRANSFORMER_TOLERATIONS=[],
1220+
TRANSFORMER_AFFINITY={},
1221+
TRANSFORMER_POD_ANNOTATIONS={},
1222+
),
1223+
transformation_manager=transformer,
1224+
)
1225+
1226+
with client.application.app_context():
1227+
transformer.launch_transformer_jobs(
1228+
image="sslhep/servicex-transformer:pytest",
1229+
request_id="1234",
1230+
workers=17,
1231+
max_workers=17,
1232+
rabbitmq_uri="ampq://test.com",
1233+
namespace="my-ns",
1234+
result_destination="object-store",
1235+
result_format="arrow",
1236+
x509_secret="x509",
1237+
generated_code_cm=None,
1238+
transformer_language="scala",
1239+
transformer_command="echo",
1240+
)
1241+
called_deployment = mock_kubernetes.mock_calls[1][2]["body"]
1242+
template = called_deployment.spec.template
1243+
1244+
# Verify empty values result in None (not empty dicts/lists)
1245+
assert template.metadata.annotations is None
1246+
assert template.spec.node_selector is None
1247+
assert template.spec.tolerations is None
1248+
assert template.spec.affinity is None

0 commit comments

Comments (0)