diff --git a/docs/ai.md b/docs/ai.md new file mode 100644 index 0000000..225b001 --- /dev/null +++ b/docs/ai.md @@ -0,0 +1,198 @@ +--- +title: "AI Assistant" +sidebar: + order: 5 +--- + +`datumctl ai` is a natural-language interface for Datum Cloud. Describe what you +want to do in plain English and the assistant translates it into resource +operations — listing, inspecting, creating, updating, and deleting resources — +with confirmation prompts before any changes are applied. + +## Prerequisites + +You need an API key from one of the supported LLM providers: + +| Provider | Environment variable | Get a key at | +|-----------|-----------------------|-------------------------------------| +| Anthropic | `ANTHROPIC_API_KEY` | console.anthropic.com | +| OpenAI | `OPENAI_API_KEY` | platform.openai.com | +| Gemini | `GEMINI_API_KEY` | aistudio.google.com | + +You also need to be logged in to Datum Cloud: + +``` +datumctl auth login +``` + +## Quick start + +The fastest way to get started is to save your API key and default organization +once, then never pass flags again: + +``` +# Save your API key +datumctl ai config set anthropic_api_key sk-ant-... + +# Save your default organization or project +datumctl ai config set organization my-org-id +# or +datumctl ai config set project my-project-id + +# Now just run it +datumctl ai "what resources do I have?" +``` + +Find your organization ID with: + +``` +datumctl get organizations +``` + +## Usage + +``` +datumctl ai [query] [flags] +``` + +### Single query + +``` +datumctl ai "list all DNS zones" --project my-project-id +``` + +### Interactive session (REPL) + +Omit the query argument to start a conversation. The assistant remembers +context across turns so you can ask follow-up questions. + +``` +datumctl ai --organization my-org-id +``` + +Type `exit` or `quit` to end the session. + +### Pipe mode + +``` +echo "how many projects do I have?" 
| datumctl ai --organization my-org-id +``` + +Read-only operations work in pipe mode. Write operations (apply, delete) are +automatically declined — run interactively to apply changes. + +## Configuration + +`datumctl ai config` manages a configuration file that stores defaults for every +`datumctl ai` invocation. On Linux/macOS the file is at +`~/.config/datumctl/ai.yaml`; on Windows it is at `%AppData%\datumctl\ai.yaml`. + +### Set a value + +``` +datumctl ai config set +``` + +| Key | Description | +|---------------------|----------------------------------------------------------| +| `organization` | Default organization ID | +| `project` | Default project ID (mutually exclusive with organization)| +| `namespace` | Default namespace (default: `default`) | +| `provider` | LLM provider: `anthropic`, `openai`, or `gemini` | +| `model` | Model name, e.g. `claude-sonnet-4-6`, `gpt-4o` | +| `max_iterations` | Agentic loop iteration cap (default: `20`) | +| `anthropic_api_key` | Anthropic API key | +| `openai_api_key` | OpenAI API key | +| `gemini_api_key` | Gemini API key | + +### Show current configuration + +API keys are redacted in the output. + +``` +datumctl ai config show +``` + +### Remove a value + +``` +datumctl ai config unset organization +``` + +### Priority order + +Later entries override earlier ones: + +1. Config file (`~/.config/datumctl/ai.yaml`) +2. CLI flags (`--organization`, `--model`, etc.) +3. Environment variables (`ANTHROPIC_API_KEY`, `OPENAI_API_KEY`, `GEMINI_API_KEY`) + +## Flags + +| Flag | Description | +|--------------------|----------------------------------------------------| +| `--organization` | Organization context (overrides config file) | +| `--project` | Project context (overrides config file) | +| `--namespace` | Default namespace (overrides config file) | +| `--model` | Model override, e.g. 
`claude-sonnet-4-6`, `gpt-4o`| +| `--max-iterations` | Agentic loop iteration cap (default: `20`) | + +## How it works + +The assistant has access to the following tools, which map directly to the same +operations available via `datumctl get`, `datumctl apply`, and the MCP server: + +| Tool | Operation | Requires confirmation | +|-----------------------|----------------------------------------|-----------------------| +| `list_resource_types` | Discover available resource types | No | +| `get_resource_schema` | Fetch the schema for a resource type | No | +| `list_resources` | List resources of a given kind | No | +| `get_resource` | Get a single resource by name | No | +| `validate_manifest` | Server-side dry-run validation | No | +| `apply_manifest` | Create or update a resource | **Yes** | +| `delete_resource` | Delete a resource | **Yes** | +| `change_context` | Switch organization/project/namespace | No | + +Read operations execute immediately. For `apply_manifest` and `delete_resource` +the assistant shows a preview of the proposed change and prompts: + +``` +--- Proposed action --- +Tool: apply_manifest +Details: +{ + "yaml": "apiVersion: ..." +} +----------------------- +Apply changes? [y/N]: +``` + +Type `y` to proceed. Any other input cancels the operation — the assistant is +informed it was skipped and will ask what to do next. + +## Provider selection + +The provider is chosen automatically from whichever API key is available. +When multiple keys are present the priority is Anthropic → OpenAI → Gemini. 
+ +Override with `--model` using a provider-prefixed model name: + +``` +datumctl ai "list zones" --model claude-opus-4-6 # Anthropic +datumctl ai "list zones" --model gpt-4o # OpenAI +datumctl ai "list zones" --model gemini-2.0-flash # Gemini +``` + +Or set a permanent default: + +``` +datumctl ai config set model claude-sonnet-4-6 +``` + +## Default models + +| Provider | Default model | +|-----------|----------------------| +| Anthropic | `claude-sonnet-4-6` | +| OpenAI | `gpt-4o` | +| Gemini | `gemini-2.0-flash` | diff --git a/go.mod b/go.mod index 9753a9d..920eb30 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( go.miloapis.com/activity v0.3.1 go.miloapis.com/milo v0.21.0 golang.org/x/oauth2 v0.36.0 + golang.org/x/term v0.39.0 k8s.io/apiextensions-apiserver v0.35.2 k8s.io/apimachinery v0.35.2 k8s.io/cli-runtime v0.35.2 @@ -95,7 +96,6 @@ require ( golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect - golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect golang.org/x/time v0.14.0 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index 9482f8d..b6402e7 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.3 h1:9liNh8t+u26xl5ddmWLmsOsdNLwkdRTg5AG+JnTiM80= github.com/chai2010/gettext-go v1.0.3/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA= +github.com/clipperhouse/uax29/v2 v2.2.0 h1:ChwIKnQN3kcZteTXMgb1wztSgaU+ZemkgWdohwgs8tY= +github.com/clipperhouse/uax29/v2 v2.2.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= github.com/coreos/go-oidc/v3 v3.17.0 h1:hWBGaQfbi0iVviX4ibC7bk8OKT5qNr4klBaCHVNvehc= github.com/coreos/go-oidc/v3 v3.17.0/go.mod h1:wqPbKFrVnE90vty060SB40FCJ8fTHTxSwyXJqZH+sI8= 
github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= @@ -90,7 +92,6 @@ github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/gnostic-models v0.7.1 h1:SisTfuFKJSKM5CPZkffwi6coztzzeYUhc3v4yxLWH8c= github.com/google/gnostic-models v0.7.1/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -122,8 +123,8 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= -github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= -github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.21 h1:jJKAZiQH+2mIinzCJIaIG9Be1+0NR+5sz/lYEEjdM8w= +github.com/mattn/go-runewidth v0.0.21/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU= @@ -163,10 +164,6 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod 
h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= -github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rodaine/table v1.3.0 h1:4/3S3SVkHnVZX91EHFvAMV7K42AnJ0XuymRR2C5HlGE= -github.com/rodaine/table v1.3.0/go.mod h1:47zRsHar4zw0jgxGxL9YtFfs7EGN6B/TaS+/Dmk4WxU= github.com/rodaine/table v1.3.1 h1:jBVgg1bEu5EzEdYSrwUUlQpayDtkvtTmgFS0FPAxOq8= github.com/rodaine/table v1.3.1/go.mod h1:VYCJRCHa2DpD25uFALcB6hi5ECF3eEJQVhCXRjHgXc4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -181,17 +178,11 @@ github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify 
v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -200,12 +191,8 @@ github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ= github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= github.com/zalando/go-keyring v0.2.6 h1:r7Yc3+H+Ux0+M72zacZoItR3UDxeWfKTcabvkI8ua9s= github.com/zalando/go-keyring v0.2.6/go.mod h1:2TCrxYrbUNYfNS/Kgy/LSrkSQzZ5UPVH85RwfczwvcI= -go.miloapis.com/activity v0.3.0 h1:ajcl5U1UeNZZiA+iQ+NbCi0ysvCyQbQ1rKKdOEA06/c= -go.miloapis.com/activity v0.3.0/go.mod h1:W6Y8qvMCY47DFZPnZ35H+ambSEAbROOw3RGS3RT4M0Q= go.miloapis.com/activity v0.3.1 h1:Yq8pdfphiAqr3DqZNQ0a50SadHrbdZyqng/HEwHe4WI= go.miloapis.com/activity v0.3.1/go.mod h1:W6Y8qvMCY47DFZPnZ35H+ambSEAbROOw3RGS3RT4M0Q= -go.miloapis.com/milo v0.16.2 h1:MqrBQvTYWIWBlniJXqBe3ycO/sZxgkL2tYems4s1+LY= -go.miloapis.com/milo v0.16.2/go.mod h1:xOFYvUsvSZV3z6eow5YdB5C/qRQf2s/5/arcfJs5XPg= go.miloapis.com/milo v0.21.0 h1:bTazLscUTQWSnBwD22sZ2dlUFHiQfZOz6rcYeX4PecU= go.miloapis.com/milo v0.21.0/go.mod h1:xOFYvUsvSZV3z6eow5YdB5C/qRQf2s/5/arcfJs5XPg= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= @@ -226,8 +213,6 @@ golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= -golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= -golang.org/x/oauth2 v0.34.0/go.mod 
h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= @@ -256,54 +241,32 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.35.0 h1:iBAU5LTyBI9vw3L5glmat1njFK34srdLmktWwLTprlY= -k8s.io/api v0.35.0/go.mod h1:AQ0SNTzm4ZAczM03QH42c7l3bih1TbAXYo0DkF8ktnA= k8s.io/api v0.35.2 h1:tW7mWc2RpxW7HS4CoRXhtYHSzme1PN1UjGHJ1bdrtdw= k8s.io/api v0.35.2/go.mod h1:7AJfqGoAZcwSFhOjcGM7WV05QxMMgUaChNfLTXDRE60= -k8s.io/apiextensions-apiserver v0.35.0 h1:3xHk2rTOdWXXJM+RDQZJvdx0yEOgC0FgQ1PlJatA5T4= -k8s.io/apiextensions-apiserver v0.35.0/go.mod h1:E1Ahk9SADaLQ4qtzYFkwUqusXTcaV2uw3l14aqpL2LU= k8s.io/apiextensions-apiserver v0.35.2 h1:iyStXHoJZsUXPh/nFAsjC29rjJWdSgUmG1XpApE29c0= k8s.io/apiextensions-apiserver v0.35.2/go.mod h1:OdyGvcO1FtMDWQ+rRh/Ei3b6X3g2+ZDHd0MSRGeS8rU= -k8s.io/apimachinery v0.35.0 h1:Z2L3IHvPVv/MJ7xRxHEtk6GoJElaAqDCCU0S6ncYok8= -k8s.io/apimachinery v0.35.0/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= k8s.io/apimachinery v0.35.2 h1:NqsM/mmZA7sHW02JZ9RTtk3wInRgbVxL8MPfzSANAK8= k8s.io/apimachinery v0.35.2/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= -k8s.io/apiserver v0.35.0 h1:CUGo5o+7hW9GcAEF3x3usT3fX4f9r8xmgQeCBDaOgX4= -k8s.io/apiserver v0.35.0/go.mod h1:QUy1U4+PrzbJaM3XGu2tQ7U9A4udRRo5cyxkFX0GEds= k8s.io/apiserver v0.35.2 h1:rb52v0CZGEL0FkhjS+I6jHflAp7fZ4MIaKcEHX7wmDk= k8s.io/apiserver v0.35.2/go.mod h1:CROJUAu0tfjZLyYgSeBsBan2T7LUJGh0ucWwTCSSk7g= -k8s.io/cli-runtime v0.35.0 h1:PEJtYS/Zr4p20PfZSLCbY6YvaoLrfByd6THQzPworUE= 
-k8s.io/cli-runtime v0.35.0/go.mod h1:VBRvHzosVAoVdP3XwUQn1Oqkvaa8facnokNkD7jOTMY= k8s.io/cli-runtime v0.35.2 h1:3DNctzpPNXavqyrm/FFiT60TLk4UjUxuUMYbKOE970E= k8s.io/cli-runtime v0.35.2/go.mod h1:G2Ieu0JidLm5m1z9b0OkFhnykvJ1w+vjbz1tR5OFKL0= -k8s.io/client-go v0.35.0 h1:IAW0ifFbfQQwQmga0UdoH0yvdqrbwMdq9vIFEhRpxBE= -k8s.io/client-go v0.35.0/go.mod h1:q2E5AAyqcbeLGPdoRB+Nxe3KYTfPce1Dnu1myQdqz9o= k8s.io/client-go v0.35.2 h1:YUfPefdGJA4aljDdayAXkc98DnPkIetMl4PrKX97W9o= k8s.io/client-go v0.35.2/go.mod h1:4QqEwh4oQpeK8AaefZ0jwTFJw/9kIjdQi0jpKeYvz7g= -k8s.io/component-base v0.35.0 h1:+yBrOhzri2S1BVqyVSvcM3PtPyx5GUxCK2tinZz1G94= -k8s.io/component-base v0.35.0/go.mod h1:85SCX4UCa6SCFt6p3IKAPej7jSnF3L8EbfSyMZayJR0= k8s.io/component-base v0.35.2 h1:btgR+qNrpWuRSuvWSnQYsZy88yf5gVwemvz0yw79pGc= k8s.io/component-base v0.35.2/go.mod h1:B1iBJjooe6xIJYUucAxb26RwhAjzx0gHnqO9htWIX+0= -k8s.io/component-helpers v0.35.0 h1:wcXv7HJRksgVjM4VlXJ1CNFBpyDHruRI99RrBtrJceA= -k8s.io/component-helpers v0.35.0/go.mod h1:ahX0m/LTYmu7fL3W8zYiIwnQ/5gT28Ex4o2pymF63Co= k8s.io/component-helpers v0.35.2 h1:7Ea4CDgHnyOGrl3ZhD8e46SdTyf1itTONnreJ2Q52UM= k8s.io/component-helpers v0.35.2/go.mod h1:ybIoc8i92FG7xJFrBcEMzB8ul1wlZgfF0I4Z9w0V6VQ= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20260127142750-a19766b6e2d4 h1:HhDfevmPS+OalTjQRKbTHppRIz01AWi8s45TMXStgYY= k8s.io/kube-openapi v0.0.0-20260127142750-a19766b6e2d4/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= -k8s.io/kubectl v0.35.0 h1:cL/wJKHDe8E8+rP3G7avnymcMg6bH6JEcR5w5uo06wc= -k8s.io/kubectl v0.35.0/go.mod h1:VR5/TSkYyxZwrRwY5I5dDq6l5KXmiCb+9w8IKplk3Qo= k8s.io/kubectl v0.35.2 h1:aSmqhSOfsoG9NR5oR8OD5eMKpLN9x8oncxfqLHbJJII= k8s.io/kubectl v0.35.2/go.mod h1:+OJC779UsDJGxNPbHxCwvb4e4w9Eh62v/DNYU2TlsyM= -k8s.io/metrics v0.35.0 h1:xVFoqtAGm2dMNJAcB5TFZJPCen0uEqqNt52wW7ABbX8= -k8s.io/metrics v0.35.0/go.mod 
h1:g2Up4dcBygZi2kQSEQVDByFs+VUwepJMzzQLJJLpq4M= k8s.io/metrics v0.35.2 h1:PJRP88qeadR5evg4ZKJAh3NR3ICchwM51/Aidd0LHjc= k8s.io/metrics v0.35.2/go.mod h1:w1pJmSu2j8ftVI26MGcJtMnpmZ06oKwb4Enm+xVl06Q= k8s.io/utils v0.0.0-20260108192941-914a6e750570 h1:JT4W8lsdrGENg9W+YwwdLJxklIuKWdRm+BC+xt33FOY= k8s.io/utils v0.0.0-20260108192941-914a6e750570/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk= -sigs.k8s.io/controller-runtime v0.23.1 h1:TjJSM80Nf43Mg21+RCy3J70aj/W6KyvDtOlpKf+PupE= -sigs.k8s.io/controller-runtime v0.23.1/go.mod h1:B6COOxKptp+YaUT5q4l6LqUJTRpizbgf9KSRNdQGns0= sigs.k8s.io/controller-runtime v0.23.3 h1:VjB/vhoPoA9l1kEKZHBMnQF33tdCLQKJtydy4iqwZ80= sigs.k8s.io/controller-runtime v0.23.3/go.mod h1:B6COOxKptp+YaUT5q4l6LqUJTRpizbgf9KSRNdQGns0= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= diff --git a/internal/ai/agent.go b/internal/ai/agent.go new file mode 100644 index 0000000..b74918d --- /dev/null +++ b/internal/ai/agent.go @@ -0,0 +1,197 @@ +package ai + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + + "go.datum.net/datumctl/internal/ai/llm" +) + +const ( + // historyWindowSize is the maximum number of messages sent to the LLM in a + // single Chat call. Older messages are dropped from the window (not from + // the full history slice) to avoid hitting provider token limits in long + // interactive sessions. A warning is printed to stderr when truncation occurs. + historyWindowSize = 40 +) + +// AgentOptions configures an Agent. +type AgentOptions struct { + LLM llm.LLMClient + Registry *Registry + SystemPrompt string + MaxIterations int + + // In/Out/ErrOut allow callers to substitute streams for testing. + // When nil, os.Stdin/os.Stdout/os.Stderr are used. + In io.Reader + Out io.Writer + ErrOut io.Writer + + // Interactive, when true, starts a REPL after the initial query completes. + Interactive bool + + // IsTerminal controls spinner and pipe-mode behaviour. 
When false, + // mutations are auto-declined and a clear message is shown. + IsTerminal bool +} + +// Agent runs the agentic loop. +type Agent struct { + opts AgentOptions + history []llm.Message +} + +// NewAgent creates an Agent from the given options. +func NewAgent(opts AgentOptions) *Agent { + if opts.In == nil { + opts.In = os.Stdin + } + if opts.Out == nil { + opts.Out = os.Stdout + } + if opts.ErrOut == nil { + opts.ErrOut = os.Stderr + } + if opts.MaxIterations <= 0 { + opts.MaxIterations = 20 + } + return &Agent{opts: opts} +} + +// Run executes the agentic loop starting with initialQuery. In interactive +// mode it loops, reading subsequent queries from stdin after each response. +func (a *Agent) Run(ctx context.Context, initialQuery string) error { + a.history = append(a.history, llm.Message{Role: llm.RoleUser, Content: initialQuery}) + + for { + if err := a.runOnce(ctx); err != nil { + return err + } + if !a.opts.Interactive { + return nil + } + // Read the next user input for the interactive REPL. + fmt.Fprintf(a.opts.Out, "\n> ") + sc := bufio.NewScanner(a.opts.In) + if !sc.Scan() { + return nil // EOF + } + line := sc.Text() + if line == "" || line == "exit" || line == "quit" { + return nil + } + a.history = append(a.history, llm.Message{Role: llm.RoleUser, Content: line}) + } +} + +// runOnce executes one question→tool-calls→answer cycle, up to MaxIterations. +func (a *Agent) runOnce(ctx context.Context) error { + toolDefs := a.opts.Registry.Defs() + + for iter := 0; iter < a.opts.MaxIterations; iter++ { + spinner := NewSpinner(a.opts.ErrOut, a.opts.IsTerminal) + spinner.Run() + + window := a.historyWindow() + + // For the final text response (no tool calls expected or on last-leg + // iterations) we stream directly to Out so the user sees tokens as they + // arrive. For intermediate tool-calling turns we collect the full + // response first, since tool call arguments must be complete before we + // can execute them. 
+ resp, err := a.opts.LLM.StreamChat(ctx, a.opts.SystemPrompt, window, toolDefs, &spinnerClearWriter{ + spinner: spinner, + out: a.opts.Out, + }) + spinner.Stop() + if err != nil { + return fmt.Errorf("LLM error: %w", err) + } + a.history = append(a.history, resp) + + if len(resp.ToolCalls) == 0 { + // Text was already streamed; just add a trailing newline. + fmt.Fprintln(a.opts.Out) + return nil + } + + // Process all tool calls in the batch before the next Chat call. + for _, tc := range resp.ToolCalls { + result, isErr := a.executeToolCall(ctx, tc) + a.history = append(a.history, llm.Message{ + Role: llm.RoleToolResult, + ToolResult: &llm.ToolResult{ + CallID: tc.ID, + Content: result, + IsError: isErr, + }, + }) + } + } + + return fmt.Errorf("agentic loop exceeded %d iterations without completing", a.opts.MaxIterations) +} + +// spinnerClearWriter wraps the real output writer. On the very first Write it +// stops the spinner (clearing the spinner line) so streamed text begins on a +// clean line. Subsequent writes go straight through. +type spinnerClearWriter struct { + spinner *Spinner + out io.Writer + cleared bool +} + +func (w *spinnerClearWriter) Write(p []byte) (int, error) { + if !w.cleared { + w.spinner.Stop() + w.cleared = true + } + return w.out.Write(p) +} + +// executeToolCall finds the tool, handles confirmation gates, and runs it. +// Returns the result string and whether it represents an error. 
+func (a *Agent) executeToolCall(ctx context.Context, tc llm.ToolCall) (string, bool) { + tool, ok := a.opts.Registry.Find(tc.ToolName) + if !ok { + return fmt.Sprintf("unknown tool %q", tc.ToolName), true + } + + if tool.RequiresConfirm { + if !a.opts.IsTerminal { + fmt.Fprintf(a.opts.ErrOut, + "[ai] mutation skipped: %s requires interactive mode (not a terminal)\n", tc.ToolName) + return `{"skipped":true,"reason":"mutations require interactive mode — run without piping to apply changes"}`, false + } + PrintPreview(a.opts.ErrOut, tc) + if !Ask(a.opts.In, a.opts.ErrOut) { + return `{"skipped":true,"reason":"user declined"}`, false + } + } + + result, err := tool.Execute(ctx, tc.Arguments) + if err != nil { + fmt.Fprintf(a.opts.ErrOut, "[ai] tool %s error: %v\n", tc.ToolName, err) + return err.Error(), true + } + return result, false +} + +// historyWindow returns the slice of history to send to Chat, capped at +// historyWindowSize messages. A stderr warning is printed when truncation +// occurs to signal the user that context is being dropped. +func (a *Agent) historyWindow() []llm.Message { + if len(a.history) <= historyWindowSize { + return a.history + } + fmt.Fprintf(a.opts.ErrOut, + "[ai] warning: conversation history (%d messages) exceeds window (%d); oldest messages dropped\n", + len(a.history), historyWindowSize) + return a.history[len(a.history)-historyWindowSize:] +} + + diff --git a/internal/ai/config.go b/internal/ai/config.go new file mode 100644 index 0000000..61fe7ca --- /dev/null +++ b/internal/ai/config.go @@ -0,0 +1,102 @@ +package ai + +import ( + "fmt" + "os" + "path/filepath" + + syaml "sigs.k8s.io/yaml" +) + +// Config holds user-level AI preferences loaded from the config file. +// All fields are optional; zero values fall back to provider/env defaults. +// Flag values always take precedence over config file values. +type Config struct { + // LLM provider: "anthropic", "openai", or "gemini". Auto-detected from API keys if empty. 
+ Provider string `json:"provider,omitempty" yaml:"provider"` + + // Model overrides the provider default (e.g. "claude-sonnet-4-6"). + Model string `json:"model,omitempty" yaml:"model"` + + // MaxIterations caps the agentic loop. Defaults to 20. + MaxIterations int `json:"max_iterations,omitempty" yaml:"max_iterations"` + + // Stream is reserved for v2. Always false in v1. + Stream bool `json:"stream,omitempty" yaml:"stream"` + + // Default context — overridden by --organization/--project/--namespace flags. + Organization string `json:"organization,omitempty" yaml:"organization"` + Project string `json:"project,omitempty" yaml:"project"` + Namespace string `json:"namespace,omitempty" yaml:"namespace"` + + // API keys — overridden by ANTHROPIC_API_KEY / OPENAI_API_KEY / GEMINI_API_KEY env vars. + // Stored here so users don't have to export env vars in every shell session. + AnthropicAPIKey string `json:"anthropic_api_key,omitempty" yaml:"anthropic_api_key"` + OpenAIAPIKey string `json:"openai_api_key,omitempty" yaml:"openai_api_key"` + GeminiAPIKey string `json:"gemini_api_key,omitempty" yaml:"gemini_api_key"` +} + +// ConfigFilePath returns the platform-appropriate path to the AI config file. +// On Linux/macOS: ~/.config/datumctl/ai.yaml +// On Windows: %AppData%\datumctl\ai.yaml +func ConfigFilePath() (string, error) { + dir, err := os.UserConfigDir() + if err != nil { + return "", fmt.Errorf("resolve config dir: %w", err) + } + return filepath.Join(dir, "datumctl", "ai.yaml"), nil +} + +// LoadConfig reads the config file and returns a Config. A missing file is not +// an error — the returned Config will have all zero values. 
+func LoadConfig() (Config, error) { + path, err := ConfigFilePath() + if err != nil { + return Config{}, err + } + data, err := os.ReadFile(path) + if os.IsNotExist(err) { + return Config{}, nil + } + if err != nil { + return Config{}, fmt.Errorf("read AI config %s: %w", path, err) + } + var cfg Config + if err := syaml.Unmarshal(data, &cfg); err != nil { + return Config{}, fmt.Errorf("parse AI config %s: %w", path, err) + } + return cfg, nil +} + +// SaveConfig writes cfg to the config file, creating the directory if needed. +func SaveConfig(cfg Config) error { + path, err := ConfigFilePath() + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return fmt.Errorf("create config dir: %w", err) + } + data, err := syaml.Marshal(cfg) + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + if err := os.WriteFile(path, data, 0o600); err != nil { + return fmt.Errorf("write AI config %s: %w", path, err) + } + return nil +} + +// ApplyEnvOverrides replaces empty API key fields with values from environment +// variables. Environment variables always win over config file values. +func (c *Config) ApplyEnvOverrides() { + if v := os.Getenv("ANTHROPIC_API_KEY"); v != "" { + c.AnthropicAPIKey = v + } + if v := os.Getenv("OPENAI_API_KEY"); v != "" { + c.OpenAIAPIKey = v + } + if v := os.Getenv("GEMINI_API_KEY"); v != "" { + c.GeminiAPIKey = v + } +} diff --git a/internal/ai/confirm.go b/internal/ai/confirm.go new file mode 100644 index 0000000..28a8e00 --- /dev/null +++ b/internal/ai/confirm.go @@ -0,0 +1,34 @@ +package ai + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "strings" + + "go.datum.net/datumctl/internal/ai/llm" +) + +// PrintPreview writes a human-readable description of a proposed mutating +// action to w. It is called by the agentic loop before prompting for +// confirmation. 
+func PrintPreview(w io.Writer, call llm.ToolCall) { + fmt.Fprintf(w, "\n--- Proposed action ---\n") + fmt.Fprintf(w, "Tool: %s\n", call.ToolName) + b, _ := json.MarshalIndent(call.Arguments, "", " ") + fmt.Fprintf(w, "Details:\n%s\n", string(b)) + fmt.Fprintf(w, "-----------------------\n") +} + +// Ask prompts the user on out and reads a response from in. It returns true +// only if the user explicitly types "y" (case-insensitive). Any other input +// or EOF is treated as "no". +func Ask(in io.Reader, out io.Writer) bool { + fmt.Fprint(out, "Apply changes? [y/N]: ") + sc := bufio.NewScanner(in) + if !sc.Scan() { + return false + } + return strings.ToLower(strings.TrimSpace(sc.Text())) == "y" +} diff --git a/internal/ai/llm/anthropic.go b/internal/ai/llm/anthropic.go new file mode 100644 index 0000000..7777efc --- /dev/null +++ b/internal/ai/llm/anthropic.go @@ -0,0 +1,418 @@ +package llm + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" +) + +const ( + anthropicDefaultModel = "claude-sonnet-4-6" + anthropicAPIURL = "https://api.anthropic.com/v1/messages" + anthropicVersion = "2023-06-01" + anthropicMaxTokens = 4096 + anthropicMaxRetries = 3 + anthropicRetryInitialMs = 500 + anthropicRetryMaxMs = 30000 +) + +type anthropicClient struct { + apiKey string + model string +} + +func newAnthropicClient(cfg Config) (LLMClient, error) { + key := os.Getenv("ANTHROPIC_API_KEY") + if key == "" { + key = cfg.AnthropicAPIKey + } + if key == "" { + return nil, fmt.Errorf("no Anthropic API key; set ANTHROPIC_API_KEY or run: datumctl ai config set anthropic_api_key ") + } + model := cfg.Model + if model == "" { + model = anthropicDefaultModel + } + return &anthropicClient{apiKey: key, model: model}, nil +} + +func (c *anthropicClient) Provider() string { return "anthropic" } +func (c *anthropicClient) Model() string { return c.model } + +// --- wire types --- + +type anthropicRequest struct { + Model string `json:"model"` 
+ MaxTokens int `json:"max_tokens"` + System string `json:"system,omitempty"` + Messages []anthropicMessage `json:"messages"` + Tools []anthropicTool `json:"tools,omitempty"` + Stream bool `json:"stream,omitempty"` +} + +type anthropicMessage struct { + Role string `json:"role"` + Content any `json:"content"` // string or []anthropicContent +} + +type anthropicContent struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input *map[string]any `json:"input,omitempty"` // pointer so nil=omit, &{}="{}" + ToolUseID string `json:"tool_use_id,omitempty"` + Content string `json:"content,omitempty"` + IsError bool `json:"is_error,omitempty"` +} + +type anthropicTool struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema map[string]any `json:"input_schema"` +} + +type anthropicResponse struct { + ID string `json:"id"` + Type string `json:"type"` + Role string `json:"role"` + Content []anthropicContent `json:"content"` + StopReason string `json:"stop_reason"` + Error *anthropicError `json:"error,omitempty"` +} + +type anthropicError struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// Chat implements LLMClient. It converts the internal history to Anthropic's +// wire format, sends the request with retry logic, and converts the response +// back to the internal Message type. 
+func (c *anthropicClient) Chat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef) (Message, error) { + wireMessages := toAnthropicMessages(messages) + + req := anthropicRequest{ + Model: c.model, + MaxTokens: anthropicMaxTokens, + System: systemPrompt, + Messages: wireMessages, + } + for _, t := range tools { + schema := t.InputSchema + if schema == nil { + schema = map[string]any{"type": "object", "properties": map[string]any{}} + } + req.Tools = append(req.Tools, anthropicTool{ + Name: t.Name, + Description: t.Description, + InputSchema: schema, + }) + } + + body, err := json.Marshal(req) + if err != nil { + return Message{}, fmt.Errorf("anthropic: marshal request: %w", err) + } + + var resp anthropicResponse + if err := c.doWithRetry(ctx, body, &resp); err != nil { + return Message{}, err + } + if resp.Error != nil { + return Message{}, fmt.Errorf("anthropic: %s: %s", resp.Error.Type, resp.Error.Message) + } + + return fromAnthropicResponse(resp), nil +} + +// toAnthropicMessages converts the internal history to Anthropic wire format. 
+// Key rules: +// - RoleUser → role "user", string content +// - RoleAssistant → role "assistant", content array (text + tool_use blocks) +// - RoleToolResult → batched into a single role "user" message with +// tool_result content blocks (consecutive RoleToolResult entries share one message) +func toAnthropicMessages(messages []Message) []anthropicMessage { + var result []anthropicMessage + + for i := 0; i < len(messages); { + msg := messages[i] + switch msg.Role { + case RoleUser: + result = append(result, anthropicMessage{Role: "user", Content: msg.Content}) + i++ + + case RoleAssistant: + var blocks []anthropicContent + if msg.Content != "" { + blocks = append(blocks, anthropicContent{Type: "text", Text: msg.Content}) + } + for _, tc := range msg.ToolCalls { + input := tc.Arguments + if input == nil { + input = map[string]any{} + } + blocks = append(blocks, anthropicContent{ + Type: "tool_use", + ID: tc.ID, + Name: tc.ToolName, + Input: &input, + }) + } + result = append(result, anthropicMessage{Role: "assistant", Content: blocks}) + i++ + + case RoleToolResult: + // Batch all consecutive tool_result messages into one user message. + var blocks []anthropicContent + for i < len(messages) && messages[i].Role == RoleToolResult { + tr := messages[i].ToolResult + block := anthropicContent{ + Type: "tool_result", + ToolUseID: tr.CallID, + Content: tr.Content, + } + if tr.IsError { + block.IsError = true + } + blocks = append(blocks, block) + i++ + } + result = append(result, anthropicMessage{Role: "user", Content: blocks}) + + default: + i++ + } + } + + return result +} + +// fromAnthropicResponse converts an Anthropic response to the internal Message type. 
func fromAnthropicResponse(resp anthropicResponse) Message {
	msg := Message{Role: RoleAssistant}
	for _, block := range resp.Content {
		switch block.Type {
		case "text":
			// Multiple text blocks are concatenated into one content string.
			msg.Content += block.Text
		case "tool_use":
			var args map[string]any
			if block.Input != nil {
				args = *block.Input
			} else {
				// Normalize a missing input to an empty map so callers never
				// see nil Arguments.
				args = map[string]any{}
			}
			msg.ToolCalls = append(msg.ToolCalls, ToolCall{
				ID:        block.ID,
				ToolName:  block.Name,
				Arguments: args,
			})
		}
	}
	return msg
}

// --- streaming wire types ---

// anthropicStreamData is one decoded SSE payload from the streaming
// Messages API.
type anthropicStreamData struct {
	Type         string                `json:"type"`
	Index        int                   `json:"index"`
	Error        *anthropicError       `json:"error,omitempty"`
	ContentBlock *anthropicStreamBlock `json:"content_block,omitempty"`
	Delta        *anthropicStreamDelta `json:"delta,omitempty"`
}

// anthropicStreamBlock describes a content block opened by a
// content_block_start event ("text" or "tool_use").
type anthropicStreamBlock struct {
	Type string `json:"type"`
	ID   string `json:"id,omitempty"`
	Name string `json:"name,omitempty"`
}

// anthropicStreamDelta carries an incremental update to the current block:
// Text for text blocks, PartialJSON for tool_use inputs.
type anthropicStreamDelta struct {
	Type        string `json:"type"`
	Text        string `json:"text,omitempty"`
	PartialJSON string `json:"partial_json,omitempty"`
}

// StreamChat implements LLMClient. It uses Anthropic's streaming Messages API
// to write text delta chunks to textOut as they arrive, and reconstructs tool
// calls from accumulated input_json_delta events.
+func (c *anthropicClient) StreamChat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef, textOut io.Writer) (Message, error) { + wireMessages := toAnthropicMessages(messages) + req := anthropicRequest{ + Model: c.model, + MaxTokens: anthropicMaxTokens, + System: systemPrompt, + Messages: wireMessages, + Stream: true, + } + for _, t := range tools { + schema := t.InputSchema + if schema == nil { + schema = map[string]any{"type": "object", "properties": map[string]any{}} + } + req.Tools = append(req.Tools, anthropicTool{ + Name: t.Name, + Description: t.Description, + InputSchema: schema, + }) + } + + body, err := json.Marshal(req) + if err != nil { + return Message{}, fmt.Errorf("anthropic: marshal request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, anthropicAPIURL, bytes.NewReader(body)) + if err != nil { + return Message{}, fmt.Errorf("anthropic: build request: %w", err) + } + httpReq.Header.Set("x-api-key", c.apiKey) + httpReq.Header.Set("anthropic-version", anthropicVersion) + httpReq.Header.Set("content-type", "application/json") + httpReq.Header.Set("accept", "text/event-stream") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return Message{}, fmt.Errorf("anthropic: http: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return Message{}, fmt.Errorf("anthropic: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + // blockType tracks whether each content block index is "text" or "tool_use". 
+ type toolCallAccum struct { + id string + name string + args strings.Builder + } + blockType := map[int]string{} + toolCalls := map[int]*toolCallAccum{} + toolOrder := []int{} // preserves insertion order + + var textBuf strings.Builder + for ev := range scanSSE(resp.Body) { + if ev.data == "" || ev.data == "[DONE]" { + continue + } + var d anthropicStreamData + if err := json.Unmarshal([]byte(ev.data), &d); err != nil { + continue + } + switch d.Type { + case "error": + if d.Error != nil { + return Message{}, fmt.Errorf("anthropic: %s: %s", d.Error.Type, d.Error.Message) + } + case "content_block_start": + if d.ContentBlock == nil { + continue + } + blockType[d.Index] = d.ContentBlock.Type + if d.ContentBlock.Type == "tool_use" { + toolCalls[d.Index] = &toolCallAccum{ + id: d.ContentBlock.ID, + name: d.ContentBlock.Name, + } + toolOrder = append(toolOrder, d.Index) + } + case "content_block_delta": + if d.Delta == nil { + continue + } + switch d.Delta.Type { + case "text_delta": + textBuf.WriteString(d.Delta.Text) + if textOut != nil { + fmt.Fprint(textOut, d.Delta.Text) + } + case "input_json_delta": + if tc, ok := toolCalls[d.Index]; ok { + tc.args.WriteString(d.Delta.PartialJSON) + } + } + } + } + + msg := Message{Role: RoleAssistant, Content: textBuf.String()} + for _, idx := range toolOrder { + tc := toolCalls[idx] + var args map[string]any + if tc.args.Len() > 0 { + _ = json.Unmarshal([]byte(tc.args.String()), &args) + } + if args == nil { + args = map[string]any{} + } + msg.ToolCalls = append(msg.ToolCalls, ToolCall{ + ID: tc.id, + ToolName: tc.name, + Arguments: args, + }) + } + return msg, nil +} + +func (c *anthropicClient) doWithRetry(ctx context.Context, body []byte, out *anthropicResponse) error { + delay := time.Duration(anthropicRetryInitialMs) * time.Millisecond + for attempt := 0; attempt < anthropicMaxRetries; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + delay *= 2 + if 
delay > time.Duration(anthropicRetryMaxMs)*time.Millisecond { + delay = time.Duration(anthropicRetryMaxMs) * time.Millisecond + } + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, anthropicAPIURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("anthropic: build request: %w", err) + } + httpReq.Header.Set("x-api-key", c.apiKey) + httpReq.Header.Set("anthropic-version", anthropicVersion) + httpReq.Header.Set("content-type", "application/json") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return fmt.Errorf("anthropic: http: %w", err) + } + + respBody, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return fmt.Errorf("anthropic: read response: %w", err) + } + + if resp.StatusCode == http.StatusTooManyRequests || + resp.StatusCode == http.StatusInternalServerError || + resp.StatusCode == http.StatusServiceUnavailable { + if attempt < anthropicMaxRetries-1 { + continue + } + return fmt.Errorf("anthropic: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("anthropic: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + if err := json.Unmarshal(respBody, out); err != nil { + return fmt.Errorf("anthropic: unmarshal response: %w", err) + } + return nil + } + return fmt.Errorf("anthropic: max retries exceeded") +} diff --git a/internal/ai/llm/gemini.go b/internal/ai/llm/gemini.go new file mode 100644 index 0000000..98e423e --- /dev/null +++ b/internal/ai/llm/gemini.go @@ -0,0 +1,363 @@ +package llm + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" +) + +const ( + geminiDefaultModel = "gemini-2.0-flash" + geminiAPIBase = "https://generativelanguage.googleapis.com/v1beta/models" + geminiMaxRetries = 3 + geminiRetryInitialMs = 500 + geminiRetryMaxMs = 30000 +) + +type geminiClient struct { + apiKey string + model string +} + +func newGeminiClient(cfg Config) (LLMClient, 
error) { + key := os.Getenv("GEMINI_API_KEY") + if key == "" { + key = cfg.GeminiAPIKey + } + if key == "" { + return nil, fmt.Errorf("no Gemini API key; set GEMINI_API_KEY or run: datumctl ai config set gemini_api_key ") + } + model := cfg.Model + if model == "" { + model = geminiDefaultModel + } + return &geminiClient{apiKey: key, model: model}, nil +} + +func (c *geminiClient) Provider() string { return "gemini" } +func (c *geminiClient) Model() string { return c.model } + +// --- wire types --- + +type geminiRequest struct { + Contents []geminiContent `json:"contents"` + Tools []geminiToolList `json:"tools,omitempty"` + SystemInstruction *geminiContent `json:"systemInstruction,omitempty"` +} + +type geminiContent struct { + Role string `json:"role,omitempty"` + Parts []geminiPart `json:"parts"` +} + +type geminiPart struct { + Text string `json:"text,omitempty"` + FunctionCall *geminiFunctionCall `json:"functionCall,omitempty"` + FunctionResponse *geminiFunctionResponse `json:"functionResponse,omitempty"` +} + +type geminiFunctionCall struct { + Name string `json:"name"` + Args map[string]any `json:"args"` +} + +type geminiFunctionResponse struct { + Name string `json:"name"` + Response map[string]any `json:"response"` +} + +type geminiToolList struct { + FunctionDeclarations []geminiFunctionDecl `json:"functionDeclarations"` +} + +type geminiFunctionDecl struct { + Name string `json:"name"` + Description string `json:"description"` + Parameters map[string]any `json:"parameters,omitempty"` +} + +type geminiResponse struct { + Candidates []geminiCandidate `json:"candidates"` + Error *geminiError `json:"error,omitempty"` +} + +type geminiCandidate struct { + Content geminiContent `json:"content"` + FinishReason string `json:"finishReason"` +} + +type geminiError struct { + Code int `json:"code"` + Message string `json:"message"` + Status string `json:"status"` +} + +// Chat implements LLMClient for Gemini. 
func (c *geminiClient) Chat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef) (Message, error) {
	req := geminiRequest{
		Contents: toGeminiContents(messages),
	}
	if systemPrompt != "" {
		// Gemini takes the system prompt as a dedicated systemInstruction,
		// not as a turn in contents.
		req.SystemInstruction = &geminiContent{
			Parts: []geminiPart{{Text: systemPrompt}},
		}
	}
	if len(tools) > 0 {
		var decls []geminiFunctionDecl
		for _, t := range tools {
			params := t.InputSchema
			if params == nil {
				// Declare an empty object schema for parameterless tools.
				params = map[string]any{"type": "object", "properties": map[string]any{}}
			}
			decls = append(decls, geminiFunctionDecl{
				Name:        t.Name,
				Description: t.Description,
				Parameters:  params,
			})
		}
		req.Tools = []geminiToolList{{FunctionDeclarations: decls}}
	}

	body, err := json.Marshal(req)
	if err != nil {
		return Message{}, fmt.Errorf("gemini: marshal request: %w", err)
	}

	var resp geminiResponse
	if err := c.doWithRetry(ctx, body, &resp); err != nil {
		return Message{}, err
	}
	if resp.Error != nil {
		return Message{}, fmt.Errorf("gemini: %s (%d): %s", resp.Error.Status, resp.Error.Code, resp.Error.Message)
	}
	if len(resp.Candidates) == 0 {
		return Message{}, fmt.Errorf("gemini: empty candidates in response")
	}

	// Only the first candidate is used.
	return fromGeminiContent(resp.Candidates[0].Content), nil
}

