Skip to content

Commit 418eec8

Browse files
ElleNajtclaude
andcommitted
Add VPC Direct Egress support for Cloud Run
cloud_run_client.py: - Add vpc_network/vpc_subnet/vpc_egress fields to CloudRunClientConfig - Configure VPC Direct Egress via run_v2.VpcAccess on job creation - Include VPC fields in config hash - Upload large commands to GCS when they exceed env var limits claude_code_client.py: - Add vpc_network/vpc_subnet/vpc_egress fields to ClaudeCodeClientConfig - Pass VPC config through to CloudRunClientConfig cache_manager.py: - Fix FileBasedCacheManager self-eviction bug README.md: - Document VPC egress firewall setup and configuration Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 024ebf1 commit 418eec8

File tree

6 files changed

+339
-210
lines changed

6 files changed

+339
-210
lines changed

safetytooling/apis/inference/cache_manager.py

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import os
33
import sys
4-
from collections import OrderedDict, deque
4+
from collections import deque
55
from itertools import chain
66
from pathlib import Path
77
from typing import List, Tuple, Union
@@ -137,54 +137,56 @@ def save_embeddings(self, params: EmbeddingParams, response: EmbeddingResponseBa
137137

138138

139139
class FileBasedCacheManager(BaseCacheManager):
140-
"""File-based cache with an LRU-evicted in-memory layer."""
140+
"""Original file-based cache manager implementation."""
141141

142142
def __init__(self, cache_dir: Path, num_bins: int = 20, max_mem_usage_mb: float = 5_000):
143143
super().__init__(cache_dir, num_bins)
144-
self.in_memory_cache: OrderedDict[Path, dict] = OrderedDict()
145-
self.sizes: dict[Path, float] = {}
146-
self.total_usage_mb = 0.0
144+
self.in_memory_cache = {}
145+
self.sizes = {} # Track the size of each cache file in memory
146+
self.total_usage_mb = 0
147147
self.max_mem_usage_mb = max_mem_usage_mb
148148

149-
def _evict_lru(self):
150-
"""Evict the least-recently-used bin from memory."""
151-
lru_key = next(iter(self.in_memory_cache))
152-
del self.in_memory_cache[lru_key]
153-
self.total_usage_mb -= self.sizes.pop(lru_key)
154-
LOGGER.info(f"Evicted LRU entry {lru_key} from mem cache. Total usage is now {self.total_usage_mb:.1f} MB.")
149+
def remove_entry(self, cache_file: Path):
150+
self.in_memory_cache.pop(cache_file)
155151

156-
def add_entry(self, cache_file: Path, contents: dict) -> bool:
157-
"""Add or replace a bin in the in-memory cache, evicting LRU entries if needed.
158-
159-
Returns False if the single entry is larger than the entire cache limit.
160-
"""
161-
size_mb = total_size(contents)
162-
163-
if self.max_mem_usage_mb is not None and size_mb > self.max_mem_usage_mb:
164-
LOGGER.warning(f"Entry {cache_file} ({size_mb:.1f} MB) exceeds cache limit ({self.max_mem_usage_mb} MB).")
165-
return False
166-
167-
# Remove old version first. This prevents self-eviction (the eviction
168-
# loop below can only see OTHER bins) and prevents double-counting.
169-
if cache_file in self.sizes:
170-
del self.in_memory_cache[cache_file]
171-
self.total_usage_mb -= self.sizes.pop(cache_file)
172-
173-
# Evict least-recently-used bins until there's room
174152
if self.max_mem_usage_mb is not None:
175-
while self.in_memory_cache and self.total_usage_mb + size_mb > self.max_mem_usage_mb:
176-
self._evict_lru()
153+
size = self.sizes.pop(cache_file)
154+
self.total_usage_mb -= size
155+
LOGGER.info(f"Removed entry from mem cache. Freed {size} MB.")
177156

178-
# Insert at the back (most-recently-used position)
157+
def add_entry(self, cache_file: Path, contents: dict):
179158
self.in_memory_cache[cache_file] = contents
180-
self.sizes[cache_file] = size_mb
181-
self.total_usage_mb += size_mb
182-
return True
183159

184-
def touch(self, cache_file: Path):
185-
"""Mark a bin as recently used (moves it to the back of the LRU queue)."""
186-
if cache_file in self.in_memory_cache:
187-
self.in_memory_cache.move_to_end(cache_file)
160+
if self.max_mem_usage_mb is not None:
161+
size = total_size(contents)
162+
if self.total_usage_mb + size > self.max_mem_usage_mb:
163+
space_available = self.free_space_for(size)
164+
if not space_available:
165+
return False
166+
self.sizes[cache_file] = size
167+
self.total_usage_mb += size
168+
169+
def free_space_for(self, needed_space_mb: float):
170+
if self.max_mem_usage_mb is None:
171+
return True
172+
173+
if needed_space_mb > self.max_mem_usage_mb:
174+
LOGGER.warning(
175+
f"Needed space {needed_space_mb} MB is greater than max mem usage {self.max_mem_usage_mb} MB. "
176+
"This is not possible."
177+
)
178+
return False
179+
LOGGER.info(f"Evicting entry from mem cache to free up {needed_space_mb} MB")
180+
while self.total_usage_mb > self.max_mem_usage_mb - needed_space_mb:
181+
# Find the entry with the smallest size
182+
try:
183+
smallest_entry = min(self.sizes.items(), key=lambda x: x[1])
184+
except ValueError:
185+
LOGGER.warning("No entries in mem cache to evict")
186+
return True
187+
self.remove_entry(smallest_entry[0])
188+
LOGGER.info(f"Evicted entry from mem cache. Total usage is now {self.total_usage_mb} MB.")
189+
return True
188190

189191
def get_cache_file(self, prompt: Prompt, params: LLMParams) -> tuple[Path, str]:
190192
# Use the SHA-1 hash of the prompt for the dictionary key
@@ -209,7 +211,6 @@ def maybe_load_cache(self, prompt: Prompt, params: LLMParams):
209211
self.add_entry(cache_file, contents)
210212
else:
211213
contents = self.in_memory_cache[cache_file]
212-
self.touch(cache_file)
213214

214215
data = contents.get(prompt_hash, None)
215216
return None if data is None else LLMCache.model_validate_json(data)

safetytooling/infra/cloud_run/README.md

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,106 @@ client = ClaudeCodeClient(
177177
- **Without this, Claude could take over your entire GCP project** - don't skip this step!
178178

179179
**What this doesn't limit:**
180-
- Outbound network access (Claude could exfiltrate data to external URLs)
180+
- Outbound network access (see Egress Firewall below)
181181
- Anthropic API usage (Claude could use your API key for other purposes)
182182

183183
For the "yolo Claude" use case, the main risks are data exfiltration and API key abuse.
184184
Containers are ephemeral (destroyed after job), so there's no persistence risk.
185185

186+
## Egress Firewall (Recommended)
187+
188+
By default, containers can make outbound requests to any host. To restrict egress (e.g., only allow `api.anthropic.com` and Google APIs), use VPC Direct Egress with Cloud NGFW firewall rules.
189+
190+
**How it works:** When `vpc_network` is set, all container traffic routes through a VPC where a Cloud NGFW firewall policy controls access by domain name (FQDN rules). This covers both IPv4 and IPv6.
191+
192+
**Usage:**
193+
194+
```python
195+
client = ClaudeCodeClient(
196+
project_id="my-project",
197+
gcs_bucket="my-bucket",
198+
api_key_secret="anthropic-api-key-USERNAME",
199+
service_account="claude-runner@my-project.iam.gserviceaccount.com",
200+
vpc_network="my-egress-vpc", # VPC with NGFW firewall policy
201+
vpc_subnet="my-egress-subnet", # Subnet in the VPC
202+
vpc_egress="all-traffic", # Route all traffic through VPC
203+
)
204+
```
205+
206+
**One-time GCP setup:**
207+
208+
1. **VPC + Subnet** (with Private Google Access for Google APIs):
209+
```bash
210+
gcloud compute networks create egress-firewall-vpc --subnet-mode=custom
211+
gcloud compute networks subnets create egress-firewall-subnet \
212+
--network=egress-firewall-vpc --region=us-central1 \
213+
--range=10.100.0.0/24 --enable-private-ip-google-access
214+
```
215+
216+
2. **Cloud Router + NAT** (required for internet access from VPC):
217+
```bash
218+
gcloud compute routers create egress-firewall-router \
219+
--network=egress-firewall-vpc --region=us-central1
220+
gcloud compute routers nats create egress-firewall-nat \
221+
--router=egress-firewall-router --region=us-central1 \
222+
--auto-allocate-nat-external-ips \
223+
--endpoint-types=ENDPOINT_TYPE_VM,ENDPOINT_TYPE_MANAGED_PROXY_LB \
224+
--nat-all-subnet-ip-ranges
225+
```
226+
Note: `ENDPOINT_TYPE_MANAGED_PROXY_LB` is required — Cloud Run Direct VPC Egress uses managed proxy load balancers internally.
227+
228+
3. **Cloud NGFW firewall policy** with FQDN rules:
229+
```bash
230+
# Create policy and associate with VPC
231+
gcloud compute network-firewall-policies create egress-firewall-policy --global
232+
gcloud compute network-firewall-policies associations create \
233+
--firewall-policy=egress-firewall-policy --network=egress-firewall-vpc --global-firewall-policy
234+
235+
# Allow DNS
236+
gcloud compute network-firewall-policies rules create 100 \
237+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=allow \
238+
--dest-ip-ranges=0.0.0.0/0 --layer4-configs=udp:53,tcp:53 --global-firewall-policy
239+
240+
# Allow metadata server
241+
gcloud compute network-firewall-policies rules create 200 \
242+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=allow \
243+
--dest-ip-ranges=169.254.169.254/32 --layer4-configs=all --global-firewall-policy
244+
245+
# Allow Google APIs (list each subdomain — wildcards not supported)
246+
gcloud compute network-firewall-policies rules create 250 \
247+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=allow \
248+
--dest-fqdns=storage.googleapis.com,oauth2.googleapis.com,www.googleapis.com,\
249+
secretmanager.googleapis.com,accounts.googleapis.com,cloudresourcemanager.googleapis.com,\
250+
run.googleapis.com,logging.googleapis.com,gcr.io,iamcredentials.googleapis.com \
251+
--layer4-configs=tcp:443 --global-firewall-policy
252+
253+
# Allow Private Google Access VIPs
254+
gcloud compute network-firewall-policies rules create 300 \
255+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=allow \
256+
--dest-ip-ranges=199.36.153.0/24 --layer4-configs=tcp:443 --global-firewall-policy
257+
258+
# Allow your API provider (e.g., Anthropic)
259+
gcloud compute network-firewall-policies rules create 400 \
260+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=allow \
261+
--dest-fqdns=api.anthropic.com --layer4-configs=tcp:443 --global-firewall-policy
262+
263+
# Deny everything else (IPv4 + IPv6)
264+
gcloud compute network-firewall-policies rules create 10000 \
265+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=deny \
266+
--dest-ip-ranges=0.0.0.0/0 --layer4-configs=all --global-firewall-policy
267+
gcloud compute network-firewall-policies rules create 10001 \
268+
--firewall-policy=egress-firewall-policy --direction=EGRESS --action=deny \
269+
--dest-ip-ranges=::/0 --layer4-configs=all --global-firewall-policy
270+
```
271+
272+
**Costs:** ~$32/month for Cloud NAT gateway + ~$0.018/GB for NGFW FQDN rule evaluation. The NAT gateway runs 24/7 regardless of job activity.
273+
274+
**Key facts:**
275+
- FQDN rules don't support wildcards — must list each Google API subdomain individually
276+
- ~20s cold start penalty on first outbound connection (NAT port allocation)
277+
- IPv6 is fully blocked at the VPC level (deny `::/0`)
278+
- Cloud NGFW Standard tier pricing applies for FQDN rule traffic
279+
186280
## How It Works
187281

188282
```
@@ -255,6 +349,9 @@ ClaudeCodeClientConfig(
255349
memory: str = "2Gi", # Up to 32Gi
256350
skip_permissions: bool = True, # --dangerously-skip-permissions
257351
image: str = DEFAULT_CLAUDE_CODE_IMAGE, # Pre-built image with Claude Code
352+
vpc_network: str = None, # VPC for egress firewall (see Egress Firewall section)
353+
vpc_subnet: str = None, # Subnet in the VPC (required when vpc_network is set)
354+
vpc_egress: str = "all-traffic", # "all-traffic" or "private-ranges-only"
258355
)
259356
```
260357

@@ -333,6 +430,9 @@ CloudRunClientConfig(
333430
env: dict = {}, # Environment variables
334431
secrets: dict = {}, # Secret Manager secrets as env vars
335432
service_account: str = None, # Restricted service account (see Security Hardening)
433+
vpc_network: str = None, # VPC for egress firewall
434+
vpc_subnet: str = None, # Subnet in the VPC
435+
vpc_egress: str = None, # "all-traffic" or "private-ranges-only"
336436
)
337437
```
338438

safetytooling/infra/cloud_run/claude_code_client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ class ClaudeCodeClientConfig:
8787
SECURITY: Use a restricted service account to limit container access.
8888
See README for setup instructions.
8989
Format: "name@project.iam.gserviceaccount.com"
90+
vpc_network: VPC network name for Direct VPC Egress. When set with vpc_egress="all-traffic",
91+
all outbound traffic routes through the VPC where Cloud NGFW firewall policies
92+
control access. This covers both IPv4 and IPv6. Requires a Cloud NAT gateway
93+
with ENDPOINT_TYPE_MANAGED_PROXY_LB on the VPC for internet access.
94+
vpc_subnet: VPC subnet name (required when vpc_network is set).
95+
vpc_egress: VPC egress setting - "all-traffic" or "private-ranges-only" (default: "all-traffic").
9096
"""
9197

9298
project_id: str
@@ -102,6 +108,9 @@ class ClaudeCodeClientConfig:
102108
image: str = DEFAULT_CLAUDE_CODE_IMAGE
103109
api_key_secret: str | None = None
104110
service_account: str | None = None
111+
vpc_network: str | None = None
112+
vpc_subnet: str | None = None
113+
vpc_egress: str = "all-traffic"
105114

106115

107116
# Instructions prepended to task when output_instructions=True
@@ -405,6 +414,9 @@ def __init__(
405414
env={},
406415
secrets=secrets,
407416
service_account=self.config.service_account,
417+
vpc_network=self.config.vpc_network,
418+
vpc_subnet=self.config.vpc_subnet,
419+
vpc_egress=self.config.vpc_egress if self.config.vpc_network else None,
408420
)
409421
self._cloud_run = CloudRunClient(cloud_run_config)
410422

safetytooling/infra/cloud_run/cloud_run_client.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ class CloudRunClientConfig:
102102
env: dict[str, str] = field(default_factory=dict)
103103
secrets: dict[str, str] = field(default_factory=dict)
104104
service_account: str | None = None
105+
vpc_network: str | None = None
106+
vpc_subnet: str | None = None
107+
vpc_egress: str | None = None # "all-traffic" or "private-ranges-only"
105108

106109

107110
@dataclass(frozen=True)
@@ -496,15 +499,27 @@ def _get_or_create_job(self, timeout: int) -> str:
496499
if self.config.service_account:
497500
job.template.template.service_account = self.config.service_account
498501

502+
if self.config.vpc_network:
503+
vpc_access = run_v2.VpcAccess(
504+
network_interfaces=[
505+
run_v2.VpcAccess.NetworkInterface(
506+
network=self.config.vpc_network,
507+
subnetwork=self.config.vpc_subnet,
508+
)
509+
],
510+
)
511+
if self.config.vpc_egress == "all-traffic":
512+
vpc_access.egress = run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC
513+
job.template.template.vpc_access = vpc_access
514+
499515
parent = f"projects/{self.config.project_id}/locations/{self.config.region}"
500-
request = CreateJobRequest(parent=parent, job=job, job_id=job_id)
501516

517+
request = CreateJobRequest(parent=parent, job=job, job_id=job_id)
502518
try:
503519
operation = self._jobs_client.create_job(request=request)
504520
created_job = operation.result()
505521
job_name = created_job.name
506522
except Exception as e:
507-
# Job might already exist (from previous process/session)
508523
if "already exists" in str(e).lower():
509524
job_name = f"{parent}/jobs/{job_id}"
510525
else:
@@ -513,6 +528,19 @@ def _get_or_create_job(self, timeout: int) -> str:
513528
self._job_cache[config_hash] = job_name
514529
return job_name
515530

531+
_GCS_COMMANDS_PREFIX: ClassVar[str] = "cloudrun-commands"
532+
_COMMAND_SIZE_LIMIT: ClassVar[int] = 30000 # Leave headroom below 32768 env var limit
533+
534+
def _upload_command_to_gcs(self, command: str) -> str:
535+
"""Upload a large command to GCS and return its path."""
536+
cmd_hash = hashlib.sha256(command.encode()).hexdigest()[:16]
537+
gcs_path = f"{self._GCS_COMMANDS_PREFIX}/{cmd_hash}.sh"
538+
bucket = self._storage_client.bucket(self.config.gcs_bucket)
539+
blob = bucket.blob(gcs_path)
540+
if not blob.exists():
541+
blob.upload_from_string(command, content_type="text/plain")
542+
return gcs_path
543+
516544
def _run_job_execution(
517545
self,
518546
job_name: str,
@@ -524,7 +552,17 @@ def _run_job_execution(
524552
"""Run an execution of an existing job with specific inputs/outputs/command.
525553
526554
Uses RunJobRequest.Overrides to pass per-execution environment variables.
555+
If the command exceeds the env var size limit, it's uploaded to GCS and
556+
a small bootstrap script downloads and evals it.
527557
"""
558+
# If command is too large for an env var, stash it in GCS
559+
if len(command.encode()) > self._COMMAND_SIZE_LIMIT:
560+
gcs_path = self._upload_command_to_gcs(command)
561+
command = (
562+
f'gcloud storage cp "gs://{self.config.gcs_bucket}/{gcs_path}" /tmp/large_command.sh '
563+
f"&& bash /tmp/large_command.sh"
564+
)
565+
528566
# Build env var overrides for this execution
529567
env_overrides = [
530568
run_v2.EnvVar(name="OUTPUT_GCS_PATH", value=output_gcs_path),
@@ -634,6 +672,9 @@ def _compute_config_hash(self) -> str:
634672
self.config.memory,
635673
self.config.service_account or "",
636674
self.config.gcs_bucket,
675+
self.config.vpc_network or "",
676+
self.config.vpc_subnet or "",
677+
self.config.vpc_egress or "",
637678
]
638679
# Add sorted env vars
639680
for k, v in sorted(self.config.env.items()):

0 commit comments

Comments
 (0)