diff --git a/AI/modelcloud/README.md b/AI/modelcloud/README.md new file mode 100644 index 000000000..e430dda86 --- /dev/null +++ b/AI/modelcloud/README.md @@ -0,0 +1,148 @@ +# modelcloud example + +## Goal: Production-grade inference on AI-conformant kubernetes clusters + +This goal of this example is to build up production-grade inference +on AI-conformant kubernetes clusters. + +We (aspirationally) aim to demonstrate the capabilities of the AI-conformance +profile. Where we cannot achieve production-grade inference, we hope to +motivate discussion of extensions to the AI-conformance profile to plug those gaps. + +## Walkthrough + +### Deploying to a kubernetes cluster + +Create a kubernetes cluster, we currently test with GKE and gcr.io but do not aim +to depend on non-conformant functionality; PRs to add support for deployment +to other conformant environments are very welcome. + +1. From the modelcloud directory, run `dev/tools/push-images` to push to `gcr.io/$(gcloud config get project)/...` + +1. Run `dev/tools/deploy-to-kube` to deploy. + +We deploy two workloads: + +1. `blob-server`, a statefulset with a persistent volume to hold the model blobs (files) + +1. `gemma3`, a deployment running vLLM, with a frontend go process that will download the model from `blob-server`. + +### Uploading a model + +For now, we will likely be dealing with models from huggingface. + +Begin by cloning the model locally (and note that currently only google/gemma-3-1b-it is supported): + +``` +git clone https://huggingface.co/google/gemma-3-1b-it +``` + +If you now run `go run ./cmd/model-hasher --src gemma-3-1b-it` you should see it print the hashes for each file: + +``` +spec: + files: + - hash: 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 + path: .gitattributes + - hash: 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a + path: README.md + - hash: 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 + path: added_tokens.json + - hash: 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e + path: config.json + - hash: fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 + path: generation_config.json + - hash: 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 + path: model.safetensors + - hash: 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 + path: special_tokens_map.json + - hash: 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 + path: tokenizer.json + - hash: 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c + path: tokenizer.model + - hash: bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 + path: tokenizer_config.json +``` + +Inside the vllm-frontend, this [list of files is currently embedded](cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml). +This is why we only support gemma-3-1b-it today, though we plan to relax this in future (e.g. a CRD?) + +The blob-server accepts uploads, and we can upload the blobs using a port-forward: + +``` +kubectl port-forward blob-server-0 8081:8080 & +go run ./cmd/model-hasher/ --src gemma-3-1b-it/ --upload http://127.0.0.1:8081 +``` + +This will then store the blobs on the persistent disk of blob-server, so they are now available in-cluster, +you can verify this with `kubectl debug`: + +``` +> kubectl debug blob-server-0 -it --image=debian:latest --profile=general --share-processes --target blob-server +root@blob-server-0:/# cat /proc/1/mounts | grep blob +/dev/sdb /blobs ext4 rw,relatime 0 0 +root@blob-server-0:/# ls -l /proc/1/root/blobs/ +total 1991324 +-rw------- 1 root root 4689074 Sep 8 21:40 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c +-rw------- 1 root root 1676 Sep 8 21:39 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 +-rw------- 1 root root 899 Sep 8 21:39 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e +-rw------- 1 root root 662 Sep 8 21:40 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 +-rw------- 1 root root 1999811208 Sep 8 21:40 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 +-rw------- 1 root root 33384568 Sep 8 21:40 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 +-rw------- 1 root root 35 Sep 8 21:39 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 +-rw------- 1 root root 24265 Sep 8 21:39 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a +-rw------- 1 root root 1156999 Sep 8 21:40 bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 +-rw------- 1 root root 215 Sep 8 21:39 fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 +drwx------ 2 root root 16384 Sep 8 21:38 lost+found +``` + +## Using the inference server + +At this point the vLLM process should (hopefully) have downloaded the model and started. + +```bash +kubectl wait --for=condition=Available --timeout=10s deployment/gemma3 +kubectl get pods -l app=gemma3 +``` + +To check logs (particularly if this is not already ready) +```bash +kubectl logs -f -l app=gemma3 +``` + + +## Verification / Seeing it Work + +Forward local requests to vLLM service: + +```bash +# Forward a local port (e.g., 8080) to the service port (e.g., 8080) +kubectl port-forward service/gemma3 8080:80 & +``` + +2. Send request to local forwarding port: + +```bash +curl -X POST http://localhost:8080/v1/chat/completions \ +-H "Content-Type: application/json" \ +-d '{ + "model": "google/gemma-3-1b-it", + "messages": [{"role": "user", "content": "Explain Quantum Computing in simple terms."}], + "max_tokens": 100 +}' +``` + +Expected output (or similar): + +```json +{"id":"chatcmpl-462b3e153fd34e5ca7f5f02f3bcb6b0c","object":"chat.completion","created":1753164476,"model":"google/gemma-3-1b-it","choices":[{"index":0,"message":{"role":"assistant","reasoning_content":null,"content":"Okay, let’s break down quantum computing in a way that’s hopefully understandable without getting lost in too much jargon. Here's the gist:\n\n**1. Classical Computers vs. Quantum Computers:**\n\n* **Classical Computers:** These are the computers you use every day – laptops, phones, servers. They store information as *bits*. A bit is like a light switch: it's either on (1) or off (0). Everything a classical computer does – from playing games","tool_calls":[]},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":16,"total_tokens":116,"completion_tokens":100,"prompt_tokens_details":null},"prompt_logprobs":null} +``` + +--- + +## Cleanup + +```bash +kubectl delete deployment gemma3 +kubectl delete statefulset blob-server +``` diff --git a/AI/modelcloud/cmd/blob-server/main.go b/AI/modelcloud/cmd/blob-server/main.go new file mode 100644 index 000000000..8efe9ad88 --- /dev/null +++ b/AI/modelcloud/cmd/blob-server/main.go @@ -0,0 +1,196 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/examples/AI/modelcloud/pkg/blobs" + "k8s.io/klog/v2" +) + +func main() { + if err := run(context.Background()); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + listen := ":8080" + cacheDir := os.Getenv("CACHE_DIR") + if cacheDir == "" { + // We expect CACHE_DIR to be set when running on kubernetes, but default sensibly for local dev + cacheDir = "~/.cache/blob-server/blobs" + } + flag.StringVar(&listen, "listen", listen, "listen address") + flag.StringVar(&cacheDir, "cache-dir", cacheDir, "cache directory") + flag.Parse() + + if strings.HasPrefix(cacheDir, "~/") { + homeDir, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("getting home directory: %w", err) + } + cacheDir = filepath.Join(homeDir, strings.TrimPrefix(cacheDir, "~/")) + } + + if err := os.MkdirAll(cacheDir, 0755); err != nil { + return fmt.Errorf("creating cache directory %q: %w", cacheDir, err) + } + + blobStore := &blobs.LocalBlobStore{ + LocalDir: cacheDir, + } + + blobCache := &blobCache{ + CacheDir: cacheDir, + blobStore: blobStore, + } + + s := &httpServer{ + blobCache: blobCache, + tmpDir: filepath.Join(cacheDir, "tmp"), + } + + log.Info("serving http", "endpoint", listen) + if err := http.ListenAndServe(listen, s); err != nil { + return fmt.Errorf("serving on %q: %w", listen, err) + } + + return nil +} + +type httpServer struct { + blobCache *blobCache + tmpDir string +} + +func (s *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + tokens := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/") + if len(tokens) == 1 { + if r.Method == "GET" { + hash := tokens[0] + s.serveGETBlob(w, r, hash) + return + } + if r.Method == "PUT" { + hash := tokens[0] + s.servePUTBlob(w, r, hash) + return + } + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + http.Error(w, "not found", http.StatusNotFound) +} + +func (s *httpServer) serveGETBlob(w http.ResponseWriter, r *http.Request, hash string) { + ctx := r.Context() + + log := klog.FromContext(ctx) + + // TODO: Validate hash is hex, right length etc + + f, err := s.blobCache.GetBlob(ctx, hash) + if err != nil { + if status.Code(err) == codes.NotFound { + log.Info("blob not found", "hash", hash) + http.Error(w, "not found", http.StatusNotFound) + return + } + log.Error(err, "error getting blob") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + defer f.Close() + p := f.Name() + + log.Info("serving blob", "hash", hash, "path", p) + http.ServeFile(w, r, p) +} + +func (s *httpServer) servePUTBlob(w http.ResponseWriter, r *http.Request, hash string) { + ctx := r.Context() + + log := klog.FromContext(ctx) + + // TODO: Download to temp file first? + + if err := s.blobCache.PutBlob(ctx, hash, r.Body); err != nil { + log.Error(err, "error stoing blob") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + log.Info("uploaded blob", "hash", hash) + + w.WriteHeader(http.StatusCreated) +} + +type blobCache struct { + CacheDir string + blobStore blobs.BlobStore +} + +func (c *blobCache) GetBlob(ctx context.Context, hash string) (*os.File, error) { + log := klog.FromContext(ctx) + + localPath := filepath.Join(c.CacheDir, hash) + f, err := os.Open(localPath) + if err == nil { + return f, nil + } else if !os.IsNotExist(err) { + return nil, fmt.Errorf("opening blob %q: %w", hash, err) + } + + log.Info("blob not found in cache, downloading", "hash", hash) + + err = c.blobStore.Download(ctx, blobs.BlobInfo{Hash: hash}, localPath) + if err == nil { + f, err := os.Open(localPath) + if err != nil { + return nil, fmt.Errorf("opening blob %q after download: %w", hash, err) + } + return f, nil + } + + return nil, err +} + +func (c *blobCache) PutBlob(ctx context.Context, hash string, r io.Reader) error { + log := klog.FromContext(ctx) + + if err := c.blobStore.Upload(ctx, r, blobs.BlobInfo{Hash: hash}); err != nil { + log.Error(err, "error uploading blob") + return fmt.Errorf("uploading blob %q: %w", hash, err) + } + + // TODO: Side-load into local cache too? + + return nil +} diff --git a/AI/modelcloud/cmd/model-hasher/main.go b/AI/modelcloud/cmd/model-hasher/main.go new file mode 100644 index 000000000..c545af7fb --- /dev/null +++ b/AI/modelcloud/cmd/model-hasher/main.go @@ -0,0 +1,150 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "crypto/sha256" + "flag" + "fmt" + "io" + "io/fs" + "net/url" + "os" + "path/filepath" + "strings" + + "k8s.io/examples/AI/modelcloud/pkg/api" + "k8s.io/examples/AI/modelcloud/pkg/blobs" + + "sigs.k8s.io/yaml" + + "k8s.io/klog/v2" +) + +func main() { + if err := run(context.Background()); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + srcDir := "" + flag.StringVar(&srcDir, "src", srcDir, "Directory to scan for model files") + + upload := "" + flag.StringVar(&upload, "upload", upload, "If set, the URL to upload to") + + flag.Parse() + + if srcDir == "" { + return fmt.Errorf("--src is required") + } + + model := &api.Model{} + + // Walk all the files in the directory, and print their SHA256 hashes + err := filepath.WalkDir(srcDir, func(path string, dent fs.DirEntry, err error) error { + if err != nil { + return err + } + relativePath, err := filepath.Rel(srcDir, path) + if err != nil { + return fmt.Errorf("getting relative path for %q: %w", path, err) + } + if dent.IsDir() { + if relativePath == ".git" { + return filepath.SkipDir + } + return nil + } + + hash, err := fileHash(path) + if err != nil { + log.Error(err, "error hashing file", "path", path) + return nil + } + + model.Spec.Files = append(model.Spec.Files, api.ModelFile{ + Path: relativePath, + Hash: hash, + }) + return nil + }) + if err != nil { + return fmt.Errorf("walking directory %q: %w", srcDir, err) + } + + y, err := yaml.Marshal(model) + if err != nil { + return fmt.Errorf("marshalling model to YAML: %w", err) + } + fmt.Println(string(y)) + + if upload != "" { + var blobstore blobs.BlobStore + + if strings.HasPrefix(upload, "http://") || strings.HasPrefix(upload, "https://") { + u, err := url.Parse(upload) + if err != nil { + return fmt.Errorf("parsing upload URL %q: %w", upload, err) + } + log.Info("using blob server", "url", u.String()) + blobstore = &blobs.BlobServer{ + URL: u, + } + } else { + return fmt.Errorf("upload must be a URL (https:///), got %q", upload) + } + + for _, file := range model.Spec.Files { + info := blobs.BlobInfo{ + Hash: file.Hash, + } + srcPath := filepath.Join(srcDir, file.Path) + r, err := os.Open(srcPath) + if err != nil { + return fmt.Errorf("opening file %q: %w", srcPath, err) + } + defer r.Close() + + if err := blobstore.Upload(ctx, r, info); err != nil { + return fmt.Errorf("uploading file %q: %w", file.Path, err) + } + log.Info("uploaded file", "path", file.Path, "hash", file.Hash) + } + } + + return nil +} + +func fileHash(p string) (string, error) { + f, err := os.Open(p) + if err != nil { + return "", fmt.Errorf("opening file %q: %w", p, err) + } + defer f.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, f); err != nil { + return "", fmt.Errorf("hashing file %q: %w", p, err) + } + + hash := fmt.Sprintf("%x", hasher.Sum(nil)) + return hash, nil +} diff --git a/AI/modelcloud/cmd/vllm-frontend/main.go b/AI/modelcloud/cmd/vllm-frontend/main.go new file mode 100644 index 000000000..9780f57f6 --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/main.go @@ -0,0 +1,151 @@ +// Copyright 2025 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law of agetd in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "net/url" + "os" + "os/exec" + "path/filepath" + "time" + + models "k8s.io/examples/AI/modelcloud/cmd/vllm-frontend/models" + + "k8s.io/examples/AI/modelcloud/pkg/blobs" + "k8s.io/klog/v2" +) + +func main() { + ctx := context.Background() + if err := run(ctx); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run(ctx context.Context) error { + log := klog.FromContext(ctx) + + llmModelID := os.Getenv("MODEL_NAME") + flag.StringVar(&llmModelID, "model-name", llmModelID, "identifier of model.") + + blobserverFlag := os.Getenv("BLOBSERVER") + if blobserverFlag == "" { + blobserverFlag = "http://blob-server" + } + flag.StringVar(&blobserverFlag, "blobserver", blobserverFlag, "base url to blobserver") + + klog.InitFlags(nil) + + flag.Parse() + + llmModel, err := models.LoadModel(llmModelID) + if err != nil { + return fmt.Errorf("loading model %q: %w", llmModelID, err) + } + + blobserverURL, err := url.Parse(blobserverFlag) + if err != nil { + return fmt.Errorf("parsing blobserver url %q: %w", blobserverFlag, err) + } + blobStore := &blobs.BlobServer{ + URL: blobserverURL, + } + + loader := &ModelLoader{ + reader: blobStore, + maxDownloadAttempts: 5, + } + modelDir := filepath.Join(os.TempDir(), "model", llmModelID) + if err := os.MkdirAll(modelDir, 0755); err != nil { + return fmt.Errorf("creating temp directory %q: %w", modelDir, err) + } + log.Info("downloading model to", "dir", modelDir) + + for _, file := range llmModel.Spec.Files { + localPath := filepath.Join(modelDir, file.Path) + + info := blobs.BlobInfo{ + Hash: file.Hash, + } + if err := loader.downloadToFile(ctx, info, localPath); err != nil { + return fmt.Errorf("downloading model: %w", err) + } + log.Info("downloaded model file", "path", file.Path) + } + + var vllmArgs []string + + baseArgs := []string{ + "python3", "-m", "vllm.entrypoints.openai.api_server", + "--host=0.0.0.0", + "--port=8080", + "--model=" + modelDir, + "--served_model_name=" + llmModelID, + } + + vllmArgs = append(vllmArgs, baseArgs...) + vllmArgs = append(vllmArgs, flag.Args()...) + + log.Info("starting vllm", "args", vllmArgs) + + cmd := exec.CommandContext(ctx, vllmArgs[0], vllmArgs[1:]...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + env := os.Environ() + cmd.Env = env + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting vllm: %w", err) + } + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("vllm exited with error: %w", err) + } + return nil +} + +type ModelLoader struct { + // reader is the interface to fetch blobs + reader blobs.BlobReader + + // maxDownloadAttempts is the number of times to attempt a download before failing + maxDownloadAttempts int +} + +func (l *ModelLoader) downloadToFile(ctx context.Context, info blobs.BlobInfo, destPath string) error { + log := klog.FromContext(ctx) + + attempt := 0 + for { + attempt++ + + err := l.reader.Download(ctx, info, destPath) + if err == nil { + return nil + } + + if attempt >= l.maxDownloadAttempts { + return err + } + + log.Error(err, "downloading blob, will retry", "info", info, "attempt", attempt) + time.Sleep(5 * time.Second) + } +} diff --git a/AI/modelcloud/cmd/vllm-frontend/models/embed.go b/AI/modelcloud/cmd/vllm-frontend/models/embed.go new file mode 100644 index 000000000..4c99f8e5c --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/models/embed.go @@ -0,0 +1,40 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +import ( + "embed" + "fmt" + + api "k8s.io/examples/AI/modelcloud/pkg/api" + + "sigs.k8s.io/yaml" +) + +//go:embed */*/*.yaml +var embedModels embed.FS + +func LoadModel(modelID string) (*api.Model, error) { + b, err := embedModels.ReadFile(modelID + "/model.yaml") + if err != nil { + return nil, fmt.Errorf("reading embedded model %q: %w", modelID, err) + } + + model := &api.Model{} + if err := yaml.Unmarshal(b, model); err != nil { + return nil, fmt.Errorf("parsing embedded model %q: %w", modelID, err) + } + return model, nil +} diff --git a/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml b/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml new file mode 100644 index 000000000..a8b0a7d9b --- /dev/null +++ b/AI/modelcloud/cmd/vllm-frontend/models/google/gemma-3-1b-it/model.yaml @@ -0,0 +1,22 @@ +spec: + files: + - hash: 1468e03275c15d522c721bd987f03c4ee21b8e246b901803c67448bbc25acf58 + path: .gitattributes + - hash: 60be259533fe3acba21d109a51673815a4c29aefdb7769862695086fcedbeb7a + path: README.md + - hash: 50b2f405ba56a26d4913fd772089992252d7f942123cc0a034d96424221ba946 + path: added_tokens.json + - hash: 19cb5d28c97778271ba2b3c3df47bf76bdd6706724777a2318b3522230afe91e + path: config.json + - hash: fd9324becc53c4be610db39e13a613006f09fd6ef71a95fb6320dc33157490a3 + path: generation_config.json + - hash: 3d4ef8d71c14db7e448a09ebe891cfb6bf32c57a9b44499ae0d1c098e48516b6 + path: model.safetensors + - hash: 2f7b0adf4fb469770bb1490e3e35df87b1dc578246c5e7e6fc76ecf33213a397 + path: special_tokens_map.json + - hash: 4667f2089529e8e7657cfb6d1c19910ae71ff5f28aa7ab2ff2763330affad795 + path: tokenizer.json + - hash: 1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c + path: tokenizer.model + - hash: bfe25c2735e395407beb78456ea9a6984a1f00d8c16fa04a8b75f2a614cf53e1 + path: tokenizer_config.json diff --git a/AI/modelcloud/dev/tools/build-images b/AI/modelcloud/dev/tools/build-images new file mode 100755 index 000000000..4ea7dff3f --- /dev/null +++ b/AI/modelcloud/dev/tools/build-images @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def main(): + """Builds docker images.""" + srcdir = utils.find_srcdir() + images_dir = os.path.join(srcdir, "images") + + if not os.path.isdir(images_dir): + print("images directory not found") + return + + for root, dirs, files in os.walk(images_dir): + for filename in files: + if filename != "Dockerfile": + continue + + dockerfile_path = os.path.join(root, filename) + dockerfile_rel_path = os.path.relpath(dockerfile_path, srcdir) + service_name = os.path.basename(root) + + image_name = utils.get_full_image_name(service_name) + + print(f"Building image for {service_name} with tag {image_name}") + + cmd = ["docker", "buildx", "build", "-f", dockerfile_rel_path, "-t", image_name, "."] + subprocess.run(cmd, cwd=srcdir, check=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/deploy-to-kube b/AI/modelcloud/dev/tools/deploy-to-kube new file mode 100755 index 000000000..0cd13e056 --- /dev/null +++ b/AI/modelcloud/dev/tools/deploy-to-kube @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys +import io + +import yaml + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def find_and_replace_images(doc, service_name, image_prefix, image_tag): + """Recursively finds and replaces container images in a Kubernetes manifest.""" + if isinstance(doc, dict): + for k, v in doc.items(): + if k == "image": + # We only replace images that look like :latest + image_and_tag = v.split(":") + if len(image_and_tag) == 2 and image_and_tag[1] == "latest": + doc[k] = f"{image_prefix}{image_and_tag[0]}:{image_tag}" + else: + find_and_replace_images(v, service_name, image_prefix, image_tag) + elif isinstance(doc, list): + for item in doc: + find_and_replace_images(item, service_name, image_prefix, image_tag) + +def main(): + srcdir = utils.find_srcdir() + + manifests_path = os.path.join(srcdir, "k8s") + + if not os.path.isdir(manifests_path): + print("k8s directory not found") + return + + for root, dirs, files in os.walk(manifests_path): + for filename in files: + if not (filename.endswith(".yaml") or filename.endswith(".yml")): + continue + + path = os.path.join(root, filename) + + with open(path, "r") as f: + # Use safe_load_all for multi-document YAML files + docs = list(yaml.safe_load_all(f)) + + service_name = os.path.basename(root) + image_prefix = utils.get_image_prefix() + image_tag = utils.get_image_tag() + + # Process each document in the file + for doc in docs: + if not doc: # Skip empty documents + continue + + find_and_replace_images(doc, service_name, image_prefix, image_tag) + + # Dump the modified documents back to a string + string_stream = io.StringIO() + yaml.dump_all(docs, string_stream) + modified_content = string_stream.getvalue() + + rel_path = os.path.relpath(path, srcdir) + print(f"applying manifest {rel_path}") + + cmd = ["kubectl", "apply", "-f", "-"] + subprocess.run(cmd, cwd=srcdir, check=True, input=modified_content, text=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/push-images b/AI/modelcloud/dev/tools/push-images new file mode 100755 index 000000000..9d0e4af31 --- /dev/null +++ b/AI/modelcloud/dev/tools/push-images @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess +import sys + +# Add shared directory to Python path +sys.path.append(os.path.join(os.path.dirname(__file__), "shared")) +import utils + +def main(): + """Builds and pushes docker images.""" + srcdir = utils.find_srcdir() + images_dir = os.path.join(srcdir, "images") + + if not os.path.isdir(images_dir): + print("images directory not found") + return + + for root, dirs, files in os.walk(images_dir): + for filename in files: + if filename != "Dockerfile": + continue + + dockerfile_path = os.path.join(root, filename) + service_name = os.path.basename(root) + + image_name = utils.get_full_image_name(service_name) + + print(f"building image for {service_name} with tag {image_name}") + + build_cmd = ["docker", "buildx", "build", "--push", "-t", image_name, "-f", dockerfile_path, "."] + subprocess.run(build_cmd, cwd=srcdir, check=True) + + print(f"pushing image {image_name}") + push_cmd = ["docker", "push", image_name] + subprocess.run(push_cmd, check=True) + +if __name__ == "__main__": + main() diff --git a/AI/modelcloud/dev/tools/shared/utils.py b/AI/modelcloud/dev/tools/shared/utils.py new file mode 100644 index 000000000..6df2fae10 --- /dev/null +++ b/AI/modelcloud/dev/tools/shared/utils.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import subprocess + +def find_srcdir(): + """Finds the project root directory by looking for go.mod.""" + p = os.path.dirname(os.path.abspath(__file__)) + while True: + # We go up two levels to get out of dev/tools/shared + p = os.path.dirname(os.path.dirname(os.path.dirname(p))) + if os.path.exists(os.path.join(p, "go.mod")): + return p + parent = os.path.dirname(p) + if parent == p: + raise Exception("could not find go.mod in any parent directory") + p = parent + +def get_gcp_project(): + """Gets the GCP project ID from gcloud.""" + return subprocess.check_output(["gcloud", "config", "get-value", "project"], text=True).strip() + +def get_git_commit_short(): + """Gets the short git commit hash for HEAD.""" + return subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], text=True).strip() + +def get_image_tag(): + """Gets the image tag based on the git commit.""" + return f"git-{get_git_commit_short()}" + +def get_image_prefix(): + """Constructs the image prefix for a container image.""" + project_id = get_gcp_project() + return f"gcr.io/{project_id}/" + +def get_full_image_name(service_name): + """Constructs the full GCR image name for a service.""" + image_prefix = get_image_prefix() + tag = get_image_tag() + return f"{image_prefix}{service_name}:{tag}" diff --git a/AI/modelcloud/go.mod b/AI/modelcloud/go.mod new file mode 100644 index 000000000..9022df046 --- /dev/null +++ b/AI/modelcloud/go.mod @@ -0,0 +1,21 @@ +module k8s.io/examples/AI/modelcloud + +go 1.24.5 + +require ( + google.golang.org/grpc v1.74.2 + k8s.io/klog/v2 v2.130.1 + sigs.k8s.io/yaml v1.6.0 +) + +require ( + github.com/go-logr/logr v1.4.3 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/protobuf v1.36.7 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect +) diff --git a/AI/modelcloud/go.sum b/AI/modelcloud/go.sum new file mode 100644 index 000000000..a88b001a6 --- /dev/null +++ b/AI/modelcloud/go.sum @@ -0,0 +1,41 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE= +go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c h1:qXWI/sQtv5UKboZ/zUk7h+mrf/lXORyI+n9DKDAusdg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/AI/modelcloud/images/blob-server/Dockerfile b/AI/modelcloud/images/blob-server/Dockerfile new file mode 100644 index 000000000..f8ece4970 --- /dev/null +++ b/AI/modelcloud/images/blob-server/Dockerfile @@ -0,0 +1,32 @@ +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.25.1 AS builder + +ADD go.mod /workspace/ +ADD go.sum /workspace/ +ADD cmd/blob-server /workspace/cmd/blob-server/ +ADD pkg /workspace/pkg/ + +WORKDIR /workspace + +RUN CGO_ENABLED=0 go build -o /blob-server ./cmd/blob-server + +# TODO: Don't have permission to write to volumes if nonroot +#FROM gcr.io/distroless/static-debian12:nonroot +FROM gcr.io/distroless/static-debian12:latest + +COPY --from=builder /blob-server /blob-server + +ENTRYPOINT ["/blob-server"] \ No newline at end of file diff --git a/AI/modelcloud/images/vllm-frontend/Dockerfile b/AI/modelcloud/images/vllm-frontend/Dockerfile new file mode 100644 index 000000000..0b19a3739 --- /dev/null +++ b/AI/modelcloud/images/vllm-frontend/Dockerfile @@ -0,0 +1,39 @@ +# Copyright 2025 The Kubernetes Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.25.1 AS builder + +ADD go.mod /workspace/ +ADD go.sum /workspace/ +ADD cmd/vllm-frontend /workspace/cmd/vllm-frontend/ +ADD pkg /workspace/pkg/ + +WORKDIR /workspace + +RUN CGO_ENABLED=0 go build -o /vllm-frontend ./cmd/vllm-frontend + + +FROM vllm/vllm-openai:v0.10.1.1 +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:58] Checking if CUDA platform is available. +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:82] Exception happens when checking CUDA platform: NVML Shared Library Not Found +# gemma3-6cf4765df9-c4nmt gemma3 DEBUG 09-08 14:57:56 [__init__.py:99] CUDA platform is not available because: NVML Shared Library Not Found + + +# FROM vllm/vllm-openai:v0.10.0 + +COPY --from=builder /vllm-frontend /vllm-frontend + +ENV LD_LIBRARY_PATH /usr/local/nvidia/lib64:/usr/local/lib/ + +ENTRYPOINT ["/vllm-frontend"] \ No newline at end of file diff --git a/AI/modelcloud/k8s/blob-server/manifest.yaml b/AI/modelcloud/k8s/blob-server/manifest.yaml new file mode 100644 index 000000000..6b983498e --- /dev/null +++ b/AI/modelcloud/k8s/blob-server/manifest.yaml @@ -0,0 +1,57 @@ +kind: ServiceAccount +apiVersion: v1 +metadata: + name: blob-server + +--- + +kind: Service +apiVersion: v1 +metadata: + name: blob-server + labels: + app: blob-server +spec: + selector: + app: blob-server + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + +--- + +kind: StatefulSet +apiVersion: apps/v1 +metadata: + name: blob-server +spec: + podManagementPolicy: "Parallel" + replicas: 1 + selector: + matchLabels: + app: blob-server + #serviceName: blob-server + template: + metadata: + labels: + app: blob-server + spec: + serviceAccountName: blob-server + containers: + - name: blob-server + image: blob-server:latest # placeholder value, replaced by deployment scripts + env: + - name: CACHE_DIR + value: /blobs + volumeMounts: + - mountPath: /blobs + name: blobs + volumeClaimTemplates: + - metadata: + name: blobs + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 100Gi \ No newline at end of file diff --git a/AI/modelcloud/k8s/gemma3/manifest.yaml b/AI/modelcloud/k8s/gemma3/manifest.yaml new file mode 100644 index 000000000..fc0010151 --- /dev/null +++ b/AI/modelcloud/k8s/gemma3/manifest.yaml @@ -0,0 +1,57 @@ +kind: ServiceAccount +apiVersion: v1 +metadata: + name: gemma3 + +--- + +kind: Service +apiVersion: v1 +metadata: + name: gemma3 + labels: + app: gemma3 +spec: + selector: + app: gemma3 + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + +--- + +kind: Deployment +apiVersion: apps/v1 +metadata: + name: gemma3 +spec: + replicas: 1 + selector: + matchLabels: + app: gemma3 + #serviceName: gemma3 + template: + metadata: + labels: + app: gemma3 + spec: + serviceAccountName: gemma3 + containers: + - name: gemma3 + image: vllm-frontend:latest # placeholder value, replaced by deployment scripts + resources: + requests: + ephemeral-storage: "10Gi" + nvidia.com/gpu: "1" + limits: + ephemeral-storage: "10Gi" + nvidia.com/gpu: "1" + args: + - -- + - --tensor-parallel-size=1 + env: + - name: MODEL_NAME + value: "google/gemma-3-1b-it" + - name: VLLM_LOGGING_LEVEL + value: "DEBUG" diff --git a/AI/modelcloud/pkg/api/model.go b/AI/modelcloud/pkg/api/model.go new file mode 100644 index 000000000..cc8cc4a4f --- /dev/null +++ b/AI/modelcloud/pkg/api/model.go @@ -0,0 +1,28 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +type Model struct { + Spec ModelSpec `json:"spec,omitempty"` +} + +type ModelSpec struct { + Files []ModelFile `json:"files,omitempty"` +} + +type ModelFile struct { + Path string `json:"path,omitempty"` + Hash string `json:"hash,omitempty"` +} diff --git a/AI/modelcloud/pkg/blobs/blobserver.go b/AI/modelcloud/pkg/blobs/blobserver.go new file mode 100644 index 000000000..bbbfeaa68 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/blobserver.go @@ -0,0 +1,153 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "time" + + "k8s.io/klog/v2" +) + +type BlobServer struct { + // URL is the base URL to the blob-server, typically http://blob-store + URL *url.URL +} + +var _ BlobStore = &BlobServer{} + +func (l *BlobServer) Upload(ctx context.Context, r io.Reader, info BlobInfo) error { + url := l.URL.JoinPath(info.Hash) + return l.uploadFile(ctx, url.String(), r) +} + +func (l *BlobServer) Download(ctx context.Context, info BlobInfo, destPath string) error { + url := l.URL.JoinPath(info.Hash) + return l.downloadToFile(ctx, url.String(), destPath) +} + +func (l *BlobServer) downloadToFile(ctx context.Context, url string, destPath string) error { + log := klog.FromContext(ctx) + + dir := filepath.Dir(destPath) + tempFile, err := os.CreateTemp(dir, "model") + if err != nil { + return fmt.Errorf("creating temp file: %w", err) + } + + shouldDeleteTempFile := true + defer func() { + if shouldDeleteTempFile { + if err := os.Remove(tempFile.Name()); err != nil { + log.Error(err, "removing temp file", "path", tempFile.Name) + } + } + }() + + shouldCloseTempFile := true + defer func() { + if shouldCloseTempFile { + if err := tempFile.Close(); err != nil { + log.Error(err, "closing temp file", "path", tempFile.Name) + } + } + }() + + if err := l.downloadToWriter(ctx, url, tempFile); err != nil { + return fmt.Errorf("downloading from %q: %w", url, err) + } + + if err := tempFile.Close(); err != nil { + return fmt.Errorf("closing temp file: %w", err) + } + shouldCloseTempFile = false + + if err := os.Rename(tempFile.Name(), destPath); err != nil { + return fmt.Errorf("renaming temp file: %w", err) + } + shouldDeleteTempFile = false + + return nil +} + +func (l *BlobServer) downloadToWriter(ctx context.Context, url string, w io.Writer) error { + log := klog.FromContext(ctx) + + log.Info("downloading from url", "url", url) + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + + startedAt := time.Now() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("doing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + if resp.StatusCode == 404 { + return fmt.Errorf("blob not found: %w", os.ErrNotExist) + } + return fmt.Errorf("unexpected status downloading from upstream source: %v", resp.Status) + } + + n, err := io.Copy(w, resp.Body) + if err != nil { + return fmt.Errorf("downloading from upstream source: %w", err) + } + + log.Info("downloaded blob", "url", url, "bytes", n, "duration", time.Since(startedAt)) + + return nil +} + +func (l *BlobServer) uploadFile(ctx context.Context, url string, r io.Reader) error { + log := klog.FromContext(ctx) + + log.Info("uploading to url", "url", url) + + req, err := http.NewRequestWithContext(ctx, "PUT", url, r) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + + startedAt := time.Now() + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("doing request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 201 { + return fmt.Errorf("unexpected status uploading to %q: %v", url, resp.Status) + } + + log.Info("uploaded blob", "url", url, "duration", time.Since(startedAt)) + + return nil +} diff --git a/AI/modelcloud/pkg/blobs/interfaces.go b/AI/modelcloud/pkg/blobs/interfaces.go new file mode 100644 index 000000000..bd0992d45 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/interfaces.go @@ -0,0 +1,39 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "io" +) + +type BlobReader interface { + // If no such object exists, Download should return an error for which errors.Is(err, os.ErrNotExist) is true. + Download(ctx context.Context, info BlobInfo, destPath string) error +} + +type BlobStore interface { + BlobReader + + // Upload uploads the blob to the blobstore, verifying the hash. + // If an object with the same hash already exists, Upload should not modify the existing object. + // On success, Upload returns (true, nil). + // On failure, Upload returns (false, err). + Upload(ctx context.Context, r io.Reader, info BlobInfo) error +} + +type BlobInfo struct { + Hash string +} diff --git a/AI/modelcloud/pkg/blobs/localblobstore.go b/AI/modelcloud/pkg/blobs/localblobstore.go new file mode 100644 index 000000000..cc352e2a4 --- /dev/null +++ b/AI/modelcloud/pkg/blobs/localblobstore.go @@ -0,0 +1,136 @@ +// Copyright 2025 The Kubernetes Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blobs + +import ( + "context" + "crypto/sha256" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "k8s.io/klog/v2" +) + +type LocalBlobStore struct { + LocalDir string // Directory to store blobs +} + +var _ BlobStore = (*LocalBlobStore)(nil) + +func (j *LocalBlobStore) Upload(ctx context.Context, r io.Reader, info BlobInfo) error { + log := klog.FromContext(ctx) + + localPath := filepath.Join(j.LocalDir, info.Hash) + if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil { + return fmt.Errorf("creating parent directories for %q: %w", localPath, err) + } + + stat, err := os.Stat(localPath) + if err == nil { + log.Info("file already exists, skipping upload", "path", localPath, "size", stat.Size(), "modTime", stat.ModTime()) + return nil + } + + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("checking for destination file %q: %w", localPath, err) + } + + // TODO: Try to optimize case where already exists? + if _, err := writeToFile(ctx, r, localPath, info); err != nil { + return fmt.Errorf("writing file %q: %w", localPath, err) + } + + log.Info("added blob to local store", "path", localPath) + + return nil +} + +func (j *LocalBlobStore) Download(ctx context.Context, info BlobInfo, destinationPath string) error { + log := klog.FromContext(ctx) + + localPath := filepath.Join(j.LocalDir, info.Hash) + + startedAt := time.Now() + f, err := os.Open(localPath) + if err != nil { + return fmt.Errorf("opening local blob %q: %w", localPath, err) + } + defer f.Close() + + n, err := writeToFile(ctx, f, destinationPath, info) + if err != nil { + return fmt.Errorf("copying file %q to %q: %w", localPath, destinationPath, err) + } + + log.Info("downloaded blob from local store", "source", localPath, "destination", destinationPath, "bytes", n, "duration", time.Since(startedAt)) + + return nil +} + +func writeToFile(ctx context.Context, src io.Reader, destinationPath string, info BlobInfo) (int64, error) { + log := klog.FromContext(ctx) + + dir := filepath.Dir(destinationPath) + tempFile, err := os.CreateTemp(dir, "download") + if err != nil { + return 0, fmt.Errorf("creating temp file: %w", err) + } + + shouldDeleteTempFile := true + defer func() { + if shouldDeleteTempFile { + if err := os.Remove(tempFile.Name()); err != nil { + log.Error(err, "removing temp file", "path", tempFile.Name) + } + } + }() + + shouldCloseTempFile := true + defer func() { + if shouldCloseTempFile { + if err := tempFile.Close(); err != nil { + log.Error(err, "closing temp file", "path", tempFile.Name) + } + } + }() + + hasher := sha256.New() + mw := io.MultiWriter(tempFile, hasher) + + n, err := io.Copy(mw, src) + if err != nil { + return n, fmt.Errorf("downloading from upstream source: %w", err) + } + + calculatedHash := fmt.Sprintf("%x", hasher.Sum(nil)) + if info.Hash != "" && calculatedHash != info.Hash { + return n, fmt.Errorf("hash mismatch: expected %q, got %q", info.Hash, calculatedHash) + } + + if err := tempFile.Close(); err != nil { + return n, fmt.Errorf("closing temp file: %w", err) + } + shouldCloseTempFile = false + + if err := os.Rename(tempFile.Name(), destinationPath); err != nil { + return n, fmt.Errorf("renaming temp file: %w", err) + } + shouldDeleteTempFile = false + + return n, nil +}