// toGeminiContents converts the internal history to Gemini wire format.
// Gemini uses "user" and "model" roles; tool calls and responses are embedded
// as function call/response parts within those turns.
+func toGeminiContents(messages []Message) []geminiContent { + var result []geminiContent + + for i := 0; i < len(messages); { + msg := messages[i] + switch msg.Role { + case RoleUser: + result = append(result, geminiContent{ + Role: "user", + Parts: []geminiPart{{Text: msg.Content}}, + }) + i++ + + case RoleAssistant: + var parts []geminiPart + if msg.Content != "" { + parts = append(parts, geminiPart{Text: msg.Content}) + } + for _, tc := range msg.ToolCalls { + parts = append(parts, geminiPart{ + FunctionCall: &geminiFunctionCall{Name: tc.ToolName, Args: tc.Arguments}, + }) + } + result = append(result, geminiContent{Role: "model", Parts: parts}) + i++ + + case RoleToolResult: + // Batch consecutive tool results into a single "user" turn. + var parts []geminiPart + for i < len(messages) && messages[i].Role == RoleToolResult { + tr := messages[i].ToolResult + responseMap := map[string]any{"output": tr.Content} + if tr.IsError { + responseMap = map[string]any{"error": tr.Content} + } + parts = append(parts, geminiPart{ + FunctionResponse: &geminiFunctionResponse{ + Name: tr.CallID, + Response: responseMap, + }, + }) + i++ + } + result = append(result, geminiContent{Role: "user", Parts: parts}) + + default: + i++ + } + } + + return result +} + +// fromGeminiContent converts a Gemini response content to the internal Message type. +func fromGeminiContent(c geminiContent) Message { + msg := Message{Role: RoleAssistant} + for _, part := range c.Parts { + if part.Text != "" { + msg.Content += part.Text + } + if part.FunctionCall != nil { + // Gemini does not provide stable IDs; use tool name as a stand-in. + // This works because we batch all results before the next Chat call. + msg.ToolCalls = append(msg.ToolCalls, ToolCall{ + ID: part.FunctionCall.Name, + ToolName: part.FunctionCall.Name, + Arguments: part.FunctionCall.Args, + }) + } + } + return msg +} + +// StreamChat implements LLMClient using Gemini's streamGenerateContent endpoint. 
// Gemini streams a JSON array where each element is a full geminiResponse chunk.
// Text parts are written to textOut as they arrive; function call parts are
// accumulated and returned in the Message.
func (c *geminiClient) StreamChat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef, textOut io.Writer) (Message, error) {
	req := geminiRequest{Contents: toGeminiContents(messages)}
	if systemPrompt != "" {
		req.SystemInstruction = &geminiContent{
			Parts: []geminiPart{{Text: systemPrompt}},
		}
	}
	if len(tools) > 0 {
		var decls []geminiFunctionDecl
		for _, t := range tools {
			params := t.InputSchema
			if params == nil {
				// Declare an empty object schema for parameterless tools.
				params = map[string]any{"type": "object", "properties": map[string]any{}}
			}
			decls = append(decls, geminiFunctionDecl{
				Name:        t.Name,
				Description: t.Description,
				Parameters:  params,
			})
		}
		req.Tools = []geminiToolList{{FunctionDeclarations: decls}}
	}

	body, err := json.Marshal(req)
	if err != nil {
		return Message{}, fmt.Errorf("gemini: marshal request: %w", err)
	}

	// NOTE(review): the API key travels in the query string; consider the
	// x-goog-api-key header instead so the key cannot leak via URL logging.
	url := fmt.Sprintf("%s/%s:streamGenerateContent?alt=sse&key=%s", geminiAPIBase, c.model, c.apiKey)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return Message{}, fmt.Errorf("gemini: build request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return Message{}, fmt.Errorf("gemini: http: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		respBody, _ := io.ReadAll(resp.Body)
		return Message{}, fmt.Errorf("gemini: HTTP %d: %s", resp.StatusCode, string(respBody))
	}

	var textBuf strings.Builder
	msg := Message{Role: RoleAssistant}

	for ev := range scanSSE(resp.Body) {
		if ev.data == "" || ev.data == "[DONE]" {
			continue
		}
		var chunk geminiResponse
		if err := json.Unmarshal([]byte(ev.data), &chunk); err != nil {
			// Skip malformed chunks rather than aborting the stream.
			continue
		}
		if chunk.Error != nil {
			return Message{}, fmt.Errorf("gemini: %s (%d): %s", chunk.Error.Status, chunk.Error.Code, chunk.Error.Message)
		}
		if len(chunk.Candidates) == 0 {
			continue
		}
		for _, part := range chunk.Candidates[0].Content.Parts {
			if part.Text != "" {
				textBuf.WriteString(part.Text)
				if textOut != nil {
					fmt.Fprint(textOut, part.Text)
				}
			}
			if part.FunctionCall != nil {
				// No stable call IDs from Gemini; reuse the tool name.
				msg.ToolCalls = append(msg.ToolCalls, ToolCall{
					ID:        part.FunctionCall.Name,
					ToolName:  part.FunctionCall.Name,
					Arguments: part.FunctionCall.Args,
				})
			}
		}
	}

	msg.Content = textBuf.String()
	return msg, nil
}

// doWithRetry POSTs body to generateContent, retrying 429/500/503 with capped
// exponential backoff, and decodes the successful response into out.
func (c *geminiClient) doWithRetry(ctx context.Context, body []byte, out *geminiResponse) error {
	url := fmt.Sprintf("%s/%s:generateContent?key=%s", geminiAPIBase, c.model, c.apiKey)
	delay := time.Duration(geminiRetryInitialMs) * time.Millisecond

	for attempt := 0; attempt < geminiMaxRetries; attempt++ {
		if attempt > 0 {
			// Backoff honors context cancellation; double the delay up to the cap.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(delay):
			}
			delay *= 2
			if delay > time.Duration(geminiRetryMaxMs)*time.Millisecond {
				delay = time.Duration(geminiRetryMaxMs) * time.Millisecond
			}
		}

		httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
		if err != nil {
			return fmt.Errorf("gemini: build request: %w", err)
		}
		httpReq.Header.Set("Content-Type", "application/json")

		resp, err := http.DefaultClient.Do(httpReq)
		if err != nil {
			return fmt.Errorf("gemini: http: %w", err)
		}

		respBody, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return fmt.Errorf("gemini: read response: %w", err)
		}

		if resp.StatusCode == http.StatusTooManyRequests ||
			resp.StatusCode == http.StatusInternalServerError ||
			resp.StatusCode == http.StatusServiceUnavailable {
			if attempt < geminiMaxRetries-1 {
				continue
			}
			return fmt.Errorf("gemini: HTTP %d: %s", resp.StatusCode, string(respBody))
		}

		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("gemini: HTTP %d: %s", resp.StatusCode, string(respBody))
		}

		if err := json.Unmarshal(respBody, out); err != nil {
			return fmt.Errorf("gemini: unmarshal response: %w", err)
		}
		return nil
	}
	return fmt.Errorf("gemini: max retries exceeded")
}
diff --git a/internal/ai/llm/llm.go b/internal/ai/llm/llm.go
new file mode 100644
index 0000000..91d82f7
--- /dev/null
+++ b/internal/ai/llm/llm.go
@@ -0,0 +1,121 @@
// Package llm provides a provider-agnostic LLM client interface and shared
// types used by the datumctl ai agentic loop.
package llm

