Skip to content

Commit 7ac298d

Browse files
authored
Merge pull request #65 from scaleapi/jason/fixes
Adding fixes
2 parents 950120f + 9ee7a02 commit 7ac298d

File tree

2 files changed

+127
-64
lines changed

2 files changed

+127
-64
lines changed

src/agentex/lib/cli/handlers/deploy_handlers.py

Lines changed: 91 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def merge_deployment_configs(
133133
raise DeploymentError("Repository and image tag are required")
134134

135135
# Start with global configuration
136-
helm_values = {
136+
helm_values: dict[str, Any] = {
137137
"global": {
138138
"image": {
139139
"repository": repository,
@@ -157,54 +157,76 @@ def merge_deployment_configs(
157157
"memory": manifest.deployment.global_config.resources.limits.memory,
158158
},
159159
},
160+
# Enable autoscaling by default for production deployments
161+
"autoscaling": {
162+
"enabled": True,
163+
"minReplicas": 1,
164+
"maxReplicas": 10,
165+
"targetCPUUtilizationPercentage": 50,
166+
},
160167
}
161168

162169
# Handle temporal configuration using new helper methods
163170
if agent_config.is_temporal_agent():
164171
temporal_config = agent_config.get_temporal_workflow_config()
165172
if temporal_config:
166-
helm_values[TEMPORAL_WORKER_KEY] = {}
173+
helm_values[TEMPORAL_WORKER_KEY] = {
174+
"enabled": True,
175+
# Enable autoscaling for temporal workers as well
176+
"autoscaling": {
177+
"enabled": True,
178+
"minReplicas": 1,
179+
"maxReplicas": 10,
180+
"targetCPUUtilizationPercentage": 50,
181+
},
182+
}
167183
helm_values["global"]["workflow"] = {
168184
"name": temporal_config.name,
169185
"taskQueue": temporal_config.queue_name,
170186
}
171-
helm_values[TEMPORAL_WORKER_KEY]["enabled"] = True
172187

173-
secret_env_vars = []
174-
if agent_config.credentials:
175-
for credential in agent_config.credentials:
176-
secret_env_vars.append(
177-
{
178-
"name": credential.env_var_name,
179-
"secretName": credential.secret_name,
180-
"secretKey": credential.secret_key,
181-
}
182-
)
183-
184-
helm_values["secretEnvVars"] = secret_env_vars
185-
if TEMPORAL_WORKER_KEY in helm_values:
186-
helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars
187-
188-
# Set the agent_config env vars first to the helm values and so then it can be overriden by the cluster config
188+
# Collect all environment variables with conflict detection
189+
all_env_vars: dict[str, str] = {}
190+
secret_env_vars: list[dict[str, str]] = []
191+
192+
# Start with agent_config env vars
189193
if agent_config.env:
190-
helm_values["env"] = agent_config.env
191-
if TEMPORAL_WORKER_KEY in helm_values:
192-
helm_values[TEMPORAL_WORKER_KEY]["env"] = agent_config.env
194+
all_env_vars.update(agent_config.env)
193195

194196
# Add auth principal env var if manifest principal is set
195197
encoded_principal = _encode_principal_context(manifest)
196198
if encoded_principal:
197-
if "env" not in helm_values:
198-
helm_values["env"] = {}
199-
helm_values["env"][EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal
199+
all_env_vars[EnvVarKeys.AUTH_PRINCIPAL_B64.value] = encoded_principal
200200

201-
if manifest.deployment and manifest.deployment.imagePullSecrets:
202-
pull_secrets = [
203-
pull_secret.to_dict()
204-
for pull_secret in manifest.deployment.imagePullSecrets
205-
]
206-
helm_values["global"]["imagePullSecrets"] = pull_secrets
207-
helm_values["imagePullSecrets"] = pull_secrets
201+
# Handle credentials and check for conflicts
202+
if agent_config.credentials:
203+
for credential in agent_config.credentials:
204+
# Handle both CredentialMapping objects and legacy dict format
205+
if isinstance(credential, dict):
206+
env_var_name = credential["env_var_name"]
207+
secret_name = credential["secret_name"]
208+
secret_key = credential["secret_key"]
209+
else:
210+
env_var_name = credential.env_var_name
211+
secret_name = credential.secret_name
212+
secret_key = credential.secret_key
213+
214+
# Check if the environment variable name conflicts with existing env vars
215+
if env_var_name in all_env_vars:
216+
logger.warning(
217+
f"Environment variable '{env_var_name}' is defined in both "
218+
f"env and secretEnvVars. The secret value will take precedence."
219+
)
220+
# Remove from regular env vars since secret takes precedence
221+
del all_env_vars[env_var_name]
222+
223+
secret_env_vars.append(
224+
{
225+
"name": env_var_name,
226+
"secretName": secret_name,
227+
"secretKey": secret_key,
228+
}
229+
)
208230

209231
# Apply cluster-specific overrides
210232
if cluster_config:
@@ -235,23 +257,50 @@ def merge_deployment_configs(
235257
}
236258
)
237259

260+
# Handle cluster env vars with conflict detection
238261
if cluster_config.env:
239-
helm_values["env"] = cluster_config.env
262+
# Convert cluster env list to dict for easier conflict detection
263+
cluster_env_dict = {env_var["name"]: env_var["value"] for env_var in cluster_config.env}
264+
265+
# Check for conflicts with secret env vars
266+
for secret_env_var in secret_env_vars:
267+
if secret_env_var["name"] in cluster_env_dict:
268+
logger.warning(
269+
f"Environment variable '{secret_env_var['name']}' is defined in both "
270+
f"cluster config env and secretEnvVars. The secret value will take precedence."
271+
)
272+
del cluster_env_dict[secret_env_var["name"]]
273+
274+
# Update all_env_vars with cluster overrides
275+
all_env_vars.update(cluster_env_dict)
240276

241277
# Apply additional arbitrary overrides
242278
if cluster_config.additional_overrides:
243279
_deep_merge(helm_values, cluster_config.additional_overrides)
244280

245-
# Convert the env vars to a list of dictionaries
246-
if "env" in helm_values:
247-
helm_values["env"] = convert_env_vars_dict_to_list(helm_values["env"])
248-
249-
# Convert the temporal worker env vars to a list of dictionaries
250-
if TEMPORAL_WORKER_KEY in helm_values and "env" in helm_values[TEMPORAL_WORKER_KEY]:
251-
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(
252-
helm_values[TEMPORAL_WORKER_KEY]["env"]
253-
)
281+
# Set final environment variables
282+
if all_env_vars:
283+
helm_values["env"] = convert_env_vars_dict_to_list(all_env_vars)
254284

285+
if secret_env_vars:
286+
helm_values["secretEnvVars"] = secret_env_vars
287+
288+
# Set environment variables for temporal worker if enabled
289+
if TEMPORAL_WORKER_KEY in helm_values:
290+
if all_env_vars:
291+
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(all_env_vars)
292+
if secret_env_vars:
293+
helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars
294+
295+
# Handle image pull secrets
296+
if manifest.deployment and manifest.deployment.imagePullSecrets:
297+
pull_secrets = [
298+
pull_secret.to_dict()
299+
for pull_secret in manifest.deployment.imagePullSecrets
300+
]
301+
helm_values["global"]["imagePullSecrets"] = pull_secrets
302+
helm_values["imagePullSecrets"] = pull_secrets
303+
255304
# Add dynamic ACP command based on manifest configuration
256305
add_acp_command_to_helm_values(helm_values, manifest, manifest_path)
257306

src/agentex/resources/agents.py

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -508,17 +508,24 @@ def send_message_stream(
508508
raise ValueError("Either agent_id or agent_name must be provided")
509509

510510
with raw_agent_rpc_response as response:
511-
for agent_rpc_response_str in response.iter_text():
512-
if agent_rpc_response_str.strip(): # Only process non-empty lines
513-
try:
514-
chunk_rpc_response = SendMessageStreamResponse.model_validate(
515-
json.loads(agent_rpc_response_str),
516-
from_attributes=True
517-
)
518-
yield chunk_rpc_response
519-
except json.JSONDecodeError:
520-
# Skip invalid JSON lines
521-
continue
511+
for _line in response.iter_lines():
512+
if not _line:
513+
continue
514+
line = _line.strip()
515+
# Handle optional SSE-style prefix
516+
if line.startswith("data:"):
517+
line = line[len("data:"):].strip()
518+
if not line:
519+
continue
520+
try:
521+
chunk_rpc_response = SendMessageStreamResponse.model_validate(
522+
json.loads(line),
523+
from_attributes=True
524+
)
525+
yield chunk_rpc_response
526+
except json.JSONDecodeError:
527+
# Skip invalid JSON lines
528+
continue
522529

523530
def send_event(
524531
self,
@@ -1048,17 +1055,24 @@ async def send_message_stream(
10481055
raise ValueError("Either agent_id or agent_name must be provided")
10491056

10501057
async with raw_agent_rpc_response as response:
1051-
async for agent_rpc_response_str in response.iter_text():
1052-
if agent_rpc_response_str.strip(): # Only process non-empty lines
1053-
try:
1054-
chunk_rpc_response = SendMessageStreamResponse.model_validate(
1055-
json.loads(agent_rpc_response_str),
1056-
from_attributes=True
1057-
)
1058-
yield chunk_rpc_response
1059-
except json.JSONDecodeError:
1060-
# Skip invalid JSON lines
1061-
continue
1058+
async for _line in response.iter_lines():
1059+
if not _line:
1060+
continue
1061+
line = _line.strip()
1062+
# Handle optional SSE-style prefix
1063+
if line.startswith("data:"):
1064+
line = line[len("data:"):].strip()
1065+
if not line:
1066+
continue
1067+
try:
1068+
chunk_rpc_response = SendMessageStreamResponse.model_validate(
1069+
json.loads(line),
1070+
from_attributes=True
1071+
)
1072+
yield chunk_rpc_response
1073+
except json.JSONDecodeError:
1074+
# Skip invalid JSON lines
1075+
continue
10621076

10631077
async def send_event(
10641078
self,

0 commit comments

Comments
 (0)