package log

import (
    "archive/tar"
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "math"
    "net/url"
    "os"
    "path"
    "path/filepath"

    "github.com/spf13/cobra"
    corev1 "k8s.io/api/core/v1"
    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/cli-runtime/pkg/genericclioptions"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/remotecommand"
    cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

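// filePathInPod is the directory inside the Ray container where Ray writes its session log files.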
const filePathInPod = "/tmp/ray/session_latest/logs/"

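// ClusterLogOptions holds the config flags, IO streams, and arguments used by the cluster log command.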
type ClusterLogOptions struct {
    configFlags *genericclioptions.ConfigFlags
    ioStreams   *genericclioptions.IOStreams
    Executor    RemoteExecutor
    outputDir   string
    nodeType    string
    args        []string
}

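// NewClusterLogOptions returns a ClusterLogOptions initialized with default config flags and the default SPDY remote executor.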
func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions {
    return &ClusterLogOptions{
        configFlags: genericclioptions.NewConfigFlags(true),
        ioStreams:   &streams,
        Executor:    &DefaultRemoteExecutor{},
    }
}

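// NewClusterLogCommand builds the cobra command that downloads log files from a Ray cluster's head node.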
func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
    options := NewClusterLogOptions(streams)
    // Initialize the factory for later use with the current config flags
    cmdFactory := cmdutil.NewFactory(options.configFlags)

    cmd := &cobra.Command{
        Use:          "log (RAY_CLUSTER_NAME) [--out-dir DIR_PATH] [--node-type all|head|worker]",
        Short:        "Get Ray cluster logs",
        Aliases:      []string{"logs"},
        SilenceUsage: true,
        RunE: func(cmd *cobra.Command, args []string) error {
            if err := options.Complete(args); err != nil {
                return err
            }
            if err := options.Validate(); err != nil {
                return err
            }
            return options.Run(cmd.Context(), cmdFactory)
        },
    }
    cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "Directory path to download the log files to.")
    cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the log files for.")
    options.configFlags.AddFlags(cmd.Flags())
    return cmd
}

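// Complete stores the positional arguments and defaults the node type to `head` when none is given.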
func (options *ClusterLogOptions) Complete(args []string) error {
    options.args = args

    if options.nodeType == "" {
        options.nodeType = "head"
    }

    return nil
}

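// Validate checks that a kube context is set, that exactly one cluster name was passed, that the node type
// is supported, and that the output directory exists, creating one named after the cluster when none is specified.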
func (options *ClusterLogOptions) Validate() error {
    // Overrides and binds the kube config then retrieves the merged result
    config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
    if err != nil {
        return fmt.Errorf("error retrieving raw config: %w", err)
    }
    if len(config.CurrentContext) == 0 {
        return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
    }

    // Command must have the ray cluster name as its only argument
    if len(options.args) != 1 {
        return fmt.Errorf("must have exactly one argument")
    } else if options.outputDir == "" {
        fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.")
        options.outputDir = options.args[0]
        err := os.MkdirAll(options.outputDir, 0o755)
        if err != nil {
            return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err)
        }
    }

    switch options.nodeType {
    case "all":
        return fmt.Errorf("node type `all` is currently not supported")
    case "head":
        break
    case "worker":
        return fmt.Errorf("node type `worker` is currently not supported")
    default:
        return fmt.Errorf("unknown node type `%s`", options.nodeType)
    }

    info, err := os.Stat(options.outputDir)
    if os.IsNotExist(err) {
        return fmt.Errorf("output directory does not exist: %w", err)
    } else if err != nil {
        return fmt.Errorf("error occurred while checking directory: %w", err)
    } else if !info.IsDir() {
        return fmt.Errorf("path is not a directory, please input a directory and try again")
    }

    return nil
}

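// Run lists the Ray head pods for the given cluster, saves their container logs to the output directory,
// and then downloads the Ray log files from each head through the pod exec subresource using tar.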
func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
    kubeClientSet, err := factory.KubernetesClientSet()
    if err != nil {
        return fmt.Errorf("failed to retrieve kubernetes client set: %w", err)
    }

    var listopts v1.ListOptions
    if options.nodeType == "head" {
        listopts = v1.ListOptions{
            LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
        }
    }

    // Get the list of pods that are considered ray heads
    rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
    if err != nil {
        return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
    }

    // Get a list of logs of the ray heads.
    var logList []*bytes.Buffer
    for _, rayHead := range rayHeads.Items {
        request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})

        podLogs, err := request.Stream(ctx)
        if err != nil {
            return fmt.Errorf("error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
        }
        defer podLogs.Close()

        // Get current logs:
        buf := new(bytes.Buffer)
        _, err = io.Copy(buf, podLogs)
        if err != nil {
            return fmt.Errorf("failed to read current logs for kuberay-head %s: %w", rayHead.Name, err)
        }

        logList = append(logList, buf)
    }

    // Each pod's logs are written under a directory named after the ray head
    for ind, logBuf := range logList {
        curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log")
        dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
        err := os.MkdirAll(dirPath, 0o755)
        if err != nil {
            return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err)
        }
        file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
        if err != nil {
            return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", curFilePath, err)
        }
        defer file.Close()

        _, err = logBuf.WriteTo(file)
        if err != nil {
            return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
        }

        req := kubeClientSet.CoreV1().RESTClient().
            Get().
            Namespace(rayHeads.Items[ind].Namespace).
            Resource("pods").
            Name(rayHeads.Items[ind].Name).
            SubResource("exec").
            VersionedParams(&corev1.PodExecOptions{
                Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."},
                Stdin:   true,
                Stdout:  true,
                Stderr:  true,
                TTY:     false,
            }, clientgoscheme.ParameterCodec)

        restconfig, err := factory.ToRESTConfig()
        if err != nil {
            return fmt.Errorf("failed to get restconfig: %w", err)
        }

        exec, err := options.Executor.CreateExecutor(restconfig, req.URL())
        if err != nil {
            return fmt.Errorf("failed to create executor with error: %w", err)
        }

        err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind])
        if err != nil {
            return fmt.Errorf("failed to download ray head log files with error: %w", err)
        }
    }
    return nil
}

// RemoteExecutor creates the executor for executing exec on the pod - provided for testing purposes
type RemoteExecutor interface {
    CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error)
}

type DefaultRemoteExecutor struct{}

// CreateExecutor returns the executor created by NewSPDYExecutor
func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error) {
    return remotecommand.NewSPDYExecutor(restConfig, "POST", url)
}
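
// Because RemoteExecutor is an interface, unit tests can avoid dialing the API server by
// substituting a double such as the minimal sketch below. The fakeRemoteExecutor name and
// its fields are illustrative assumptions for a test file, not part of this change.
type fakeRemoteExecutor struct {
    executor remotecommand.Executor
    err      error
}

// CreateExecutor returns the canned executor and error instead of opening a SPDY connection.
func (f *fakeRemoteExecutor) CreateExecutor(_ *rest.Config, _ *url.URL) (remotecommand.Executor, error) {
    return f.executor, f.err
}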

// downloadRayLogFiles uses the executor to retrieve the log files from the given ray head
func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error {
    outreader, outStream := io.Pipe()
    go func() {
        defer outStream.Close()
        err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
            Stdin:  options.ioStreams.In,
            Stdout: outStream,
            Stderr: options.ioStreams.ErrOut,
            Tty:    false,
        })
        if err != nil {
            log.Fatalf("Error occurred while calling remote command: %v", err)
        }
    }()

    // Walk through the tar archive and create/copy each entry into the destination dir
    tarReader := tar.NewReader(outreader)
    header, err := tarReader.Next()
    if err != nil && !errors.Is(err, io.EOF) {
        return fmt.Errorf("error while extracting tar file for ray head %s: %w", rayhead.Name, err)
    }
    for !errors.Is(err, io.EOF) {
        fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name)
        if err != nil {
            return fmt.Errorf("error reading tar archive: %w", err)
        }

        // Construct the full local path and a directory for the tmp file logs
        localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name))

        switch header.Typeflag {
        case tar.TypeDir:
            if err := os.MkdirAll(localFilePath, 0o755); err != nil {
                return fmt.Errorf("error creating directory: %w", err)
            }
        case tar.TypeReg:
            // Check for overflow (G115): skip files whose mode cannot be represented as a file mode
            if header.Mode < 0 || header.Mode > math.MaxUint32 {
                fmt.Fprintf(options.ioStreams.Out, "file mode %d is outside the acceptable range, skipping file\n", header.Mode)
                break
            }
            // Create file and write contents
            outFile, err := os.OpenFile(localFilePath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) //nolint:gosec // lint failing due to file mode conversion from uint64 to int32, checked above
            if err != nil {
                return fmt.Errorf("error creating file: %w", err)
            }
            defer outFile.Close()
            // Copy in bounded chunks to limit the damage of a decompression bomb; the chunk size is currently set arbitrarily
            for {
                n, err := io.CopyN(outFile, tarReader, 1000000)
                if err != nil {
                    if errors.Is(err, io.EOF) {
                        break
                    }
                    return fmt.Errorf("failed while writing to file: %w", err)
                }
                if n == 0 {
                    break
                }
            }
        default:
            fmt.Printf("Ignoring unsupported file type: %c\n", header.Typeflag)
        }

        header, err = tarReader.Next()
        if header == nil && err != nil && !errors.Is(err, io.EOF) {
            return fmt.Errorf("error while extracting tar file with error: %w", err)
        }
    }

    return nil
}
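
// downloadRayLogFiles only depends on the remotecommand.Executor interface, so a unit test could
// stream a pre-built tar archive through a stub executor instead of exec-ing into a live pod.
// The stubTarExecutor below is a minimal sketch under that assumption; the name and field are
// illustrative, not part of this change.
type stubTarExecutor struct {
    archive []byte
}

func (s *stubTarExecutor) Stream(opts remotecommand.StreamOptions) error {
    return s.StreamWithContext(context.Background(), opts)
}

// StreamWithContext writes the canned archive to the caller's stdout stream, mimicking
// `tar -cf -` running inside the head pod.
func (s *stubTarExecutor) StreamWithContext(_ context.Context, opts remotecommand.StreamOptions) error {
    _, err := opts.Stdout.Write(s.archive)
    return err
}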