import (
	"context"
	"fmt"
	"io"
	"strings"
)

// Role identifies the speaker of a conversation turn.
type Role string

const (
	RoleUser      Role = "user"
	RoleAssistant Role = "assistant"
	// RoleToolResult is an internal role for tool result messages stored in the
	// shared history slice. It is NEVER sent verbatim to any provider.
	// Each provider's Chat() implementation translates it to the appropriate
	// wire format (Anthropic: role "user" with tool_result content blocks;
	// OpenAI: role "tool").
	RoleToolResult Role = "tool_result"
)

// Message is one turn in the conversation history.
type Message struct {
	Role       Role
	Content    string
	ToolCalls  []ToolCall  // populated when the LLM requests tool invocations
	ToolResult *ToolResult // populated for RoleToolResult messages
}

// ToolCall represents the LLM's request to invoke a named tool.
type ToolCall struct {
	ID        string // provider-assigned ID used to correlate results
	ToolName  string
	Arguments map[string]any
}

// ToolResult is the response fed back to the LLM after a tool executes.
type ToolResult struct {
	CallID  string
	Content string
	IsError bool
}

// ToolDef is the schema the LLM sees when deciding which tools to call.
type ToolDef struct {
	Name        string
	Description string
	InputSchema map[string]any // JSON Schema object
}

