Pipelock wraps MCP servers used by Google ADK agents as a stdio proxy, scanning
every request and response for credential leaks, prompt injection, and tool
description poisoning. This guide covers McpToolset with StdioConnectionParams
and Docker Compose deployment.
# 1. Install pipelock
go install github.com/luckyPipewrench/pipelock/cmd/pipelock@latest
# 2. Generate a config (or copy a preset)
pipelock generate config --preset balanced > pipelock.yaml
# 3. Verify
pipelock versionfrom google.adk.agents import Agent
from google.adk.tools import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
filesystem_toolset = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=[
"mcp", "proxy",
"--config", "pipelock.yaml",
"--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/workspace"
],
)
)
)
agent = Agent(
model="gemini-2.0-flash",
name="research_agent",
instruction="You help users research information using available tools.",
tools=[filesystem_toolset],
)That's it. Pipelock intercepts all MCP traffic between the ADK agent and the filesystem server, scanning in both directions.
ADK Agent <--> pipelock mcp proxy <--> MCP Server
(client) (scan both ways) (subprocess)
Pipelock scans three things:
- Outbound requests. Catches credentials leaking through tool arguments (API keys, tokens, private key material).
- Inbound responses. Catches prompt injection in tool results.
- Tool descriptions. Catches poisoned tool definitions and mid-session rug-pull changes.
The standard ADK pattern using Runner for agent execution:
import asyncio
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from google.genai import types
from mcp import StdioServerParameters
async def main():
toolset = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=[
"mcp", "proxy",
"--config", "pipelock.yaml",
"--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/workspace"
],
)
)
)
agent = Agent(
model="gemini-2.0-flash",
name="file_agent",
instruction="You analyze files in the workspace.",
tools=[toolset],
)
runner = Runner(
agent=agent,
app_name="pipelock_demo",
session_service=InMemorySessionService(),
)
session = await runner.session_service.create_session(
app_name="pipelock_demo", user_id="user1"
)
content = types.Content(
role="user",
parts=[types.Part(text="List all files in the workspace")]
)
async for event in runner.run_async(
user_id="user1", session_id=session.id, new_message=content
):
if event.is_final_response():
print(event.content.parts[0].text)
asyncio.run(main())Wrap each server independently with its own Pipelock proxy:
from google.adk.agents import Agent
from google.adk.tools import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
filesystem = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "pipelock.yaml", "--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/workspace"],
)
)
)
database = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "pipelock.yaml", "--",
"python", "-m", "mcp_server_sqlite", "--db", "/data/app.db"],
)
)
)
agent = Agent(
model="gemini-2.0-flash",
name="multi_tool_agent",
instruction="You work with files and databases.",
tools=[filesystem, database],
)Wrap stdio servers with Pipelock. Remote servers connect directly and are not covered by the stdio proxy:
from google.adk.agents import Agent
from google.adk.tools import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import (
StdioConnectionParams,
SseConnectionParams,
)
from mcp import StdioServerParameters
# Local server: wrap with pipelock
local = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "pipelock.yaml", "--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
)
)
)
# Remote server: NOT scanned by pipelock
# Pipelock's MCP proxy only wraps stdio servers.
# For remote servers, vet the server before connecting.
remote = McpToolset(
connection_params=SseConnectionParams(url="https://api.example.com/mcp/sse")
)
agent = Agent(
model="gemini-2.0-flash",
name="hybrid_agent",
instruction="You use local and remote tools.",
tools=[local, remote],
)Note: Pipelock's MCP proxy only wraps stdio-based servers. Remote HTTP/SSE
MCP connections go directly to the remote endpoint and bypass Pipelock. For
outbound HTTP traffic from your agent code (API calls, web fetches), route those
through pipelock run as a fetch proxy. See the
HTTP fetch proxy section below.
ADK supports hierarchical agent architectures. Each sub-agent can have its own Pipelock-wrapped MCP servers with different security configs:
from google.adk.agents import Agent
from google.adk.tools import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
researcher = Agent(
model="gemini-2.0-flash",
name="researcher",
instruction="You research topics using available tools.",
tools=[
McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "pipelock-warn.yaml", "--",
"npx", "-y", "@modelcontextprotocol/server-fetch"],
)
)
),
],
)
writer = Agent(
model="gemini-2.0-flash",
name="writer",
instruction="You write reports. Delegate research to the researcher.",
tools=[
McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "pipelock-strict.yaml", "--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/output"],
)
)
),
],
sub_agents=[researcher],
)Network-isolated deployment where the agent container has no direct internet access:
networks:
pipelock-internal:
internal: true
driver: bridge
pipelock-external:
driver: bridge
services:
pipelock:
# Pin to a specific version for production (e.g., ghcr.io/luckypipewrench/pipelock:0.x.y)
image: ghcr.io/luckypipewrench/pipelock:latest
networks:
- pipelock-internal
- pipelock-external
command: ["run", "--listen", "0.0.0.0:8888", "--config", "/config/pipelock.yaml"]
volumes:
- ./pipelock.yaml:/config/pipelock.yaml:ro
healthcheck:
test: ["/pipelock", "healthcheck"]
interval: 10s
timeout: 3s
start_period: 5s
retries: 3
adk-agent:
build: .
networks:
- pipelock-internal
environment:
- GOOGLE_API_KEY=${GOOGLE_API_KEY}
- PIPELOCK_FETCH_URL=http://pipelock:8888/fetch
depends_on:
pipelock:
condition: service_healthyThe agent container can only reach the pipelock service. All HTTP traffic goes
through the fetch proxy. MCP servers running as subprocesses inside the agent
container are wrapped with pipelock mcp proxy as shown above.
You can also generate this template with:
pipelock generate docker-compose --agent genericFor scanning HTTP traffic from ADK agents (web fetches, API calls), run Pipelock as a fetch proxy:
pipelock run --config pipelock.yamlConfigure your agent to route HTTP requests through http://localhost:8888/fetch:
import requests
def fetch_through_pipelock(url: str) -> str:
resp = requests.get(
"http://localhost:8888/fetch",
params={"url": url},
headers={"X-Pipelock-Agent": "adk-research"},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
if data.get("blocked"):
raise RuntimeError(f"Pipelock blocked request: {data.get('block_reason')}")
return data.get("content", "")When using pipelock as an HTTP forward proxy (HTTPS_PROXY), CONNECT tunnels
are opaque by default: pipelock only sees the hostname, not the request body or
response content. Enabling TLS interception closes this gap by performing a MITM
on HTTPS connections, giving you full DLP on request bodies and response
injection detection through CONNECT tunnels.
To enable it:
- Generate a CA and enable TLS interception (see the TLS Interception Guide)
- Trust the CA in your Python environment:
export SSL_CERT_FILE=~/.pipelock/ca.pem
# Or for requests/httpx specifically:
export REQUESTS_CA_BUNDLE=~/.pipelock/ca.pemMCP proxy mode (stdio wrapping) does not require TLS interception. It scans traffic in both directions without certificates.
| Config | Action | Best For |
|---|---|---|
balanced |
warn (default) | Recommended starting point (--preset balanced) |
strict |
block (default) | High-security, production (--preset strict) |
generic-agent.yaml |
warn (default) | Agent-specific tuning (copy from configs/) |
claude-code.yaml |
block (default) | Unattended coding agents (copy from configs/) |
Start with balanced to log detections without blocking. Review the logs,
tune thresholds, then switch to strict for production.
Verify the command works without Pipelock first:
npx -y @modelcontextprotocol/server-filesystem /tmpThen wrap it:
pipelock mcp proxy -- npx -y @modelcontextprotocol/server-filesystem /tmpPipelock logs to stderr. To see real-time output during development:
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | \
pipelock mcp proxy --config pipelock.yaml -- npx -y @modelcontextprotocol/server-filesystem /tmpSwitch to warn mode to see what's being flagged without blocking:
response_scanning:
action: warn
mcp_input_scanning:
action: warn
mcp_tool_scanning:
action: warnReview stderr output, then tighten thresholds.
Use absolute paths if relative paths don't resolve:
McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="pipelock",
args=["mcp", "proxy", "--config", "/etc/pipelock/config.yaml", "--",
"npx", "-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
)
)
)