// LLMClient is the provider abstraction used by the agentic loop.
type LLMClient interface {
	// Chat sends the conversation history and available tools to the provider
	// and returns the next assistant message. The system prompt is not included
	// in messages — it is passed separately via the systemPrompt parameter and
	// each provider maps it to its own wire representation.
	Chat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef) (Message, error)

	// StreamChat is like Chat but streams text delta chunks to textOut as they
	// arrive from the provider. Tool-call arguments are accumulated internally
	// and returned in the Message. textOut may be nil to suppress streaming.
	StreamChat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef, textOut io.Writer) (Message, error)

	// Provider returns the provider identifier ("anthropic", "openai", "gemini").
	Provider() string
	// Model returns the resolved model name in use.
	Model() string
}

// Config holds provider, model, and API key preferences for NewClient.
// API keys here are fallbacks; environment variables always take precedence.
type Config struct {
	Provider        string
	Model           string
	AnthropicAPIKey string // fallback if ANTHROPIC_API_KEY env var is unset
	OpenAIAPIKey    string // fallback if OPENAI_API_KEY env var is unset
	GeminiAPIKey    string // fallback if GEMINI_API_KEY env var is unset
}

// NewClient constructs an LLMClient using the following priority:
//  1. Model name prefix: claude-→Anthropic, gpt-/o1/o3→OpenAI, gemini-→Gemini
//  2. cfg.Provider explicit override
//  3.
//     Which API key is available (env var > config file key)
func NewClient(cfg Config) (LLMClient, error) {
	// Model prefix is the strongest signal: it unambiguously names a provider.
	if cfg.Model != "" {
		switch {
		case strings.HasPrefix(cfg.Model, "claude-"):
			return newAnthropicClient(cfg)
		case strings.HasPrefix(cfg.Model, "gpt-"),
			strings.HasPrefix(cfg.Model, "o1"),
			strings.HasPrefix(cfg.Model, "o3"):
			return newOpenAIClient(cfg)
		case strings.HasPrefix(cfg.Model, "gemini-"):
			return newGeminiClient(cfg)
		}
	}

	switch cfg.Provider {
	case "anthropic":
		return newAnthropicClient(cfg)
	case "openai":
		return newOpenAIClient(cfg)
	case "gemini":
		return newGeminiClient(cfg)
	}

	// Auto-detect from available API keys.
	switch {
	case cfg.AnthropicAPIKey != "":
		return newAnthropicClient(cfg)
	case cfg.OpenAIAPIKey != "":
		return newOpenAIClient(cfg)
	case cfg.GeminiAPIKey != "":
		return newGeminiClient(cfg)
	default:
		return nil, fmt.Errorf(
			"no LLM API key found; set ANTHROPIC_API_KEY, OPENAI_API_KEY, or GEMINI_API_KEY,\n" +
				"or run: datumctl ai config set anthropic_api_key ")
	}
}
diff --git a/internal/ai/llm/openai.go b/internal/ai/llm/openai.go
new file mode 100644
index 0000000..626a9ba
--- /dev/null
+++ b/internal/ai/llm/openai.go
@@ -0,0 +1,397 @@
package llm

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"
)

const (
	openaiDefaultModel   = "gpt-4o"
	openaiAPIURL         = "https://api.openai.com/v1/chat/completions"
	openaiMaxRetries     = 3
	openaiRetryInitialMs = 500
	openaiRetryMaxMs     = 30000
)

// openaiClient implements LLMClient against the chat completions API.
type openaiClient struct {
	apiKey string
	model  string
}

// newOpenAIClient resolves the API key (env var wins over config) and model
// (config value or the package default).
func newOpenAIClient(cfg Config) (LLMClient, error) {
	key := os.Getenv("OPENAI_API_KEY")
	if key == "" {
		key = cfg.OpenAIAPIKey
	}
	if key == "" {
		return nil, fmt.Errorf("no OpenAI API key; set OPENAI_API_KEY or run: datumctl ai config set openai_api_key ")
	}
	model := cfg.Model
	if model == "" {
		model = openaiDefaultModel
	}
	return &openaiClient{apiKey: key, model: model}, nil
}

func (c *openaiClient) Provider() string { return "openai" }
func (c *openaiClient) Model() string    { return c.model }

// --- wire types ---

type openaiRequest struct {
	Model    string          `json:"model"`
	Messages []openaiMessage `json:"messages"`
	Tools    []openaiTool    `json:"tools,omitempty"`
}

// openaiMessage is one chat-completions message; ToolCallID/Name apply to
// role "tool" messages, ToolCalls to role "assistant".
type openaiMessage struct {
	Role       string           `json:"role"`
	Content    any              `json:"content"` // string or null
	ToolCalls  []openaiToolCall `json:"tool_calls,omitempty"`
	ToolCallID string           `json:"tool_call_id,omitempty"`
	Name       string           `json:"name,omitempty"`
}

type openaiToolCall struct {
	ID       string             `json:"id"`
	Type     string             `json:"type"`
	Function openaiToolFunction `json:"function"`
}

type openaiToolFunction struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"` // JSON string
}

type openaiTool struct {
	Type     string        `json:"type"`
	Function openaiToolDef `json:"function"`
}

type openaiToolDef struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Parameters  map[string]any `json:"parameters"`
}

type openaiResponse struct {
	Choices []openaiChoice `json:"choices"`
	Error   *openaiError   `json:"error,omitempty"`
}

type openaiChoice struct {
	Message      openaiMessage `json:"message"`
	FinishReason string        `json:"finish_reason"`
}

type openaiError struct {
	Message string `json:"message"`
	Type    string `json:"type"`
}

// Chat implements LLMClient for OpenAI.
func (c *openaiClient) Chat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef) (Message, error) {
	wireMessages := []openaiMessage{}
	if systemPrompt != "" {
		// OpenAI takes the system prompt as a leading "system" message.
		wireMessages = append(wireMessages, openaiMessage{Role: "system", Content: systemPrompt})
	}
	wireMessages = append(wireMessages, toOpenAIMessages(messages)...)

	req := openaiRequest{
		Model:    c.model,
		Messages: wireMessages,
	}
	for _, t := range tools {
		params := t.InputSchema
		if params == nil {
			// Declare an empty object schema for parameterless tools.
			params = map[string]any{"type": "object", "properties": map[string]any{}}
		}
		req.Tools = append(req.Tools, openaiTool{
			Type: "function",
			Function: openaiToolDef{
				Name:        t.Name,
				Description: t.Description,
				Parameters:  params,
			},
		})
	}

	body, err := json.Marshal(req)
	if err != nil {
		return Message{}, fmt.Errorf("openai: marshal request: %w", err)
	}

	var resp openaiResponse
	if err := c.doWithRetry(ctx, body, &resp); err != nil {
		return Message{}, err
	}
	if resp.Error != nil {
		return Message{}, fmt.Errorf("openai: %s: %s", resp.Error.Type, resp.Error.Message)
	}
	if len(resp.Choices) == 0 {
		return Message{}, fmt.Errorf("openai: empty choices in response")
	}

	// Only the first choice is used.
	return fromOpenAIMessage(resp.Choices[0].Message), nil
}

// toOpenAIMessages converts internal history to OpenAI wire format.
func toOpenAIMessages(messages []Message) []openaiMessage {
	var result []openaiMessage
	for _, msg := range messages {
		switch msg.Role {
		case RoleUser:
			result = append(result, openaiMessage{Role: "user", Content: msg.Content})

		case RoleAssistant:
			m := openaiMessage{Role: "assistant", Content: msg.Content}
			for _, tc := range msg.ToolCalls {
				// OpenAI carries tool arguments as a JSON-encoded string.
				argsJSON, _ := json.Marshal(tc.Arguments)
				m.ToolCalls = append(m.ToolCalls, openaiToolCall{
					ID:   tc.ID,
					Type: "function",
					Function: openaiToolFunction{
						Name:      tc.ToolName,
						Arguments: string(argsJSON),
					},
				})
			}
			if len(m.ToolCalls) > 0 {
				m.Content = nil // OpenAI expects null content when tool_calls present
			}
			result = append(result, m)

		case RoleToolResult:
			tr := msg.ToolResult
			content := tr.Content
			if tr.IsError {
				// OpenAI has no error flag on tool messages; prefix instead.
				content = "error: " + content
			}
			result = append(result, openaiMessage{
				Role:       "tool",
				Content:    content,
				ToolCallID: tr.CallID,
			})
		}
	}
	return result
}

// fromOpenAIMessage
converts an OpenAI response message to the internal type. +func fromOpenAIMessage(m openaiMessage) Message { + msg := Message{Role: RoleAssistant} + if s, ok := m.Content.(string); ok { + msg.Content = s + } + for _, tc := range m.ToolCalls { + var args map[string]any + _ = json.Unmarshal([]byte(tc.Function.Arguments), &args) + msg.ToolCalls = append(msg.ToolCalls, ToolCall{ + ID: tc.ID, + ToolName: tc.Function.Name, + Arguments: args, + }) + } + return msg +} + +// StreamChat implements LLMClient using OpenAI's streaming chat completions API. +func (c *openaiClient) StreamChat(ctx context.Context, systemPrompt string, messages []Message, tools []ToolDef, textOut io.Writer) (Message, error) { + wireMessages := []openaiMessage{} + if systemPrompt != "" { + wireMessages = append(wireMessages, openaiMessage{Role: "system", Content: systemPrompt}) + } + wireMessages = append(wireMessages, toOpenAIMessages(messages)...) + + type openaiStreamRequest struct { + Model string `json:"model"` + Messages []openaiMessage `json:"messages"` + Tools []openaiTool `json:"tools,omitempty"` + Stream bool `json:"stream"` + } + req := openaiStreamRequest{ + Model: c.model, + Messages: wireMessages, + Stream: true, + } + for _, t := range tools { + params := t.InputSchema + if params == nil { + params = map[string]any{"type": "object", "properties": map[string]any{}} + } + req.Tools = append(req.Tools, openaiTool{ + Type: "function", + Function: openaiToolDef{ + Name: t.Name, + Description: t.Description, + Parameters: params, + }, + }) + } + + body, err := json.Marshal(req) + if err != nil { + return Message{}, fmt.Errorf("openai: marshal request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, openaiAPIURL, bytes.NewReader(body)) + if err != nil { + return Message{}, fmt.Errorf("openai: build request: %w", err) + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) + httpReq.Header.Set("Content-Type", "application/json") + 
httpReq.Header.Set("Accept", "text/event-stream") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return Message{}, fmt.Errorf("openai: http: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return Message{}, fmt.Errorf("openai: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + // openaiIndexedTC is the streaming tool_call delta with an explicit index. + type openaiIndexedTC struct { + Index int `json:"index"` + ID string `json:"id"` + Function openaiToolFunction `json:"function"` + } + type openaiStreamChunkRich struct { + Choices []struct { + Delta struct { + Content string `json:"content"` + ToolCalls []openaiIndexedTC `json:"tool_calls"` + } `json:"delta"` + } `json:"choices"` + Error *openaiError `json:"error,omitempty"` + } + + // toolAccum accumulates streamed tool call fragments by index. + type toolAccum struct { + id string + name string + args strings.Builder + } + toolByIndex := map[int]*toolAccum{} + toolOrder := []int{} + + var textBuf strings.Builder + for ev := range scanSSE(resp.Body) { + if ev.data == "" || ev.data == "[DONE]" { + continue + } + var chunk openaiStreamChunkRich + if err := json.Unmarshal([]byte(ev.data), &chunk); err != nil { + continue + } + if chunk.Error != nil { + return Message{}, fmt.Errorf("openai: %s: %s", chunk.Error.Type, chunk.Error.Message) + } + if len(chunk.Choices) == 0 { + continue + } + delta := chunk.Choices[0].Delta + if delta.Content != "" { + textBuf.WriteString(delta.Content) + if textOut != nil { + fmt.Fprint(textOut, delta.Content) + } + } + for _, tc := range delta.ToolCalls { + if _, exists := toolByIndex[tc.Index]; !exists { + toolByIndex[tc.Index] = &toolAccum{} + toolOrder = append(toolOrder, tc.Index) + } + ta := toolByIndex[tc.Index] + if tc.ID != "" && ta.id == "" { + ta.id = tc.ID + } + if tc.Function.Name != "" && ta.name == "" { + ta.name = tc.Function.Name + } + 
ta.args.WriteString(tc.Function.Arguments) + } + } + + msg := Message{Role: RoleAssistant, Content: textBuf.String()} + for _, idx := range toolOrder { + ta := toolByIndex[idx] + var args map[string]any + if ta.args.Len() > 0 { + _ = json.Unmarshal([]byte(ta.args.String()), &args) + } + if args == nil { + args = map[string]any{} + } + msg.ToolCalls = append(msg.ToolCalls, ToolCall{ + ID: ta.id, + ToolName: ta.name, + Arguments: args, + }) + } + return msg, nil +} + +func (c *openaiClient) doWithRetry(ctx context.Context, body []byte, out *openaiResponse) error { + delay := time.Duration(openaiRetryInitialMs) * time.Millisecond + for attempt := 0; attempt < openaiMaxRetries; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + delay *= 2 + if delay > time.Duration(openaiRetryMaxMs)*time.Millisecond { + delay = time.Duration(openaiRetryMaxMs) * time.Millisecond + } + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, openaiAPIURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("openai: build request: %w", err) + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return fmt.Errorf("openai: http: %w", err) + } + + respBody, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return fmt.Errorf("openai: read response: %w", err) + } + + if resp.StatusCode == http.StatusTooManyRequests || + resp.StatusCode == http.StatusInternalServerError || + resp.StatusCode == http.StatusServiceUnavailable { + if attempt < openaiMaxRetries-1 { + continue + } + return fmt.Errorf("openai: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("openai: HTTP %d: %s", resp.StatusCode, string(respBody)) + } + + if err := json.Unmarshal(respBody, out); err != nil { + return 
fmt.Errorf("openai: unmarshal response: %w", err)
+		}
+		return nil
+	}
+	return fmt.Errorf("openai: max retries exceeded")
+}
diff --git a/internal/ai/llm/sse.go b/internal/ai/llm/sse.go
new file mode 100644
index 0000000..8afb2b5
--- /dev/null
+++ b/internal/ai/llm/sse.go
@@ -0,0 +1,49 @@
+package llm
+
+import (
+	"bufio"
+	"io"
+	"strings"
+)
+
+// sseEvent is a parsed Server-Sent Event.
+type sseEvent struct {
+	typ  string // from "event: " line
+	data string // from "data: " line
+}
+
+// scanSSE reads SSE events from r and sends them on the returned channel.
+// The channel is closed when r is exhausted or returns an error.
+// Each complete event (delimited by a blank line) is sent as one sseEvent.
+func scanSSE(r io.Reader) <-chan sseEvent {
+	ch := make(chan sseEvent, 16)
+	go func() {
+		defer close(ch)
+		sc := bufio.NewScanner(r)
+		// Grow past bufio.Scanner's 64 KiB default token limit: a single SSE
+		// "data:" line carrying a large model payload can exceed it, and an
+		// over-long token ends the scan loop silently (sc.Err() is never
+		// consulted here), truncating the stream mid-response.
+		sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
+		var ev sseEvent
+		for sc.Scan() {
+			line := sc.Text()
+			switch {
+			case line == "":
+				// Blank line = event boundary.
+				if ev.data != "" || ev.typ != "" {
+					ch <- ev
+					ev = sseEvent{}
+				}
+			case strings.HasPrefix(line, "event:"):
+				ev.typ = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
+			case strings.HasPrefix(line, "data:"):
+				ev.data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
+			}
+		}
+		// Flush any trailing event not followed by a blank line.
+		if ev.data != "" || ev.typ != "" {
+			ch <- ev
+		}
+	}()
+	return ch
+}
diff --git a/internal/ai/prompt.go b/internal/ai/prompt.go
new file mode 100644
index 0000000..fc0c80f
--- /dev/null
+++ b/internal/ai/prompt.go
@@ -0,0 +1,67 @@
+package ai
+
+import "fmt"
+
+// BuildSystemPrompt constructs the system prompt for the agentic loop,
+// injecting the current organization, project, and namespace context.
+func BuildSystemPrompt(org, project, namespace string) string { + orgDisplay := org + if orgDisplay == "" { + orgDisplay = "(none)" + } + projectDisplay := project + if projectDisplay == "" { + projectDisplay = "(none)" + } + + contextSection := fmt.Sprintf(`Current context: + Organization: %s + Project: %s + Namespace: %s`, orgDisplay, projectDisplay, namespace) + + toolsSection := `RESOURCE DISCOVERY: +- Always call list_resource_types first if you are unsure what resource types exist. +- Call get_resource_schema before generating any manifest to ensure field correctness. +- Do not guess at field names or resource structures. + +CONTEXT MODEL: +- Resources live under either an organization or a project. +- Namespaced resources also require a namespace (typically "default"). +- The organization/project/namespace context is already configured for this session. + You do not need to include them in tool arguments unless explicitly switching context. + +MUTATION CONFIRMATION: +- For apply_manifest and delete_resource, the user will be shown a preview and asked + to confirm before execution. This is enforced by the system. +- If the user declines, you will receive a tool result indicating the action was + skipped. Accept this gracefully and ask if they would like to do something else.` + + if org == "" && project == "" { + toolsSection = `NO CONTEXT SET: +- No --organization or --project was provided. Resource management tools are not available. +- Answer general questions about Datum Cloud, explain concepts, and suggest commands. +- If the user wants to list or manage resources, ask them to re-run with + --organization or --project . +- To find their org ID: datumctl get organizations +- To find their project ID: datumctl get projects --organization ` + } + + return fmt.Sprintf(`You are an AI assistant for Datum Cloud, a connectivity infrastructure platform. +You help users manage Datum Cloud resources from their terminal. + +CRITICAL: This is NOT Kubernetes. 
Datum Cloud has its own resource types. +There are NO pods, NO deployments, NO services, NO nodes, NO configmaps, +NO secrets, NO daemonsets, NO statefulsets, NO replicasets, NO ingresses. +Do not suggest or attempt to create any of these resource types. + +%s + +%s + +RESPONSE STYLE: +- Be concise and factual. Summarize results clearly. +- When listing resources, prefer a table or structured summary over raw YAML. +- When something fails, explain what went wrong and suggest a corrective action. +- Do not repeat full YAML blobs in your response unless the user explicitly asks.`, + contextSection, toolsSection) +} diff --git a/internal/ai/spinner.go b/internal/ai/spinner.go new file mode 100644 index 0000000..5eb65a5 --- /dev/null +++ b/internal/ai/spinner.go @@ -0,0 +1,67 @@ +package ai + +import ( + "fmt" + "io" + "sync" + "time" +) + +const spinnerInterval = 100 * time.Millisecond + +var spinnerFrames = []string{"|", "/", "-", "\\"} + +// Spinner displays an animated thinking indicator on w. Call Stop to halt it. +// The spinner only runs when isTerminal is true; otherwise it is a no-op. +// Stop may be called multiple times safely. +type Spinner struct { + w io.Writer + stop chan struct{} + done chan struct{} + isTerminal bool + once sync.Once +} + +// NewSpinner creates a Spinner that writes to w. Start it with Run. +func NewSpinner(w io.Writer, isTerminal bool) *Spinner { + return &Spinner{ + w: w, + stop: make(chan struct{}), + done: make(chan struct{}), + isTerminal: isTerminal, + } +} + +// Run starts the spinner in the background. It returns immediately. +func (s *Spinner) Run() { + if !s.isTerminal { + close(s.done) + return + } + go func() { + defer close(s.done) + i := 0 + for { + fmt.Fprintf(s.w, "\r\033[K%s Thinking...", spinnerFrames[i%len(spinnerFrames)]) + i++ + select { + case <-s.stop: + // Clear the spinner line so streamed text starts clean. 
+ fmt.Fprint(s.w, "\r\033[K") + return + case <-time.After(spinnerInterval): + } + } + }() +} + +// Stop halts the spinner and waits for the goroutine to finish. +// Safe to call multiple times. +func (s *Spinner) Stop() { + s.once.Do(func() { + if s.isTerminal { + close(s.stop) + } + }) + <-s.done +} diff --git a/internal/ai/tools.go b/internal/ai/tools.go new file mode 100644 index 0000000..ee2dcd7 --- /dev/null +++ b/internal/ai/tools.go @@ -0,0 +1,423 @@ +package ai + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + syaml "sigs.k8s.io/yaml" + + "go.datum.net/datumctl/internal/ai/llm" + "go.datum.net/datumctl/internal/client" + mcpsvc "go.datum.net/datumctl/internal/mcp" +) + +// Tool is a single capability exposed to the LLM. The Execute function is +// called when the LLM requests invocation; RequiresConfirm gates writes. +type Tool struct { + Def llm.ToolDef + RequiresConfirm bool + Execute func(ctx context.Context, args map[string]any) (string, error) +} + +// Registry holds the set of tools available in an agentic session. +type Registry struct { + tools []Tool +} + +// NewRegistry builds the full tool set by wrapping mcp.Service methods. +// apply_manifest calls svc.K.Apply directly since Service.K is exported. +func NewRegistry(svc *mcpsvc.Service) *Registry { + r := &Registry{} + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "list_resource_types", + Description: "List all Datum Cloud resource types available in the current context.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + Execute: func(ctx context.Context, _ map[string]any) (string, error) { + resp, err := svc.ListCRDs(ctx) + if err != nil { + // Fall back to the discovery API when CRD access is forbidden. 
+ items, discErr := svc.K.DiscoverResourceTypes() + if discErr != nil { + return "", fmt.Errorf("list resource types (CRD forbidden: %v; discovery: %v)", err, discErr) + } + b, _ := json.Marshal(map[string]any{"items": items}) + return string(b), nil + } + b, _ := json.Marshal(resp) + return string(b), nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "get_resource_schema", + Description: "Get the full schema for a Datum Cloud resource type by CRD name.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "name": map[string]any{ + "type": "string", + "description": "CRD name, e.g. dnszones.networking.datumapis.com", + }, + "mode": map[string]any{ + "type": "string", + "description": "Output format: yaml (default), json, or describe", + "enum": []string{"yaml", "json", "describe"}, + }, + }, + "required": []string{"name"}, + }, + }, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + name, _ := args["name"].(string) + mode, _ := args["mode"].(string) + resp, err := svc.GetCRD(ctx, mcpsvc.GetCRDReq{Name: name, Mode: mode}) + if err != nil { + // CRD access is restricted — fall back to the OpenAPI V3 schema. + // CRD names are in the form "plural.group", e.g. "dnszones.networking.datumapis.com" + // Extract the group and try to find the API version from discovery. + parts := strings.SplitN(name, ".", 2) + if len(parts) == 2 { + group := parts[1] + // Find the version for this group via discovery. + lists, _ := svc.K.DiscoverResourceTypes() + version := "" + for _, item := range lists { + if g, _ := item["group"].(string); g == group { + if vs, ok := item["versions"].([]string); ok && len(vs) > 0 { + version = vs[0] + break + } + } + } + if version != "" { + schema, oaErr := svc.K.GetOpenAPISchema(group + "/" + version) + if oaErr == nil { + return schema, nil + } + } + } + return fmt.Sprintf(`{"error":"schema not available","hint":"CRD and OpenAPI access both failed. 
Use list_resources to inspect existing resources of this type and infer field structure from their spec.","crd":"%s"}`, name), nil + } + return resp.Text, nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "list_resources", + Description: "List Datum Cloud resources of a given type.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "kind": map[string]any{ + "type": "string", + "description": "Resource kind, e.g. DNSZone", + }, + "apiVersion": map[string]any{ + "type": "string", + "description": "API version, e.g. networking.datumapis.com/v1alpha (optional)", + }, + "namespace": map[string]any{ + "type": "string", + "description": "Namespace (optional, uses session default if omitted)", + }, + "labelSelector": map[string]any{ + "type": "string", + "description": "Label selector to filter results (optional)", + }, + "limit": map[string]any{ + "type": "integer", + "description": "Maximum number of results (optional)", + }, + }, + "required": []string{"kind"}, + }, + }, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + req := mcpsvc.ListResourcesReq{ + Kind: stringArg(args, "kind"), + APIVersion: stringArg(args, "apiVersion"), + Namespace: stringArg(args, "namespace"), + LabelSelector: stringArg(args, "labelSelector"), + } + if v, ok := args["limit"]; ok { + switch n := v.(type) { + case float64: + i := int64(n) + req.Limit = &i + case int64: + req.Limit = &n + } + } + resp, err := svc.ListResources(ctx, req) + if err != nil { + return "", err + } + return resp.Text, nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "get_resource", + Description: "Get a single Datum Cloud resource by kind and name.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "kind": map[string]any{ + "type": "string", + "description": "Resource kind, e.g. 
DNSZone", + }, + "name": map[string]any{ + "type": "string", + "description": "Resource name", + }, + "apiVersion": map[string]any{ + "type": "string", + "description": "API version (optional)", + }, + "namespace": map[string]any{ + "type": "string", + "description": "Namespace (optional, uses session default if omitted)", + }, + "format": map[string]any{ + "type": "string", + "description": "Output format: yaml (default) or json", + "enum": []string{"yaml", "json"}, + }, + }, + "required": []string{"kind", "name"}, + }, + }, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + resp, err := svc.GetResource(ctx, mcpsvc.GetResourceReq{ + Kind: stringArg(args, "kind"), + Name: stringArg(args, "name"), + APIVersion: stringArg(args, "apiVersion"), + Namespace: stringArg(args, "namespace"), + Format: stringArg(args, "format"), + }) + if err != nil { + return "", err + } + return resp.Text, nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "validate_manifest", + Description: "Validate a resource manifest via server-side dry run without persisting changes.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "yaml": map[string]any{ + "type": "string", + "description": "YAML manifest to validate", + }, + }, + "required": []string{"yaml"}, + }, + }, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + resp := svc.ValidateYAML(ctx, mcpsvc.ValidateReq{ + YAML: stringArg(args, "yaml"), + }) + b, _ := json.Marshal(resp) + return string(b), nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "apply_manifest", + Description: "Create or update a Datum Cloud resource by applying a YAML manifest (server-side apply / upsert). 
Requires confirmation.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "yaml": map[string]any{ + "type": "string", + "description": "YAML manifest describing the desired resource state", + }, + "dryRun": map[string]any{ + "type": "boolean", + "description": "If true, validate only without persisting (default false)", + }, + }, + "required": []string{"yaml"}, + }, + }, + RequiresConfirm: true, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + rawYAML := stringArg(args, "yaml") + dryRun, _ := args["dryRun"].(bool) + + obj, err := parseYAMLManifest(rawYAML) + if err != nil { + return "", fmt.Errorf("parse manifest: %w", err) + } + + spec, _, _ := unstructured.NestedMap(obj.Object, "spec") + labels := obj.GetLabels() + annotations := obj.GetAnnotations() + + result, err := svc.K.Apply(ctx, client.ApplyOptions{ + Kind: obj.GetKind(), + APIVersion: obj.GetAPIVersion(), + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + Labels: labels, + Annotations: annotations, + Spec: spec, + DryRun: dryRun, + }) + if err != nil { + return "", err + } + return fmt.Sprintf("applied %s/%s (resourceVersion: %s)", + result.GetKind(), result.GetName(), result.GetResourceVersion()), nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "delete_resource", + Description: "Delete a Datum Cloud resource by kind and name. Always requires confirmation.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "kind": map[string]any{ + "type": "string", + "description": "Resource kind, e.g. 
DNSZone", + }, + "name": map[string]any{ + "type": "string", + "description": "Resource name", + }, + "apiVersion": map[string]any{ + "type": "string", + "description": "API version (optional)", + }, + "namespace": map[string]any{ + "type": "string", + "description": "Namespace (optional, uses session default if omitted)", + }, + }, + "required": []string{"kind", "name"}, + }, + }, + RequiresConfirm: true, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + dryRun := false + _, err := svc.DeleteResource(ctx, mcpsvc.DeleteResourceReq{ + Kind: stringArg(args, "kind"), + Name: stringArg(args, "name"), + APIVersion: stringArg(args, "apiVersion"), + Namespace: stringArg(args, "namespace"), + DryRun: &dryRun, + }) + if err != nil { + return "", err + } + return fmt.Sprintf("deleted %s/%s", stringArg(args, "kind"), stringArg(args, "name")), nil + }, + }) + + r.add(Tool{ + Def: llm.ToolDef{ + Name: "change_context", + Description: "Switch the active organization, project, or namespace for the current session.", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "org": map[string]any{ + "type": "string", + "description": "New organization ID (mutually exclusive with project)", + }, + "project": map[string]any{ + "type": "string", + "description": "New project ID (mutually exclusive with org)", + }, + "namespace": map[string]any{ + "type": "string", + "description": "New default namespace", + }, + }, + }, + }, + Execute: func(ctx context.Context, args map[string]any) (string, error) { + resp, err := svc.ChangeContext(ctx, mcpsvc.ChangeContextReq{ + Org: stringArg(args, "org"), + Project: stringArg(args, "project"), + Namespace: stringArg(args, "namespace"), + }) + if err != nil { + return "", err + } + b, _ := json.Marshal(resp) + return string(b), nil + }, + }) + + return r +} + +// Defs returns the ToolDef slice to pass to LLMClient.Chat. 
+func (r *Registry) Defs() []llm.ToolDef { + defs := make([]llm.ToolDef, len(r.tools)) + for i, t := range r.tools { + defs[i] = t.Def + } + return defs +} + +// Find looks up a tool by name. Returns nil, false if not found. +func (r *Registry) Find(name string) (*Tool, bool) { + for i := range r.tools { + if r.tools[i].Def.Name == name { + return &r.tools[i], true + } + } + return nil, false +} + +func (r *Registry) add(t Tool) { r.tools = append(r.tools, t) } + +// NewEmptyRegistry returns a Registry with no tools, used when no org/project +// context is set and the agent is answering general questions only. +func NewEmptyRegistry() *Registry { return &Registry{} } + +// --- helpers --- + +func stringArg(args map[string]any, key string) string { + if v, ok := args[key]; ok { + if s, ok := v.(string); ok { + return s + } + } + return "" +} + +// parseYAMLManifest parses a raw YAML string into an unstructured.Unstructured object. +func parseYAMLManifest(rawYAML string) (*unstructured.Unstructured, error) { + // sigs.k8s.io/yaml converts YAML to JSON first, then unmarshals. + jsonBytes, err := syaml.YAMLToJSON([]byte(rawYAML)) + if err != nil { + return nil, fmt.Errorf("yaml to json: %w", err) + } + var obj map[string]any + if err := json.Unmarshal(jsonBytes, &obj); err != nil { + return nil, fmt.Errorf("unmarshal manifest: %w", err) + } + return &unstructured.Unstructured{Object: obj}, nil +} diff --git a/internal/client/context_switch.go b/internal/client/context_switch.go index 87a6fcc..ae32cc8 100644 --- a/internal/client/context_switch.go +++ b/internal/client/context_switch.go @@ -11,6 +11,13 @@ import ( "go.datum.net/datumctl/internal/authutil" ) +// RestConfigForContext constructs a *rest.Config scoped to exactly one of +// organizationID or projectID. It is the shared implementation used by both +// internal/cmd/mcp and internal/cmd/ai. 
+func RestConfigForContext(ctx context.Context, organizationID, projectID string) (*rest.Config, error) {
+	return restConfigFor(ctx, organizationID, projectID)
+}
+
 func NewForProject(ctx context.Context, projectID, defaultNamespace string) (*K8sClient, error) {
 	cfg, err := restConfigFor(ctx, "", projectID)
 	if err != nil {
diff --git a/internal/client/k8s_client.go b/internal/client/k8s_client.go
index dc22441..a684dd8 100644
--- a/internal/client/k8s_client.go
+++ b/internal/client/k8s_client.go
@@ -5,6 +5,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
+	"net/http"
 	"strings"
 
 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -86,6 +88,85 @@ func (c *K8sClient) GetCRD(ctx context.Context, name string) (*apiextv1.CustomRe
 	return crd, nil
 }
 
+// DiscoverResourceTypes returns all API resource types using the discovery API.
+// Unlike ListCRDs, this does not require access to the apiextensions.k8s.io API
+// and works with any authenticated user.
+func (c *K8sClient) DiscoverResourceTypes() ([]map[string]any, error) {
+	// NOTE(review): ServerGroupsAndResources can return partial lists together
+	// with an aggregate discovery error; returning nil here discards the
+	// partial results. Consider tolerating discovery.IsGroupDiscoveryFailedError
+	// if some API groups on the control plane are expected to be unreachable.
+	_, lists, err := c.disco.ServerGroupsAndResources()
+	if err != nil {
+		return nil, fmt.Errorf("discovery: %w", err)
+	}
+	var items []map[string]any
+	for _, list := range lists {
+		gv := list.GroupVersion
+		var group, version string
+		if idx := strings.LastIndex(gv, "/"); idx >= 0 {
+			group, version = gv[:idx], gv[idx+1:]
+		} else {
+			// Core group, e.g. "v1": no group segment.
+			version = gv
+		}
+		for _, r := range list.APIResources {
+			if strings.Contains(r.Name, "/") {
+				continue // skip subresources
+			}
+			// Core-group resources have an empty group; appending "."+group
+			// would yield malformed names like "pods." — use the bare
+			// resource name for them instead.
+			name := r.Name
+			if group != "" {
+				name += "." + group
+			}
+			items = append(items, map[string]any{
+				"name":     name,
+				"group":    group,
+				"kind":     r.Kind,
+				"versions": []string{version},
+			})
+		}
+	}
+	return items, nil
+}
+
+// GetOpenAPISchema fetches the OpenAPI V3 schema for a specific API group/version
+// directly from the server. This is accessible to any authenticated user and does
+// not require CRD access. Returns the raw JSON schema document.
+func (c *K8sClient) GetOpenAPISchema(groupVersion string) (string, error) { + // Build the OpenAPI v3 path: /openapi/v3/apis/{group}/{version} + // For core group (e.g. "v1"): /openapi/v3/api/v1 + var path string + parts := strings.SplitN(groupVersion, "/", 2) + if len(parts) == 2 { + path = fmt.Sprintf("/openapi/v3/apis/%s/%s", parts[0], parts[1]) + } else { + path = fmt.Sprintf("/openapi/v3/api/%s", groupVersion) + } + + httpClient, err := rest.HTTPClientFor(c.cfg) + if err != nil { + return "", fmt.Errorf("build http client: %w", err) + } + url := strings.TrimRight(c.cfg.Host, "/") + path + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return "", fmt.Errorf("build request: %w", err) + } + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("openapi request: %w", err) + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read openapi response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("openapi HTTP %d: %s", resp.StatusCode, string(body)) + } + return string(body), nil +} + // ValidateYAML validates one or more YAML documents using Create with server-side dry-run. // // Semantics: diff --git a/internal/cmd/ai/ai.go b/internal/cmd/ai/ai.go new file mode 100644 index 0000000..ae2acb5 --- /dev/null +++ b/internal/cmd/ai/ai.go @@ -0,0 +1,196 @@ +// Package ai defines the `datumctl ai` cobra command. +package ai + +import ( + "errors" + "fmt" + "io" + "os" + "strings" + + "github.com/spf13/cobra" + "golang.org/x/term" + + datumai "go.datum.net/datumctl/internal/ai" + "go.datum.net/datumctl/internal/ai/llm" + "go.datum.net/datumctl/internal/client" + mcpsvc "go.datum.net/datumctl/internal/mcp" +) + +// Command returns the cobra.Command for `datumctl ai`. 
+func Command() *cobra.Command { + var ( + organization string + project string + namespace string + model string + maxIter int + ) + + cmd := &cobra.Command{ + Use: "ai [query]", + Short: "Ask a natural-language question about your Datum Cloud resources", + Long: `Start an AI-powered assistant that translates natural language into +Datum Cloud operations. + +In interactive mode (no query argument), the assistant maintains conversation +context so you can ask follow-up questions. Read operations execute immediately; +write operations always show a preview and ask for confirmation. + +Configuration is read from the ai config file (see 'datumctl ai config show'). +Flag values override config file values. API keys in the config file are +overridden by environment variables (ANTHROPIC_API_KEY, OPENAI_API_KEY, GEMINI_API_KEY).`, + Example: ` # First-time setup (store API key and default org) + datumctl ai config set anthropic_api_key sk-ant-... + datumctl ai config set organization my-org-id + + # Then just run it — no flags needed + datumctl ai "list all DNS zones" + + # Override the default org for one query + datumctl ai "list projects" --organization other-org-id + + # Interactive session + datumctl ai --project my-project-id`, + RunE: func(cmd *cobra.Command, args []string) error { + // Load config file first — flags override below. + aiCfg, err := datumai.LoadConfig() + if err != nil { + fmt.Fprintf(cmd.ErrOrStderr(), "[ai] warning: could not load config: %v\n", err) + } + + // Environment variables override config file API keys. + aiCfg.ApplyEnvOverrides() + + // Flags override config file context. 
+ if organization != "" { + aiCfg.Organization = organization + } + if project != "" { + aiCfg.Project = project + } + if namespace != "" { + aiCfg.Namespace = namespace + } + if model != "" { + aiCfg.Model = model + } + if cmd.Flags().Changed("max-iterations") { + aiCfg.MaxIterations = maxIter + } + if aiCfg.MaxIterations <= 0 { + aiCfg.MaxIterations = 20 + } + if aiCfg.Namespace == "" { + aiCfg.Namespace = "default" + } + + // Validate: cannot set both org and project. + if aiCfg.Organization != "" && aiCfg.Project != "" { + return errors.New("organization and project are mutually exclusive; unset one with 'datumctl ai config unset'") + } + + hasContext := aiCfg.Organization != "" || aiCfg.Project != "" + + // Determine terminal/interactive mode. + isTTY := term.IsTerminal(int(os.Stdin.Fd())) + isInteractive := isTTY && len(args) == 0 + + // Resolve the initial query. + query, err := resolveQuery(args, isTTY) + if err != nil { + return err + } + + // Construct the LLM client. + llmClient, err := llm.NewClient(llm.Config{ + Provider: aiCfg.Provider, + Model: aiCfg.Model, + AnthropicAPIKey: aiCfg.AnthropicAPIKey, + OpenAIAPIKey: aiCfg.OpenAIAPIKey, + GeminiAPIKey: aiCfg.GeminiAPIKey, + }) + if err != nil { + return fmt.Errorf("initialize LLM: %w\n\nRun 'datumctl ai config set anthropic_api_key ' to save your key", err) + } + fmt.Fprintf(cmd.ErrOrStderr(), "[ai] using %s/%s\n", llmClient.Provider(), llmClient.Model()) + + var registry *datumai.Registry + if hasContext { + cfg, err := client.RestConfigForContext(cmd.Context(), aiCfg.Organization, aiCfg.Project) + if err != nil { + return err + } + k, err := client.NewK8sFromRESTConfig(cfg) + if err != nil { + return err + } + k.Namespace = aiCfg.Namespace + if err := k.Preflight(cmd.Context()); err != nil { + return err + } + svc := mcpsvc.NewService(k) + registry = datumai.NewRegistry(svc) + } else { + fmt.Fprintf(cmd.ErrOrStderr(), "[ai] no organization or project set — running without resource tools\n") + 
fmt.Fprintf(cmd.ErrOrStderr(), "[ai] tip: run 'datumctl ai config set organization ' to set a default\n") + registry = datumai.NewEmptyRegistry() + } + + systemPrompt := datumai.BuildSystemPrompt(aiCfg.Organization, aiCfg.Project, aiCfg.Namespace) + + agent := datumai.NewAgent(datumai.AgentOptions{ + LLM: llmClient, + Registry: registry, + SystemPrompt: systemPrompt, + MaxIterations: aiCfg.MaxIterations, + In: cmd.InOrStdin(), + Out: cmd.OutOrStdout(), + ErrOut: cmd.ErrOrStderr(), + Interactive: isInteractive, + IsTerminal: isTTY, + }) + + if isInteractive { + fmt.Fprintf(cmd.OutOrStdout(), "Datum Cloud AI assistant (type 'exit' to quit)\n") + if hasContext { + fmt.Fprintf(cmd.OutOrStdout(), "Organization: %s Project: %s Namespace: %s\n\n", + aiCfg.Organization, aiCfg.Project, aiCfg.Namespace) + } else { + fmt.Fprintf(cmd.OutOrStdout(), "No context set — add --organization or --project to manage resources.\n\n") + } + } + + return agent.Run(cmd.Context(), query) + }, + } + + cmd.Flags().StringVar(&organization, "organization", "", "Organization context (overrides config file)") + cmd.Flags().StringVar(&project, "project", "", "Project context (overrides config file)") + cmd.Flags().StringVar(&namespace, "namespace", "", "Default namespace (overrides config file)") + cmd.Flags().StringVar(&model, "model", "", "Model override, e.g. claude-sonnet-4-6, gpt-4o, gemini-2.0-flash") + cmd.Flags().IntVar(&maxIter, "max-iterations", 20, "Agentic loop iteration cap") + + cmd.AddCommand(configCommand()) + + return cmd +} + +// resolveQuery determines the initial query string from CLI args or stdin. 
+func resolveQuery(args []string, isTTY bool) (string, error) { + if len(args) > 0 { + return strings.Join(args, " "), nil + } + if !isTTY { + data, err := io.ReadAll(os.Stdin) + if err != nil { + return "", fmt.Errorf("read stdin: %w", err) + } + q := strings.TrimSpace(string(data)) + if q == "" { + return "", errors.New("no query provided via argument or stdin") + } + return q, nil + } + return "Hello! What would you like to do with your Datum Cloud resources?", nil +} diff --git a/internal/cmd/ai/config_cmd.go b/internal/cmd/ai/config_cmd.go new file mode 100644 index 0000000..cc7d15a --- /dev/null +++ b/internal/cmd/ai/config_cmd.go @@ -0,0 +1,199 @@ +package ai + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" + syaml "sigs.k8s.io/yaml" + + datumai "go.datum.net/datumctl/internal/ai" +) + +// validKeys is the set of keys accepted by `datumctl ai config set`. +var validKeys = map[string]string{ + "organization": "Default organization ID (overridden by --organization flag)", + "project": "Default project ID (overridden by --project flag)", + "namespace": "Default namespace (overridden by --namespace flag)", + "provider": "LLM provider: anthropic, openai, or gemini", + "model": "LLM model override (e.g. claude-sonnet-4-6, gpt-4o)", + "max_iterations": "Agentic loop iteration cap (default 20)", + "anthropic_api_key": "Anthropic API key (overridden by ANTHROPIC_API_KEY env var)", + "openai_api_key": "OpenAI API key (overridden by OPENAI_API_KEY env var)", + "gemini_api_key": "Gemini API key (overridden by GEMINI_API_KEY env var)", +} + +func configCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "config", + Short: "Manage datumctl ai configuration", + Long: `Read and write the datumctl ai configuration file. + +Settings in the config file provide defaults for every 'datumctl ai' invocation. 
+Flag values always override config file values; environment variables always +override config file API keys.`, + } + cmd.AddCommand(configSetCommand(), configShowCommand(), configUnsetCommand()) + return cmd +} + +func configSetCommand() *cobra.Command { + return &cobra.Command{ + Use: "set ", + Short: "Set a configuration value", + Long: `Set a key in the datumctl ai configuration file. + +Valid keys: + organization Default organization ID + project Default project ID + namespace Default namespace (default: "default") + provider LLM provider: anthropic, openai, or gemini + model LLM model (e.g. claude-sonnet-4-6, gpt-4o, gemini-2.0-flash) + max_iterations Agentic loop iteration cap (default 20) + anthropic_api_key Anthropic API key + openai_api_key OpenAI API key + gemini_api_key Gemini API key`, + Example: ` datumctl ai config set organization datum-demos-iy50km + datumctl ai config set anthropic_api_key sk-ant-... + datumctl ai config set model claude-sonnet-4-6`, + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + key, value := args[0], args[1] + if _, ok := validKeys[key]; !ok { + return fmt.Errorf("unknown config key %q; valid keys: %s", + key, strings.Join(sortedKeys(validKeys), ", ")) + } + + cfg, err := datumai.LoadConfig() + if err != nil { + return err + } + + if err := setConfigKey(&cfg, key, value); err != nil { + return err + } + + if err := datumai.SaveConfig(cfg); err != nil { + return err + } + + path, _ := datumai.ConfigFilePath() + fmt.Fprintf(cmd.OutOrStdout(), "Set %s in %s\n", key, path) + return nil + }, + } +} + +func configUnsetCommand() *cobra.Command { + return &cobra.Command{ + Use: "unset ", + Short: "Remove a configuration value", + Example: ` datumctl ai config unset organization + datumctl ai config unset anthropic_api_key`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + key := args[0] + if _, ok := validKeys[key]; !ok { + return fmt.Errorf("unknown config key 
%q", key) + } + + cfg, err := datumai.LoadConfig() + if err != nil { + return err + } + + if err := setConfigKey(&cfg, key, ""); err != nil { + return err + } + + if err := datumai.SaveConfig(cfg); err != nil { + return err + } + + path, _ := datumai.ConfigFilePath() + fmt.Fprintf(cmd.OutOrStdout(), "Unset %s in %s\n", key, path) + return nil + }, + } +} + +func configShowCommand() *cobra.Command { + return &cobra.Command{ + Use: "show", + Short: "Show the current configuration", + Example: ` datumctl ai config show`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + path, err := datumai.ConfigFilePath() + if err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "# %s\n", path) + + cfg, err := datumai.LoadConfig() + if err != nil { + return err + } + + // Redact API keys for display. + display := cfg + display.AnthropicAPIKey = redact(cfg.AnthropicAPIKey) + display.OpenAIAPIKey = redact(cfg.OpenAIAPIKey) + display.GeminiAPIKey = redact(cfg.GeminiAPIKey) + + out, err := syaml.Marshal(display) + if err != nil { + return err + } + fmt.Fprint(cmd.OutOrStdout(), string(out)) + return nil + }, + } +} + +func setConfigKey(cfg *datumai.Config, key, value string) error { + switch key { + case "organization": + cfg.Organization = value + case "project": + cfg.Project = value + case "namespace": + cfg.Namespace = value + case "provider": + cfg.Provider = value + case "model": + cfg.Model = value + case "max_iterations": + if value == "" { + cfg.MaxIterations = 0 + return nil + } + var n int + if _, err := fmt.Sscanf(value, "%d", &n); err != nil { + return fmt.Errorf("max_iterations must be an integer, got %q", value) + } + cfg.MaxIterations = n + case "anthropic_api_key": + cfg.AnthropicAPIKey = value + case "openai_api_key": + cfg.OpenAIAPIKey = value + case "gemini_api_key": + cfg.GeminiAPIKey = value + } + return nil +} + +func redact(s string) string { + if len(s) <= 8 { + return strings.Repeat("*", len(s)) + } + return s[:4] + 
strings.Repeat("*", len(s)-8) + s[len(s)-4:]
}

// sortedKeys returns the keys of m in ascending order. Go randomizes map
// iteration order, so without sorting the "valid keys" error message would
// change between runs and the function name would be a lie.
func sortedKeys(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	// Insertion sort keeps this function free of any additional import; the
	// key sets involved are tiny (~9 entries), so O(n²) is irrelevant here.
	for i := 1; i < len(keys); i++ {
		for j := i; j > 0 && keys[j] < keys[j-1]; j-- {
			keys[j], keys[j-1] = keys[j-1], keys[j]
		}
	}
	return keys
}
diff --git a/internal/cmd/auth/auth.go b/internal/cmd/auth/auth.go
index 01205ab..cdc2b21 100644
--- a/internal/cmd/auth/auth.go
+++ b/internal/cmd/auth/auth.go
@@ -22,6 +22,17 @@ Typical workflow:
 Advanced — kubectl integration:
   If you use kubectl and want to point it at a Datum Cloud control plane, see
   'datumctl auth update-kubeconfig --help'.`,
+		Example: `  # Log in to Datum Cloud
+  datumctl auth login
+
+  # Show all logged-in accounts
+  datumctl auth list
+
+  # Switch the active account
+  datumctl auth switch user@example.com
+
+  # Log out a specific account
+  datumctl auth logout user@example.com`,
 	}
 
 	cmd.AddCommand(
diff --git a/internal/cmd/docs/docs.go b/internal/cmd/docs/docs.go
index a04feae..85e227f 100644
--- a/internal/cmd/docs/docs.go
+++ b/internal/cmd/docs/docs.go
@@ -16,6 +16,11 @@
 Subcommands:
   generate-cli-docs   Generate markdown documentation files for all datumctl
                       commands (used to build the published CLI reference at
                       datum.net/docs).`,
+		Example: `  # Browse platform-wide APIs in a local Swagger UI
+  datumctl docs openapi
+
+  # Generate CLI reference docs into a local directory
+  datumctl docs generate-cli-docs --output-dir /tmp/datumctl-docs`,
 	}
 
 	cmd.AddCommand(OpenAPICmd())
diff --git a/internal/cmd/mcp/mcp.go b/internal/cmd/mcp/mcp.go
index 13a4698..9e1e417 100644
--- a/internal/cmd/mcp/mcp.go
+++ b/internal/cmd/mcp/mcp.go
@@ -2,16 +2,10 @@
 package mcp
 
 import (
-	"context"
 	"errors"
-	"fmt"
-	"net/http"
 
 	"github.com/spf13/cobra"
-	"golang.org/x/oauth2"
-	"k8s.io/client-go/rest"
-
 	"go.datum.net/datumctl/internal/authutil"
 	"go.datum.net/datumctl/internal/client"
 	serversvc "go.datum.net/datumctl/internal/mcp"
 )
@@ -69,7 +63,7 @@ Exactly one of --organization or --project is required.`,
 			}
 
 			// Build *rest.Config from Datum context (no kubeconfig 
reliance). - cfg, err := restConfigFromFlags(cmd.Context(), organization, project) + cfg, err := client.RestConfigForContext(cmd.Context(), organization, project) if err != nil { return err } @@ -105,37 +99,3 @@ Exactly one of --organization or --project is required.`, return cmd } -// restConfigFromFlags constructs a client-go *rest.Config using the same auth + host -// pattern as internal/client/user_context.go, but scoped to an org OR a project. -func restConfigFromFlags(ctx context.Context, organizationID, projectID string) (*rest.Config, error) { - // OIDC token & API hostname from stored credentials - tknSrc, err := authutil.GetTokenSource(ctx) - if err != nil { - return nil, fmt.Errorf("get token source: %w", err) - } - apiHostname, err := authutil.GetAPIHostname() - if err != nil { - return nil, fmt.Errorf("get API hostname: %w", err) - } - - if (organizationID == "") == (projectID == "") { - return nil, errors.New("exactly one of organizationID or projectID must be provided") - } - - // Build the control-plane endpoint similar to user_context.go - var host string - if organizationID != "" { - host = fmt.Sprintf("https://%s/apis/resourcemanager.miloapis.com/v1alpha1/organizations/%s/control-plane", - apiHostname, organizationID) - } else { - host = fmt.Sprintf("https://%s/apis/resourcemanager.miloapis.com/v1alpha1/projects/%s/control-plane", - apiHostname, projectID) - } - - return &rest.Config{ - Host: host, - WrapTransport: func(rt http.RoundTripper) http.RoundTripper { - return &oauth2.Transport{Source: tknSrc, Base: rt} - }, - }, nil -} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 2e386c0..fb8998d 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -8,6 +8,7 @@ import ( "go.datum.net/datumctl/internal/cmd/auth" "go.datum.net/datumctl/internal/cmd/create" "go.datum.net/datumctl/internal/cmd/docs" + aicmd "go.datum.net/datumctl/internal/cmd/ai" "go.datum.net/datumctl/internal/cmd/mcp" activity 
"go.miloapis.com/activity/pkg/cmd" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -212,6 +213,24 @@ what would be deleted before committing.` rootCmd.AddCommand(WrapResourceCommand(deleteCmd)) createCmd := create.NewCmdCreate(factory, ioStreams) + createCmd.Short = "Create a Datum Cloud resource from a file or stdin" + createCmd.Long = `Create a new Datum Cloud resource by providing a manifest in YAML or JSON +format, either from a file or piped through stdin. + +datumctl create accepts Datum Cloud resource manifests — not Kubernetes +built-in resources. Use 'datumctl apply' for idempotent creation or updates. + +Resource manifests must specify the correct apiVersion and kind for the +Datum Cloud resource type. Use 'datumctl explain ' to see the schema +for a resource type and 'datumctl api-resources' to list available types.` + createCmd.Example = ` # Create a project from a manifest file + datumctl create -f ./project.yaml --organization + + # Create a resource from stdin + cat dnszone.yaml | datumctl create -f - --project + + # Validate the resource without creating it + datumctl create -f ./project.yaml --organization --dry-run=server` hideFlags(createCmd, "allow-missing-template-keys", "kustomize", "template", "save-config", ) @@ -431,6 +450,7 @@ the server.` rootCmd.AddCommand(versionCmd) rootCmd.AddCommand(mcp.Command()) + rootCmd.AddCommand(aicmd.Command()) activityCmd := activity.NewActivityCommand(activity.ActivityCommandOptions{ Factory: factory,