diff --git a/.github/workflows/kind_e2e.yaml b/.github/workflows/kind_e2e.yaml index 9d5ce7a23..652ede811 100644 --- a/.github/workflows/kind_e2e.yaml +++ b/.github/workflows/kind_e2e.yaml @@ -50,7 +50,7 @@ jobs: name: E2E tests for K8s (KinD) runs-on: ubuntu-latest env: - DAPR_RUNTIME_PINNED_VERSION: 1.16.0 + DAPR_RUNTIME_PINNED_VERSION: 1.16.1 DAPR_DASHBOARD_PINNED_VERSION: 0.15.0 DAPR_RUNTIME_LATEST_STABLE_VERSION: DAPR_DASHBOARD_LATEST_STABLE_VERSION: diff --git a/.github/workflows/self_hosted_e2e.yaml b/.github/workflows/self_hosted_e2e.yaml index 20ccd4113..6cb796019 100644 --- a/.github/workflows/self_hosted_e2e.yaml +++ b/.github/workflows/self_hosted_e2e.yaml @@ -38,7 +38,7 @@ jobs: GOARCH: ${{ matrix.target_arch }} GOPROXY: https://proxy.golang.org ARCHIVE_OUTDIR: dist/archives - DAPR_RUNTIME_PINNED_VERSION: "1.16.0" + DAPR_RUNTIME_PINNED_VERSION: "1.16.1" DAPR_DASHBOARD_PINNED_VERSION: 0.15.0 DAPR_RUNTIME_LATEST_STABLE_VERSION: "" DAPR_DASHBOARD_LATEST_STABLE_VERSION: "" diff --git a/Makefile b/Makefile index 3bc4b4005..8116f3cb1 100644 --- a/Makefile +++ b/Makefile @@ -74,7 +74,7 @@ TEST_OUTPUT_FILE ?= test_output.json # Set the default timeout for tests to 10 minutes ifndef E2E_SH_TEST_TIMEOUT - override E2E_SH_TEST_TIMEOUT := 10m + override E2E_SH_TEST_TIMEOUT := 30m endif # Use the variable H to add a header (equivalent to =>) to informational output diff --git a/cmd/dapr.go b/cmd/dapr.go index 0bfd59952..d539ef047 100644 --- a/cmd/dapr.go +++ b/cmd/dapr.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/dapr/cli/cmd/scheduler" "github.com/dapr/cli/pkg/api" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" @@ -108,4 +109,6 @@ func init() { RootCmd.Flags().BoolVarP(&versionFlag, "version", "v", false, "version for dapr") RootCmd.PersistentFlags().StringVarP(&daprRuntimePath, "runtime-path", "", "", "The path to the dapr runtime installation directory") 
RootCmd.PersistentFlags().BoolVarP(&logAsJSON, "log-as-json", "", false, "Log output in JSON format") + + RootCmd.AddCommand(scheduler.SchedulerCmd) } diff --git a/cmd/scheduler/delete.go b/cmd/scheduler/delete.go new file mode 100644 index 000000000..b6146348e --- /dev/null +++ b/cmd/scheduler/delete.go @@ -0,0 +1,52 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var DeleteCmd = &cobra.Command{ + Use: "delete", + Aliases: []string{"d", "del"}, + Short: `Delete one or more jobs from scheduler. +Job names are formatted by their type, app ID, then identifier. +Actor reminders require the actor type, actor ID, then reminder name, separated by /. +Workflow reminders require the app ID, instance ID, then reminder name, separated by /. +Accepts multiple names. +`, + Args: cobra.MinimumNArgs(1), + Example: ` +dapr scheduler delete app/my-app-id/my-job-name +dapr scheduler delete actor/my-actor-type/my-actor-id/my-reminder-name +dapr scheduler delete workflow/my-app-id/my-instance-id/my-workflow-reminder-name +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + opts := scheduler.DeleteOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + DaprNamespace: daprNamespace, + } + + return scheduler.Delete(ctx, opts, args...) 
+ }, } + +func init() { + SchedulerCmd.AddCommand(DeleteCmd) } diff --git a/cmd/scheduler/deleteall.go b/cmd/scheduler/deleteall.go new file mode 100644 index 000000000..820c3a080 --- /dev/null +++ b/cmd/scheduler/deleteall.go @@ -0,0 +1,54 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var DeleteAllCmd = &cobra.Command{ + Use: "delete-all", + Aliases: []string{"da", "delall"}, + Short: `Delete all scheduled jobs in the specified namespace matching a particular filter. +Accepts a single key as an argument. Deletes all jobs which match the filter key. 
+`, + Args: cobra.ExactArgs(1), + Example: ` +dapr scheduler delete-all all +dapr scheduler delete-all app +dapr scheduler delete-all app/my-app-id +dapr scheduler delete-all actor/my-actor-type +dapr scheduler delete-all actor/my-actor-type/my-actor-id +dapr scheduler delete-all workflow +dapr scheduler delete-all workflow/my-app-id +dapr scheduler delete-all workflow/my-app-id/my-workflow-id +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + opts := scheduler.DeleteOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + DaprNamespace: daprNamespace, + } + + return scheduler.DeleteAll(ctx, opts, args[0]) + }, +} + +func init() { + SchedulerCmd.AddCommand(DeleteAllCmd) +} diff --git a/cmd/scheduler/export.go b/cmd/scheduler/export.go new file mode 100644 index 000000000..279177d52 --- /dev/null +++ b/cmd/scheduler/export.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var ( + schedulerExportFile string +) + +var SchedulerExportCmd = &cobra.Command{ + Use: "export", + Short: "Export all jobs and actor reminders to a binary file, including the tracked count.", + Long: `Export jobs and actor reminders which are scheduled in Scheduler. +Can later be imported using 'dapr scheduler import'. 
+dapr scheduler export -o output.bin +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + err := scheduler.Export(ctx, scheduler.ExportImportOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + TargetFile: schedulerExportFile, + }) + if err != nil { + return err + } + + print.InfoStatusEvent(os.Stdout, "Export to '%s' complete.", schedulerExportFile) + + return nil + }, +} + +func init() { + SchedulerExportCmd.Flags().MarkHidden("namespace") + SchedulerExportCmd.Flags().StringVarP(&schedulerExportFile, "output-file", "o", "", "Output binary file to export jobs and actor reminders to.") + SchedulerExportCmd.MarkFlagRequired("output-file") + SchedulerExportCmd.MarkFlagFilename("output-file") + SchedulerCmd.AddCommand(SchedulerExportCmd) +} diff --git a/cmd/scheduler/get.go b/cmd/scheduler/get.go new file mode 100644 index 000000000..f5ad880a1 --- /dev/null +++ b/cmd/scheduler/get.go @@ -0,0 +1,93 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "os" + + "github.com/gocarina/gocsv" + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/cli/utils" + "github.com/dapr/kit/signals" +) + +var ( + getOutputFormat *string +) + +var GetCmd = &cobra.Command{ + Use: "get", + Aliases: []string{"g", "ge"}, + Short: `Get a scheduled app job or actor reminder in Scheduler. 
+Job names are formatted by their type, app ID, then identifier. +Actor reminders require the actor type, actor ID, then reminder name, separated by /. +Workflow reminders require the app ID, instance ID, then reminder name, separated by /. +Activity reminders require the app ID, activity ID, separated by /. +Accepts multiple names. +`, + Args: cobra.MinimumNArgs(1), + Example: ` +dapr scheduler get app/my-app-id/my-job-name +dapr scheduler get actor/my-actor-type/my-actor-id/my-reminder-name +dapr scheduler get workflow/my-app-id/my-instance-id/my-workflow-reminder-name +dapr scheduler get activity/my-app-id/xyz::0::1 +`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + opts := scheduler.GetOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + DaprNamespace: daprNamespace, + } + + var list any + var err error + if *getOutputFormat == outputFormatShort { + list, err = scheduler.Get(ctx, opts, args...) + } else { + list, err = scheduler.GetWide(ctx, opts, args...) + } + if err != nil { + return err + } + + switch *getOutputFormat { + case outputFormatYAML: + err = utils.PrintDetail(os.Stdout, "yaml", list) + case outputFormatJSON: + err = utils.PrintDetail(os.Stdout, "json", list) + default: + var table string + table, err = gocsv.MarshalString(list) + if err != nil { + break + } + + utils.PrintTable(table) + } + + if err != nil { + return err + } + + return nil + }, +} + +func init() { + getOutputFormat = outputFunc(GetCmd) + SchedulerCmd.AddCommand(GetCmd) +} diff --git a/cmd/scheduler/import.go b/cmd/scheduler/import.go new file mode 100644 index 000000000..267b70ed2 --- /dev/null +++ b/cmd/scheduler/import.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/kit/signals" +) + +var ( + schedulerImportFile string +) + +var SchedulerImportCmd = &cobra.Command{ + Use: "import", + Short: "Import all jobs and actor reminders from a binary file generated by 'dapr scheduler export'.", + Long: `Import jobs and actor reminders to Scheduler from a binary file generated by 'dapr scheduler export'. +dapr scheduler import -f export.bin`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + err := scheduler.Import(ctx, scheduler.ExportImportOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + TargetFile: schedulerImportFile, + }) + if err != nil { + return err + } + + print.InfoStatusEvent(os.Stdout, "Import from '%s' complete.", schedulerImportFile) + + return nil + }, +} + +func init() { + SchedulerImportCmd.Flags().MarkHidden("namespace") + SchedulerImportCmd.Flags().StringVarP(&schedulerImportFile, "input-file", "f", "", "Input file to import jobs and actor reminders from.") + SchedulerImportCmd.MarkFlagRequired("input-file") + SchedulerImportCmd.MarkFlagFilename("input-file") + SchedulerCmd.AddCommand(SchedulerImportCmd) +} diff --git a/cmd/scheduler/list.go b/cmd/scheduler/list.go new file mode 100644 index 000000000..c6fefd95e --- /dev/null +++ b/cmd/scheduler/list.go @@ -0,0 +1,102 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); 
+you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "os" + + "github.com/gocarina/gocsv" + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler" + "github.com/dapr/cli/utils" + "github.com/dapr/kit/ptr" + "github.com/dapr/kit/signals" +) + +var ( + listFilterType *string + listOutputFormat *string +) + +var ListCmd = &cobra.Command{ + Use: "list", + Short: "List scheduled jobs in Scheduler.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := signals.Context() + + opts := scheduler.ListOptions{ + SchedulerNamespace: schedulerNamespace, + KubernetesMode: kubernetesMode, + Filter: scheduler.Filter{ + Type: *listFilterType, + }, + } + opts.Filter.Namespace = ptr.Of(daprNamespace) + + var list any + var empty bool + switch *listOutputFormat { + case outputFormatShort: + ll, err := scheduler.List(ctx, opts) + if err != nil { + return err + } + empty = len(ll) == 0 + list = ll + default: + ll, err := scheduler.ListWide(ctx, opts) + if err != nil { + return err + } + empty = len(ll) == 0 + list = ll + } + + if empty { + print.FailureStatusEvent(os.Stderr, "No jobs found in namespace %q", daprNamespace) + return nil + } + + var err error + switch *listOutputFormat { + case outputFormatYAML: + err = utils.PrintDetail(os.Stdout, "yaml", list) + case outputFormatJSON: + err = utils.PrintDetail(os.Stdout, "json", list) + default: + var table string + table, err = gocsv.MarshalString(list) + if err != nil { + break + } + + utils.PrintTable(table) + } + + if 
err != nil { + return err + } + + return nil + }, +} + +func init() { + listOutputFormat = outputFunc(ListCmd) + listFilterType = filterFunc(ListCmd) + SchedulerCmd.AddCommand(ListCmd) +} diff --git a/cmd/scheduler/scheduler.go b/cmd/scheduler/scheduler.go new file mode 100644 index 000000000..8fae6bdfa --- /dev/null +++ b/cmd/scheduler/scheduler.go @@ -0,0 +1,108 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "errors" + "fmt" + "slices" + "strings" + + "github.com/spf13/cobra" + + "github.com/dapr/cli/pkg/scheduler" +) + +const ( + outputFormatShort = "short" + outputFormatWide = "wide" + outputFormatYAML = "yaml" + outputFormatJSON = "json" +) + +var ( + daprNamespace string + schedulerNamespace string + kubernetesMode bool +) + +var SchedulerCmd = &cobra.Command{ + Use: "scheduler", + Short: "Scheduler management commands. 
Use -k to target a Kubernetes Dapr cluster.", + Aliases: []string{"sched"}, +} + +func init() { + SchedulerCmd.PersistentFlags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Perform scheduler command on a Kubernetes Dapr cluster") + SchedulerCmd.PersistentFlags().StringVarP(&daprNamespace, "namespace", "n", "default", "Namespace of the Dapr application") + SchedulerCmd.PersistentFlags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set") +} + +func outputFunc(cmd *cobra.Command) *string { + + outputs := []string{ + outputFormatShort, + outputFormatWide, + outputFormatYAML, + outputFormatJSON, + } + + var outputFormat string + cmd.Flags().StringVarP(&outputFormat, "output", "o", outputFormatShort, fmt.Sprintf("Output format. One of %s", + strings.Join(outputs, ", ")), + ) + + pre := cmd.PreRunE + cmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if !slices.Contains(outputs, outputFormat) { + return errors.New("invalid value for --output. Supported values are " + strings.Join(outputs, ", ")) + } + + if pre != nil { + return pre(cmd, args) + } + return nil + } + + return &outputFormat +} + +func filterFunc(cmd *cobra.Command) *string { + all := []string{ + scheduler.FilterAll, + scheduler.FilterApp, + scheduler.FilterActor, + scheduler.FilterWorkflow, + scheduler.FilterActivity, + } + + var filterType string + cmd.Flags().StringVar(&filterType, "filter", scheduler.FilterAll, + fmt.Sprintf("Filter jobs by type. Supported values are %s\n", strings.Join(all, ", ")), + ) + + pre := cmd.PreRunE + cmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if !slices.Contains(all, filterType) { + return errors.New("invalid value for --filter. 
Supported values are " + strings.Join(all, ", ")) + } + + if pre != nil { + return pre(cmd, args) + } + return nil + } + + return &filterType +} diff --git a/go.mod b/go.mod index 573b27be8..6cee22c95 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/dapr/dapr v1.16.0 github.com/dapr/go-sdk v1.13.0 github.com/dapr/kit v0.16.1 + github.com/diagridio/go-etcd-cron v0.9.1 github.com/docker/docker v25.0.6+incompatible github.com/evanphx/json-patch/v5 v5.9.0 github.com/fatih/color v1.17.0 @@ -27,8 +28,10 @@ require ( github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.10.0 + go.etcd.io/etcd/client/v3 v3.5.21 golang.org/x/mod v0.25.0 golang.org/x/sys v0.33.0 + google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.1 k8s.io/api v0.32.1 @@ -72,6 +75,8 @@ require ( github.com/containerd/errdefs v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect github.com/dapr/components-contrib v1.16.0 // indirect github.com/dapr/durabletask-go v0.10.0 // indirect @@ -201,6 +206,8 @@ require ( github.com/xlab/treeprint v1.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zeebo/errs v1.4.0 // indirect + go.etcd.io/etcd/api/v3 v3.5.21 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect @@ -215,6 +222,7 @@ require ( go.opentelemetry.io/otel/trace v1.36.0 // indirect go.opentelemetry.io/proto/otlp v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect golang.org/x/net v0.41.0 
// indirect @@ -226,7 +234,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/grpc v1.73.0 // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index f148519f1..fc262641b 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,10 @@ github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= @@ -164,6 +168,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/diagridio/go-etcd-cron v0.9.1 
h1:KUfcceDtypL8s3hL0jD2ZoiIzjjXY6xDQ4kT1DJF4Ws= +github.com/diagridio/go-etcd-cron v0.9.1/go.mod h1:CSzuxoCDFu+Gbds0RO73GE8CnmL5t85axiPLptsej3I= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2 h1:aBfCb7iqHmDEIp6fBvC/hQUddQfg+3qdYjwzaiP9Hnc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2/go.mod h1:WHNsWjnIn2V1LYOrME7e8KxSeKunYHsxEm4am0BUtcI= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -271,6 +277,7 @@ github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 h1:wxgEEZvsnOTrDO2n github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -670,6 +677,12 @@ github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f h1 github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +go.etcd.io/etcd/api/v3 v3.5.21 h1:A6O2/JDb3tvHhiIz3xf9nJ7REHvtEFJJ3veW3FbCnS8= +go.etcd.io/etcd/api/v3 v3.5.21/go.mod h1:c3aH5wcvXv/9dqIw2Y810LDXJfhSYdHQ0vxmP3CCHVY= +go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc= +go.etcd.io/etcd/client/pkg/v3 v3.5.21/go.mod h1:BgqT/IXPjK9NkeSDjbzwsHySX3yIle2+ndz28nVsjUs= +go.etcd.io/etcd/client/v3 v3.5.21 
h1:T6b1Ow6fNjOLOtM0xSoKNQt1ASPCLWrF9XMHcH9pEyY= +go.etcd.io/etcd/client/v3 v3.5.21/go.mod h1:mFYy67IOqmbRf/kRUvsHixzo3iG+1OF2W2+jVIQRAnU= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/pkg/kubernetes/components_test.go b/pkg/kubernetes/components_test.go index 1568a9a8d..dd7dc3880 100644 --- a/pkg/kubernetes/components_test.go +++ b/pkg/kubernetes/components_test.go @@ -40,7 +40,7 @@ func TestComponents(t *testing.T) { name: "List one config", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -70,7 +70,7 @@ func TestComponents(t *testing.T) { name: "Filters out daprsystem", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -98,7 +98,7 @@ func TestComponents(t *testing.T) { name: "Name does match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n default appConfig state.redis v1 " + formattedNow + " 0s \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \ndefault appConfig state.redis v1 " + formattedNow + " 0s \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ @@ -119,7 +119,7 @@ func TestComponents(t *testing.T) 
{ name: "Name does not match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n", + expectedOutput: "NAMESPACE NAME TYPE VERSION SCOPES CREATED AGE \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Component{ diff --git a/pkg/kubernetes/configurations_test.go b/pkg/kubernetes/configurations_test.go index a48d655bb..8d62a3f51 100644 --- a/pkg/kubernetes/configurations_test.go +++ b/pkg/kubernetes/configurations_test.go @@ -41,7 +41,7 @@ func TestConfigurations(t *testing.T) { name: "List one config", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -68,7 +68,7 @@ func TestConfigurations(t *testing.T) { name: "Filters out daprsystem", configName: "", outputFormat: "", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -94,7 +94,7 @@ func TestConfigurations(t *testing.T) { name: "Name does match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n default appConfig false false 0s " + formattedNow + " \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \ndefault appConfig false false 0s " + formattedNow + " \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ @@ -112,7 +112,7 @@ func TestConfigurations(t *testing.T) { 
name: "Name does not match", configName: "appConfig", outputFormat: "list", - expectedOutput: " NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n", + expectedOutput: "NAMESPACE NAME TRACING-ENABLED METRICS-ENABLED AGE CREATED \n", errString: "", errorExpected: false, k8sConfig: []v1alpha1.Configuration{ diff --git a/pkg/scheduler/delete.go b/pkg/scheduler/delete.go new file mode 100644 index 000000000..342c9bf2b --- /dev/null +++ b/pkg/scheduler/delete.go @@ -0,0 +1,70 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "context" + "fmt" + "os" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/dapr/cli/pkg/print" +) + +type DeleteOptions struct { + SchedulerNamespace string + DaprNamespace string + KubernetesMode bool +} + +func Delete(ctx context.Context, opts DeleteOptions, keys ...string) error { + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + for _, key := range keys { + if err = delSingle(ctx, etcdClient, key, opts); err != nil { + return err + } + + print.InfoStatusEvent(os.Stdout, "Deleted %s in namespace '%s'.", key, opts.DaprNamespace) + } + + return nil +} + +func delSingle(ctx context.Context, client *clientv3.Client, key string, opts DeleteOptions) error { + jobKey, err := parseJobKey(key) + if err != nil { + return err + } + + paths := pathsFromJobKey(jobKey, opts.DaprNamespace) + resp, err := client.Txn(ctx).Then( + clientv3.OpDelete(paths[0]), + clientv3.OpDelete(paths[1]), + ).Commit() + if err != nil { + return err + } + + if len(resp.Responses) == 0 || resp.Responses[0].GetResponseDeleteRange().Deleted == 0 { + return fmt.Errorf("no job with key '%s' found in namespace '%s'", key, opts.DaprNamespace) + } + + return nil +} diff --git a/pkg/scheduler/deleteall.go b/pkg/scheduler/deleteall.go new file mode 100644 index 000000000..25170fcb1 --- /dev/null +++ b/pkg/scheduler/deleteall.go @@ -0,0 +1,139 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "fmt" + "os" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/dapr/cli/pkg/print" +) + +func DeleteAll(ctx context.Context, opts DeleteOptions, key string) error { + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + split := strings.Split(key, "/") + + var paths []string + switch split[0] { + case "all": + if len(split) != 1 { + return fmt.Errorf("invalid key format: %s", key) + } + paths = []string{ + fmt.Sprintf("dapr/jobs/app||%s||", opts.DaprNamespace), + fmt.Sprintf("dapr/jobs/actorreminder||%s||", opts.DaprNamespace), + fmt.Sprintf("dapr/counters/app||%s||", opts.DaprNamespace), + fmt.Sprintf("dapr/counters/actorreminder||%s||", opts.DaprNamespace), + } + case "app": + switch len(split) { + case 1: + paths = []string{ + fmt.Sprintf("dapr/jobs/app||%s||", opts.DaprNamespace), + fmt.Sprintf("dapr/counters/app||%s||", opts.DaprNamespace), + } + case 2: + paths = []string{ + fmt.Sprintf("dapr/jobs/app||%s||%s||", opts.DaprNamespace, split[1]), + fmt.Sprintf("dapr/counters/app||%s||%s||", opts.DaprNamespace, split[1]), + } + default: + return fmt.Errorf("invalid key format: %s", key) + } + + case "actor": + switch len(split) { + case 2: + paths = []string{ + fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||", opts.DaprNamespace, split[1]), + fmt.Sprintf("dapr/counters/actorreminder||%s||%s||", opts.DaprNamespace, split[1]), + } + case 3: + paths = []string{ + fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||%s||", opts.DaprNamespace, split[1], split[2]), + fmt.Sprintf("dapr/counters/actorreminder||%s||%s||%s||", opts.DaprNamespace, split[1], split[2]), + } + default: + return fmt.Errorf("invalid key format: %s", key) + } + + case "workflow": + switch len(split) { + case 1: + paths = []string{ + 
fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.", opts.DaprNamespace, opts.DaprNamespace), + fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.", opts.DaprNamespace, opts.DaprNamespace), + } + case 2: + paths = []string{ + fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.workflow||", opts.DaprNamespace, opts.DaprNamespace, split[1]), + fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.activity||", opts.DaprNamespace, opts.DaprNamespace, split[1]), + fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.workflow||", opts.DaprNamespace, opts.DaprNamespace, split[1]), + fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.activity||", opts.DaprNamespace, opts.DaprNamespace, split[1]), + } + case 3: + paths = []string{ + fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.DaprNamespace, opts.DaprNamespace, split[1], split[2]), + fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.DaprNamespace, opts.DaprNamespace, split[1], split[2]), + fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.DaprNamespace, opts.DaprNamespace, split[1], split[2]), + fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.DaprNamespace, opts.DaprNamespace, split[1], split[2]), + } + default: + return fmt.Errorf("invalid key format: %s", key) + } + + default: + return fmt.Errorf("unknown key prefix: %s", split[0]) + } + + oopts := make([]clientv3.Op, 0, len(paths)) + for _, path := range paths { + oopts = append(oopts, clientv3.OpDelete(path, + clientv3.WithPrefix(), + clientv3.WithPrevKV(), + clientv3.WithKeysOnly(), + )) + } + + resp, err := etcdClient.Txn(ctx).Then(oopts...).Commit() + if err != nil { + return err + } + + // Only count actual jobs, not counters. 
+ var deleted int64 + toCount := resp.Responses[:1] + if len(paths) > 2 { + toCount = resp.Responses[:2] + } + for _, resp := range toCount { + for _, kv := range resp.GetResponseDeleteRange().GetPrevKvs() { + print.InfoStatusEvent(os.Stdout, "Deleted job '%s'.", kv.Key) + } + deleted += resp.GetResponseDeleteRange().Deleted + } + + print.InfoStatusEvent(os.Stdout, "Deleted %d jobs in namespace '%s'.", deleted, opts.DaprNamespace) + + return nil +} diff --git a/pkg/scheduler/exportimport.go b/pkg/scheduler/exportimport.go new file mode 100644 index 000000000..a54ae95b4 --- /dev/null +++ b/pkg/scheduler/exportimport.go @@ -0,0 +1,153 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "context" + "encoding/gob" + "errors" + "fmt" + "os" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/scheduler/stored" +) + +type ExportImportOptions struct { + SchedulerNamespace string + KubernetesMode bool + TargetFile string +} + +type ExportFile struct { + Jobs map[string][]byte + Counters map[string][]byte +} + +func Export(ctx context.Context, opts ExportImportOptions) error { + if _, err := os.Stat(opts.TargetFile); !errors.Is(err, os.ErrNotExist) { + if err == nil { + return fmt.Errorf("file '%s' already exists", opts.TargetFile) + } + return err + } + + client, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + jobs, err := listJobs(ctx, client) + if err != nil { + return err + } + counters, err := listCounters(ctx, client) + if err != nil { + return err + } + + out := ExportFile{ + Jobs: make(map[string][]byte, len(jobs)), + Counters: make(map[string][]byte, len(counters)), + } + + var b []byte + for k, j := range jobs { + b, err = proto.Marshal(j) + if err != nil { + return fmt.Errorf("marshal job %q: %w", k, err) + } + out.Jobs[k] = b + } + for k, c := range counters { + b, err = proto.Marshal(c) + if err != nil { + return fmt.Errorf("marshal counter %q: %w", k, err) + } + out.Counters[k] = b + } + + f, err := os.OpenFile(opts.TargetFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return fmt.Errorf("open %s: %w", opts.TargetFile, err) + } + defer f.Close() + + if err := gob.NewEncoder(f).Encode(&out); err != nil { + _ = os.Remove(opts.TargetFile) + return fmt.Errorf("encode export file: %w", err) + } + + print.InfoStatusEvent(os.Stdout, "Exported %d jobs and %d counters.", len(out.Jobs), len(out.Counters)) + return nil +} + +func Import(ctx context.Context, opts ExportImportOptions) error { + client, cancel, err := 
etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return err + } + defer cancel() + + f, err := os.OpenFile(opts.TargetFile, os.O_RDONLY, 0o600) + if err != nil { + return fmt.Errorf("open %s: %w", opts.TargetFile, err) + } + defer f.Close() + + var in ExportFile + if err := gob.NewDecoder(f).Decode(&in); err != nil { + return fmt.Errorf("decode import file: %w", err) + } + + ops := make([]clientv3.Op, 0, len(in.Jobs)+len(in.Counters)) + + for key, b := range in.Jobs { + var j stored.Job + if err := proto.Unmarshal(b, &j); err != nil { + return fmt.Errorf("unmarshal job %q: %w", key, err) + } + ops = append(ops, clientv3.OpPut(key, string(b))) + } + + for key, b := range in.Counters { + var c stored.Counter + if err := proto.Unmarshal(b, &c); err != nil { + return fmt.Errorf("unmarshal counter %q: %w", key, err) + } + ops = append(ops, clientv3.OpPut(key, string(b))) + } + + var end int + for i := 0; i < len(ops); i += 128 { + txn := client.Txn(ctx) + end = i + 128 + if end > len(ops) { + end = len(ops) + } + txn.Then(ops[i:end]...) + if _, err := txn.Commit(); err != nil { + print.FailureStatusEvent(os.Stderr, "Incomplete import with %d items.", end) + return fmt.Errorf("commit transaction: %w", err) + } + } + + print.InfoStatusEvent(os.Stdout, "Imported %d items.", end) + + return nil +} diff --git a/pkg/scheduler/get.go b/pkg/scheduler/get.go new file mode 100644 index 000000000..5613111e0 --- /dev/null +++ b/pkg/scheduler/get.go @@ -0,0 +1,141 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "fmt" + + "github.com/dapr/cli/pkg/scheduler/stored" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" +) + +type GetOptions struct { + SchedulerNamespace string + DaprNamespace string + KubernetesMode bool +} + +func Get(ctx context.Context, opts GetOptions, keys ...string) ([]*ListOutput, error) { + list, err := GetWide(ctx, opts, keys...) + if err != nil { + return nil, err + } + + return listWideToShort(list) +} + +func GetWide(ctx context.Context, opts GetOptions, keys ...string) ([]*ListOutputWide, error) { + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return nil, err + } + defer cancel() + + results := make([]*ListOutputWide, 0, len(keys)) + for _, key := range keys { + wide, err := getSingle(ctx, etcdClient, key, opts) + if err != nil { + return nil, err + } + + results = append(results, wide) + } + + return results, nil +} + +func getSingle(ctx context.Context, cl *clientv3.Client, key string, opts GetOptions) (*ListOutputWide, error) { + jobKey, err := parseJobKey(key) + if err != nil { + return nil, err + } + + paths := pathsFromJobKey(jobKey, opts.DaprNamespace) + + resp, err := cl.Txn(ctx).Then( + clientv3.OpGet(paths[0]), + clientv3.OpGet(paths[1]), + ).Commit() + if err != nil { + return nil, err + } + + if len(resp.Responses[0].GetResponseRange().Kvs) == 0 { + return nil, fmt.Errorf("job '%s' not found", key) + } + + var storedJ stored.Job + if err = proto.Unmarshal(resp.Responses[0].GetResponseRange().Kvs[0].Value, &storedJ); err != nil { + return nil, err + } + + var storedC stored.Counter + if kvs := resp.Responses[1].GetResponseRange().Kvs; len(kvs) > 0 { + if err = proto.Unmarshal(kvs[0].Value, &storedC); err != nil { + return nil, err + } + } + + return parseJob(&JobCount{ + Key: paths[0], + Job: &storedJ, + 
Counter: &storedC, + }, Filter{ + Type: FilterAll, + }) +} + +func pathsFromJobKey(jobKey *jobKey, namespace string) [2]string { + const reminderPath = "dapr/jobs/actorreminder" + const reminderCounterPath = "dapr/counters/actorreminder" + + var paths [2]string + switch { + case jobKey.actorType != nil: + paths[0] = fmt.Sprintf("%s||%s||%s||%s||%s", + reminderPath, namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name, + ) + paths[1] = fmt.Sprintf("%s||%s||%s||%s||%s", + reminderCounterPath, namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name, + ) + + case jobKey.activity: + actorType := fmt.Sprintf("dapr.internal.%s.%s.activity", namespace, *jobKey.appID) + actorID := jobKey.name + paths[0] = fmt.Sprintf("%s||%s||%s||%s||run-activity", + reminderPath, namespace, actorType, actorID, + ) + paths[1] = fmt.Sprintf("%s||%s||%s||%s||run-activity", + reminderCounterPath, namespace, actorType, actorID, + ) + + case jobKey.instanceID != nil: + actorType := fmt.Sprintf("dapr.internal.%s.%s.workflow", namespace, *jobKey.appID) + actorID := *jobKey.instanceID + paths[0] = fmt.Sprintf("%s||%s||%s||%s||%s", + reminderPath, namespace, actorType, actorID, jobKey.name, + ) + paths[1] = fmt.Sprintf("%s||%s||%s||%s||%s", + reminderCounterPath, namespace, actorType, actorID, jobKey.name, + ) + + default: + paths[0] = fmt.Sprintf("dapr/jobs/app||%s||%s||%s", namespace, *jobKey.appID, jobKey.name) + paths[1] = fmt.Sprintf("dapr/counters/app||%s||%s||%s", namespace, *jobKey.appID, jobKey.name) + } + + return paths +} diff --git a/pkg/scheduler/list.go b/pkg/scheduler/list.go new file mode 100644 index 000000000..bd4854020 --- /dev/null +++ b/pkg/scheduler/list.go @@ -0,0 +1,212 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" + + "github.com/dapr/cli/pkg/scheduler/stored" + "github.com/dapr/cli/utils" +) + +type ListOptions struct { + SchedulerNamespace string + KubernetesMode bool + Filter Filter +} + +type ListOutputWide struct { + Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"` + Name string `csv:"NAME" json:"name" yaml:"name"` + Begin time.Time `csv:"BEGIN" json:"begin" yaml:"begin,omitempty"` + Expiration *time.Time `csv:"EXPIRATION" json:"expiration" yaml:"expiration,omitempty"` + Schedule *string `csv:"SCHEDULE" json:"schedule" yaml:"schedule,omitempty"` + DueTime *string `csv:"DUE TIME" json:"dueTime" yaml:"dueTime,omitempty"` + TTL *string `csv:"TTL" json:"ttl" yaml:"ttl,omitempty"` + Repeats *uint32 `csv:"REPEATS" json:"repeats" yaml:"repeats,omitempty"` + Count uint32 `csv:"COUNT" json:"count" yaml:"count,omitempty"` + LastTrigger *time.Time `csv:"LAST TRIGGER" json:"lastTrigger,omitempty" yaml:"lastTrigger,omitempty"` +} + +type ListOutput struct { + Name string `csv:"NAME" json:"name" yaml:"name"` + Begin string `csv:"BEGIN" json:"begin" yaml:"begin,omitempty"` + Count uint32 `csv:"COUNT" json:"count" yaml:"count,omitempty"` + LastTrigger string `csv:"LAST TRIGGER" json:"lastTrigger" yaml:"lastTrigger"` +} + +type JobCount struct { + Key string + Job *stored.Job + Counter *stored.Counter +} + +func List(ctx context.Context, opts ListOptions) ([]*ListOutput, error) { + listWide, err := ListWide(ctx, 
opts) + if err != nil { + return nil, err + } + + return listWideToShort(listWide) +} + +func ListWide(ctx context.Context, opts ListOptions) ([]*ListOutputWide, error) { + jobCounters, err := ListJobs(ctx, opts) + if err != nil { + return nil, err + } + + var list []*ListOutputWide + for _, jobCounter := range jobCounters { + listoutput, err := parseJob(jobCounter, opts.Filter) + if err != nil { + return nil, err + } + + if listoutput == nil { + continue + } + + list = append(list, listoutput) + } + + sort.SliceStable(list, func(i, j int) bool { + if list[i].Namespace == list[j].Namespace { + if list[i].Begin.Equal(list[j].Begin) { + return list[i].Name < list[j].Name + } + return list[i].Begin.Before(list[j].Begin) + } + return list[i].Namespace < list[j].Namespace + }) + + return list, nil +} + +func ListJobs(ctx context.Context, opts ListOptions) ([]*JobCount, error) { + etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + if err != nil { + return nil, err + } + defer cancel() + + jobs, err := listJobs(ctx, etcdClient) + if err != nil { + return nil, err + } + + counters, err := listCounters(ctx, etcdClient) + if err != nil { + return nil, err + } + + jobCounts := make([]*JobCount, 0, len(jobs)) + for key, job := range jobs { + jobCount := &JobCount{ + Key: key, + Job: job, + } + + counter, ok := counters[strings.ReplaceAll(key, "dapr/jobs/", "dapr/counters/")] + if ok { + jobCount.Counter = counter + } + + jobCounts = append(jobCounts, jobCount) + } + + return jobCounts, nil +} + +func listWideToShort(listWide []*ListOutputWide) ([]*ListOutput, error) { + now := time.Now() + list := make([]*ListOutput, 0, len(listWide)) + for _, item := range listWide { + if item == nil { + continue + } + + l := ListOutput{ + Name: item.Name, + Count: item.Count, + } + + if item.LastTrigger != nil { + l.LastTrigger = "-" + utils.HumanizeDuration(now.Sub(*item.LastTrigger)) + } + + if item.Begin.After(now) { + l.Begin = "+" + 
utils.HumanizeDuration(item.Begin.Sub(now)) + } else { + l.Begin = "-" + utils.HumanizeDuration(now.Sub(item.Begin)) + } + + list = append(list, &l) + } + + return list, nil +} + +func listJobs(ctx context.Context, client *clientv3.Client) (map[string]*stored.Job, error) { + resp, err := client.Get(ctx, + "dapr/jobs/", + clientv3.WithPrefix(), + clientv3.WithLimit(0), + ) + if err != nil { + return nil, err + } + + jobs := make(map[string]*stored.Job) + for _, kv := range resp.Kvs { + var stored stored.Job + if err := proto.Unmarshal(kv.Value, &stored); err != nil { + return nil, fmt.Errorf("failed to unmarshal job %s: %w", kv.Key, err) + } + + jobs[string(kv.Key)] = &stored + } + + return jobs, nil +} + +func listCounters(ctx context.Context, client *clientv3.Client) (map[string]*stored.Counter, error) { + resp, err := client.Get(ctx, + "dapr/counters/", + clientv3.WithPrefix(), + clientv3.WithLimit(0), + ) + if err != nil { + return nil, err + } + + counters := make(map[string]*stored.Counter) + for _, kv := range resp.Kvs { + var stored stored.Counter + if err := proto.Unmarshal(kv.Value, &stored); err != nil { + return nil, fmt.Errorf("failed to unmarshal counter %s: %w", kv.Key, err) + } + + counters[string(kv.Key)] = &stored + } + + return counters, nil +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 000000000..8281980a2 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,255 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "fmt" + "strings" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/dapr/cli/pkg/kubernetes" + "github.com/dapr/cli/pkg/scheduler/stored" + schedulerv1 "github.com/dapr/dapr/pkg/proto/scheduler/v1" + "github.com/dapr/kit/ptr" +) + +const ( + FilterAll = "all" + FilterApp = "app" + FilterActor = "actor" + FilterWorkflow = "workflow" + FilterActivity = "activity" +) + +type Filter struct { + Type string + Namespace *string +} + +type jobKey struct { + appID *string + + actorType *string + actorID *string + + instanceID *string + activity bool + + name string +} + +func parseJob(jobCounter *JobCount, opts Filter) (*ListOutputWide, error) { + var meta schedulerv1.JobMetadata + if err := jobCounter.Job.GetJob().GetMetadata().UnmarshalTo(&meta); err != nil { + return nil, err + } + + if opts.Type != FilterAll { + switch meta.GetTarget().GetType().(type) { + case *schedulerv1.JobTargetMetadata_Job: + if opts.Type != FilterApp { + return nil, nil + } + case *schedulerv1.JobTargetMetadata_Actor: + atype := meta.GetTarget().GetActor().GetType() + switch { + case strings.HasPrefix(atype, "dapr.internal.") && strings.HasSuffix(atype, ".workflow"): + if opts.Type != FilterWorkflow { + return nil, nil + } + case strings.HasPrefix(atype, "dapr.internal.") && strings.HasSuffix(atype, ".activity"): + if opts.Type != FilterActivity { + return nil, nil + } + default: + if opts.Type != FilterActor { + return nil, nil + } + } + } + } + + if opts.Namespace != nil && meta.GetNamespace() != *opts.Namespace { + return nil, nil + } + + listoutput := ListOutputWide{ + Name: jobCounter.Key[(strings.LastIndex(jobCounter.Key, "||") + 2):], + Namespace: meta.GetNamespace(), + Schedule: jobCounter.Job.GetJob().Schedule, + DueTime: jobCounter.Job.GetJob().DueTime, + TTL: jobCounter.Job.GetJob().Ttl, + Repeats: 
jobCounter.Job.GetJob().Repeats, + } + + switch meta.GetTarget().GetType().(type) { + case *schedulerv1.JobTargetMetadata_Job: + listoutput.Name = "app/" + meta.GetAppId() + "/" + listoutput.Name + case *schedulerv1.JobTargetMetadata_Actor: + atype := meta.GetTarget().GetActor().GetType() + switch { + case strings.HasPrefix(atype, "dapr.internal.") && strings.HasSuffix(atype, ".workflow"): + listoutput.Name = "workflow/" + fmt.Sprintf("%s/%s/%s", + strings.Split(atype, ".")[3], meta.GetTarget().GetActor().GetId(), + listoutput.Name, + ) + case strings.HasPrefix(atype, "dapr.internal.") && strings.HasSuffix(atype, ".activity"): + listoutput.Name = "activity/" + fmt.Sprintf("%s/%s", + strings.Split(atype, ".")[3], meta.GetTarget().GetActor().GetId(), + ) + default: + listoutput.Name = "actor/" + fmt.Sprintf("%s/%s/%s", + meta.GetTarget().GetActor().GetType(), + meta.GetTarget().GetActor().GetId(), + listoutput.Name, + ) + } + } + + switch t := jobCounter.Job.GetBegin().(type) { + case *stored.Job_DueTime: + listoutput.Begin = t.DueTime.AsTime().Truncate(time.Second) + case *stored.Job_Start: + listoutput.Begin = t.Start.AsTime().Truncate(time.Second) + } + + if jobCounter.Job.Expiration != nil { + listoutput.Expiration = ptr.Of(jobCounter.Job.Expiration.AsTime().Truncate(time.Second)) + } + + if jobCounter.Counter != nil { + listoutput.Count = jobCounter.Counter.Count + if jobCounter.Counter.LastTrigger != nil { + listoutput.LastTrigger = ptr.Of(jobCounter.Counter.LastTrigger.AsTime().Truncate(time.Second)) + } + } + + return &listoutput, nil +} + +func parseJobKey(key string) (*jobKey, error) { + split := strings.Split(key, "/") + if len(split) < 2 { + return nil, fmt.Errorf("failed to parse job key, expecting '{target type}/{identifier}', got '%s'", key) + } + + switch split[0] { + case FilterApp: + if len(split) != 3 { + return nil, fmt.Errorf("expecting job key to be in format 'app/{app ID}/{job name}', got '%s'", key) + } + return &jobKey{ + appID: &split[1], + 
name: split[2], + }, nil + + case FilterActor: + if len(split) != 4 { + return nil, fmt.Errorf("expecting actor reminder key to be in format 'actor/{actor type}/{actor id}/{name}', got '%s'", key) + } + return &jobKey{ + actorType: &split[1], + actorID: &split[2], + name: split[3], + }, nil + + case FilterWorkflow: + if len(split) != 4 { + return nil, fmt.Errorf("expecting workflow key to be in format 'workflow/{app ID}/{instance ID}/{name}', got '%s'", key) + } + return &jobKey{ + appID: &split[1], + instanceID: &split[2], + name: split[3], + }, nil + + case FilterActivity: + if len(split) != 3 { + return nil, fmt.Errorf("expecting activity key to be in format 'activity/{app ID}/{activity ID}', got '%s'", key) + } + return &jobKey{ + appID: &split[1], + name: split[2], + activity: true, + }, nil + + default: + return nil, fmt.Errorf("unsupported job type '%s', accepts 'app', 'actor', 'workflow', or 'activity'", split[0]) + } +} + +func etcdClient(kubernetesMode bool, schedulerNamespace string) (*clientv3.Client, context.CancelFunc, error) { + var etcdClient *clientv3.Client + var err error + if kubernetesMode { + var cancel context.CancelFunc + etcdClient, cancel, err = etcdClientKubernetes(schedulerNamespace) + if err != nil { + return nil, nil, err + } + return etcdClient, cancel, nil + } else { + etcdClient, err = getEtcdClient("localhost:2379") + if err != nil { + return nil, nil, err + } + } + + return etcdClient, func() {}, nil +} + +func getEtcdClient(host string) (*clientv3.Client, error) { + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{host}, + }) + if err != nil { + return nil, err + } + + return client, nil +} + +func etcdClientKubernetes(namespace string) (*clientv3.Client, context.CancelFunc, error) { + config, _, err := kubernetes.GetKubeConfigClient() + if err != nil { + return nil, nil, err + } + + portForward, err := kubernetes.NewPortForward( + config, + namespace, + "dapr-scheduler-server-0", + "localhost", + 2379, + 2379, 
+ false, + ) + if err != nil { + return nil, nil, err + } + + if err = portForward.Init(); err != nil { + return nil, nil, err + } + + client, err := getEtcdClient("localhost:2379") + if err != nil { + return nil, nil, err + } + + return client, portForward.Stop, nil +} diff --git a/pkg/scheduler/stored/counter.pb.go b/pkg/scheduler/stored/counter.pb.go new file mode 100644 index 000000000..10ced51ea --- /dev/null +++ b/pkg/scheduler/stored/counter.pb.go @@ -0,0 +1,197 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/counter.proto + +package stored + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Counter holds counter information for a given job. +type Counter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // job_partition_id is the parition_id of the job this counter belongs to. + // Prevents an updated job from inheriting the counter of a previous job with + // the same name. + // Doesn't need to be globally unique. + JobPartitionId uint64 `protobuf:"varint,1,opt,name=job_partition_id,json=jobPartitionId,proto3" json:"job_partition_id,omitempty"` + // count is the number of times the job has been triggered. + Count uint32 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` + // last_trigger is the timestamp the job was last triggered. 
Used to + // determine the next time the job should be triggered. + LastTrigger *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_trigger,json=lastTrigger,proto3" json:"last_trigger,omitempty"` + // attempts is the number of times the job has been attempted to be triggered + // at this count. Used by failure policy to track how many times the Job + // trigger should be retried. + Attempts uint32 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"` +} + +func (x *Counter) Reset() { + *x = Counter{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_counter_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Counter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Counter) ProtoMessage() {} + +func (x *Counter) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_counter_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Counter.ProtoReflect.Descriptor instead. 
+func (*Counter) Descriptor() ([]byte, []int) { + return file_proto_stored_counter_proto_rawDescGZIP(), []int{0} +} + +func (x *Counter) GetJobPartitionId() uint64 { + if x != nil { + return x.JobPartitionId + } + return 0 +} + +func (x *Counter) GetCount() uint32 { + if x != nil { + return x.Count + } + return 0 +} + +func (x *Counter) GetLastTrigger() *timestamppb.Timestamp { + if x != nil { + return x.LastTrigger + } + return nil +} + +func (x *Counter) GetAttempts() uint32 { + if x != nil { + return x.Attempts + } + return 0 +} + +var File_proto_stored_counter_proto protoreflect.FileDescriptor + +var file_proto_stored_counter_proto_rawDesc = []byte{ + 0x0a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x64, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x01, 0x0a, 0x07, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x65, + 0x72, 0x12, 0x28, 0x0a, 0x10, 0x6a, 0x6f, 0x62, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0e, 0x6a, 0x6f, 0x62, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, + 0x74, 0x12, 0x3d, 0x0a, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, + 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x6c, 0x61, 0x73, 0x74, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, + 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 
0x74, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x08, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x42, 0x37, 0x5a, 0x35, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, + 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, + 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_counter_proto_rawDescOnce sync.Once + file_proto_stored_counter_proto_rawDescData = file_proto_stored_counter_proto_rawDesc +) + +func file_proto_stored_counter_proto_rawDescGZIP() []byte { + file_proto_stored_counter_proto_rawDescOnce.Do(func() { + file_proto_stored_counter_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_counter_proto_rawDescData) + }) + return file_proto_stored_counter_proto_rawDescData +} + +var file_proto_stored_counter_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_counter_proto_goTypes = []interface{}{ + (*Counter)(nil), // 0: stored.Counter + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp +} +var file_proto_stored_counter_proto_depIdxs = []int32{ + 1, // 0: stored.Counter.last_trigger:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_stored_counter_proto_init() } +func file_proto_stored_counter_proto_init() { + if File_proto_stored_counter_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_counter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Counter); i { + case 0: + return &v.state + case 1: + return 
&v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_counter_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_counter_proto_goTypes, + DependencyIndexes: file_proto_stored_counter_proto_depIdxs, + MessageInfos: file_proto_stored_counter_proto_msgTypes, + }.Build() + File_proto_stored_counter_proto = out.File + file_proto_stored_counter_proto_rawDesc = nil + file_proto_stored_counter_proto_goTypes = nil + file_proto_stored_counter_proto_depIdxs = nil +} diff --git a/pkg/scheduler/stored/job.pb.go b/pkg/scheduler/stored/job.pb.go new file mode 100644 index 000000000..3d4637b8e --- /dev/null +++ b/pkg/scheduler/stored/job.pb.go @@ -0,0 +1,253 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/job.proto + +package stored + +import ( + api "github.com/diagridio/go-etcd-cron/api" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Job is the wrapped stored version of a Job which has a partition_id +// associated. 
+type Job struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partion_id is an identifier for the job, used for distinguishing jobs with + // the same name and assigning the job to a partition. + // Doesn't need to be globally unique. + PartitionId uint64 `protobuf:"varint,1,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // begin is the beginning time of the job. + // + // Types that are assignable to Begin: + // + // *Job_Start + // *Job_DueTime + Begin isJob_Begin `protobuf_oneof:"begin"` + // expiration is the optional time at which the job should no longer be + // scheduled and will be ignored and garbage collected thereafter. + // A job may be removed earlier if repeats are exhausted or schedule doesn't + // permit. + Expiration *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=expiration,proto3,oneof" json:"expiration,omitempty"` + // job is the job spec. + Job *api.Job `protobuf:"bytes,5,opt,name=job,proto3" json:"job,omitempty"` +} + +func (x *Job) Reset() { + *x = Job{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_job_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_job_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. 
+func (*Job) Descriptor() ([]byte, []int) { + return file_proto_stored_job_proto_rawDescGZIP(), []int{0} +} + +func (x *Job) GetPartitionId() uint64 { + if x != nil { + return x.PartitionId + } + return 0 +} + +func (m *Job) GetBegin() isJob_Begin { + if m != nil { + return m.Begin + } + return nil +} + +func (x *Job) GetStart() *timestamppb.Timestamp { + if x, ok := x.GetBegin().(*Job_Start); ok { + return x.Start + } + return nil +} + +func (x *Job) GetDueTime() *timestamppb.Timestamp { + if x, ok := x.GetBegin().(*Job_DueTime); ok { + return x.DueTime + } + return nil +} + +func (x *Job) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *Job) GetJob() *api.Job { + if x != nil { + return x.Job + } + return nil +} + +type isJob_Begin interface { + isJob_Begin() +} + +type Job_Start struct { + // start is the epoch time of the job whereby the clock starts on the + // schedule. The job _will not_ trigger at this time. + Start *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=start,proto3,oneof"` +} + +type Job_DueTime struct { + // due_time is the epoch time of the job whereby the clock starts on the + // schedule. The job _will_ trigger at this time. 
+ DueTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=due_time,json=dueTime,proto3,oneof"` +} + +func (*Job_Start) isJob_Begin() {} + +func (*Job_DueTime) isJob_Begin() {} + +var File_proto_stored_job_proto protoreflect.FileDescriptor + +var file_proto_stored_job_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x6a, + 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, + 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x1a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6a, 0x6f, 0x62, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x02, 0x0a, 0x03, 0x4a, 0x6f, 0x62, 0x12, 0x21, + 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, + 0x64, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x64, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x07, 0x64, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x3f, + 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, + 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, + 0x1a, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x42, 0x07, 0x0a, 0x05, 0x62, + 0x65, 0x67, 0x69, 0x6e, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, + 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_job_proto_rawDescOnce sync.Once + file_proto_stored_job_proto_rawDescData = file_proto_stored_job_proto_rawDesc +) + +func file_proto_stored_job_proto_rawDescGZIP() []byte { + file_proto_stored_job_proto_rawDescOnce.Do(func() { + file_proto_stored_job_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_job_proto_rawDescData) + }) + return file_proto_stored_job_proto_rawDescData +} + +var file_proto_stored_job_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_job_proto_goTypes = []interface{}{ + (*Job)(nil), // 0: stored.Job + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp + (*api.Job)(nil), // 2: api.Job +} +var file_proto_stored_job_proto_depIdxs = []int32{ + 1, // 0: stored.Job.start:type_name -> google.protobuf.Timestamp + 1, // 1: stored.Job.due_time:type_name -> google.protobuf.Timestamp + 1, // 2: stored.Job.expiration:type_name -> google.protobuf.Timestamp + 2, // 3: stored.Job.job:type_name -> api.Job + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is 
the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_proto_stored_job_proto_init() } +func file_proto_stored_job_proto_init() { + if File_proto_stored_job_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_job_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Job); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_stored_job_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*Job_Start)(nil), + (*Job_DueTime)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_job_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_job_proto_goTypes, + DependencyIndexes: file_proto_stored_job_proto_depIdxs, + MessageInfos: file_proto_stored_job_proto_msgTypes, + }.Build() + File_proto_stored_job_proto = out.File + file_proto_stored_job_proto_rawDesc = nil + file_proto_stored_job_proto_goTypes = nil + file_proto_stored_job_proto_depIdxs = nil +} diff --git a/pkg/scheduler/stored/leadership.pb.go b/pkg/scheduler/stored/leadership.pb.go new file mode 100644 index 000000000..772970a63 --- /dev/null +++ b/pkg/scheduler/stored/leadership.pb.go @@ -0,0 +1,186 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. 
+// versions: +// protoc-gen-go v1.33.0 +// protoc v5.29.3 +// source: proto/stored/leadership.proto + +package stored + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Leadership is the message written to the leadership table when the replica +// gains ownership of the leader key. +type Leadership struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // total is this replicas understanding of the total number of partition + // replicas. + Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"` + // uid is a unique identifier for this replica. Ensures a single replica + // is the leader for a given partition. + Uid uint64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` + // replica_data is custom data that is associated with this leader (replica). + // All leader data will be sent to library consumer on leadership table + // updates. 
+ ReplicaData *anypb.Any `protobuf:"bytes,3,opt,name=replica_data,json=replicaData,proto3,oneof" json:"replica_data,omitempty"` +} + +func (x *Leadership) Reset() { + *x = Leadership{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_stored_leadership_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Leadership) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Leadership) ProtoMessage() {} + +func (x *Leadership) ProtoReflect() protoreflect.Message { + mi := &file_proto_stored_leadership_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Leadership.ProtoReflect.Descriptor instead. +func (*Leadership) Descriptor() ([]byte, []int) { + return file_proto_stored_leadership_proto_rawDescGZIP(), []int{0} +} + +func (x *Leadership) GetTotal() uint64 { + if x != nil { + return x.Total + } + return 0 +} + +func (x *Leadership) GetUid() uint64 { + if x != nil { + return x.Uid + } + return 0 +} + +func (x *Leadership) GetReplicaData() *anypb.Any { + if x != nil { + return x.ReplicaData + } + return nil +} + +var File_proto_stored_leadership_proto protoreflect.FileDescriptor + +var file_proto_stored_leadership_proto_rawDesc = []byte{ + 0x0a, 0x1d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x2f, 0x6c, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0b, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x1a, 0x19, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x01, 0x0a, 0x0a, 0x4c, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 
0x74, 0x61, 0x6c, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x12, 0x10, 0x0a, 0x03, + 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x3c, + 0x0a, 0x0c, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, 0x00, 0x52, 0x0b, 0x72, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, + 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x42, 0x37, 0x5a, + 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, + 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, + 0x6f, 0x6e, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_stored_leadership_proto_rawDescOnce sync.Once + file_proto_stored_leadership_proto_rawDescData = file_proto_stored_leadership_proto_rawDesc +) + +func file_proto_stored_leadership_proto_rawDescGZIP() []byte { + file_proto_stored_leadership_proto_rawDescOnce.Do(func() { + file_proto_stored_leadership_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_stored_leadership_proto_rawDescData) + }) + return file_proto_stored_leadership_proto_rawDescData +} + +var file_proto_stored_leadership_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_stored_leadership_proto_goTypes = []interface{}{ + (*Leadership)(nil), // 0: cron.stored.Leadership + (*anypb.Any)(nil), // 1: google.protobuf.Any +} +var file_proto_stored_leadership_proto_depIdxs = []int32{ + 1, // 0: cron.stored.Leadership.replica_data:type_name -> google.protobuf.Any + 1, // [1:1] is the sub-list for 
method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_stored_leadership_proto_init() } +func file_proto_stored_leadership_proto_init() { + if File_proto_stored_leadership_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_stored_leadership_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Leadership); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_stored_leadership_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_stored_leadership_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_stored_leadership_proto_goTypes, + DependencyIndexes: file_proto_stored_leadership_proto_depIdxs, + MessageInfos: file_proto_stored_leadership_proto_msgTypes, + }.Build() + File_proto_stored_leadership_proto = out.File + file_proto_stored_leadership_proto_rawDesc = nil + file_proto_stored_leadership_proto_goTypes = nil + file_proto_stored_leadership_proto_depIdxs = nil +} diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 86d23dd7a..7adc86aeb 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -88,7 +88,7 @@ const ( schedulerHealthPort = 58081 schedulerMetricPort = 59091 - schedulerEtcdPort = 52379 + schedulerEtcdPort = 2379 daprVersionsWithScheduler = ">= 1.14.x" ) @@ -693,6 +693,10 @@ func runSchedulerService(wg *sync.WaitGroup, errorChan chan<- error, info initIn } } + if schedulerEtcdClientListenAddress(info) { + args = append(args, 
"--etcd-client-listen-address=0.0.0.0") + } + _, err = utils.RunCmdAndWait(runtimeCmd, args...) if err != nil { runError := isContainerRunError(err) @@ -721,6 +725,21 @@ func schedulerOverrideHostPort(info initInfo) bool { return runV.GreaterThan(v115rc5) } +func schedulerEtcdClientListenAddress(info initInfo) bool { + if info.runtimeVersion == "edge" || info.runtimeVersion == "dev" { + return true + } + + runV, err := semver.NewVersion(info.runtimeVersion) + if err != nil { + return true + } + + v1160, _ := semver.NewVersion("1.16.0") + + return runV.GreaterThan(v1160) +} + func moveDashboardFiles(extractedFilePath string, dir string) (string, error) { // Move /release/os/web directory to /web. oldPath := path_filepath.Join(path_filepath.Dir(extractedFilePath), "web") diff --git a/tests/apps/scheduler/app.go b/tests/apps/scheduler/app.go new file mode 100644 index 000000000..ce7dd6909 --- /dev/null +++ b/tests/apps/scheduler/app.go @@ -0,0 +1,223 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "fmt" + "log" + "net" + "net/http" + "time" + + "github.com/dapr/durabletask-go/workflow" + "github.com/dapr/go-sdk/client" + "github.com/dapr/kit/ptr" + "github.com/dapr/kit/signals" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func main() { + const port = 9095 + + ctx := signals.Context() + + fmt.Printf("Starting server in port %v...\n", port) + + regCh := make(chan struct{}) + mux := http.NewServeMux() + mux.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) { + close(regCh) + w.Write([]byte(`{"entities": ["myactortype"]}`)) + }) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {}) + + go func() { + log.Printf("Waiting for registration call...") + select { + case <-regCh: + log.Printf("Registration call received") + case <-ctx.Done(): + log.Printf("Context done while waiting for registration call") + return + } + register(ctx) + }() + + StartServer(ctx, port, mux) +} + +func register(ctx context.Context) { + log.Printf("Registering jobs, reminders and workflows") + + addr := "127.0.0.1:3510" + log.Printf("Creating client to %s", addr) + cl, err := client.NewClientWithAddress(addr) + if err != nil { + log.Fatal(err) + } + log.Println("Client created") + + ds := time.Now().Format(time.RFC3339) + + data, err := anypb.New(wrapperspb.String("hello")) + if err != nil { + log.Fatal(err) + } + + if err = cl.ScheduleJobAlpha1(ctx, &client.Job{ + Name: "test1", + Schedule: ptr.Of("@every 100m"), + Repeats: ptr.Of(uint32(1234)), + DueTime: ptr.Of(ds), + Data: data, + }); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled job test1") + + if err = cl.ScheduleJobAlpha1(ctx, &client.Job{ + Name: "test2", + Schedule: ptr.Of("@every 100m"), + Repeats: ptr.Of(uint32(56788)), + DueTime: ptr.Of(ds), + TTL: ptr.Of("10000s"), + Data: data, + }); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled job test2") + + if err = 
cl.RegisterActorReminder(ctx, &client.RegisterActorReminderRequest{ + ActorType: "myactortype", + ActorID: "actorid1", + Name: "test1", + DueTime: ds, + Period: "R100/PT10000S", + }); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled actor reminder test1") + + if err = cl.RegisterActorReminder(ctx, &client.RegisterActorReminderRequest{ + ActorType: "myactortype", + ActorID: "actorid2", + Name: "test2", + DueTime: ds, + Period: "R100/PT10000S", + }); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled actor reminder test2") + + r := workflow.NewRegistry() + + if err := r.AddWorkflow(W1); err != nil { + log.Fatal(err) + } + if err := r.AddWorkflow(W2); err != nil { + log.Fatal(err) + } + if err := r.AddActivity(A1); err != nil { + log.Fatal(err) + } + + wf, err := client.NewWorkflowClient() + if err != nil { + log.Fatal(err) + } + + if err = wf.StartWorker(ctx, r); err != nil { + log.Fatal(err) + } + + if _, err = wf.ScheduleWorkflow(ctx, "W1", workflow.WithInstanceID("abc1")); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled workflow W1 with id abc1") + + if _, err = wf.ScheduleWorkflow(ctx, "W1", workflow.WithInstanceID("abc2")); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled workflow W1 with id abc2") + + if _, err = wf.ScheduleWorkflow(ctx, "W2", workflow.WithInstanceID("xyz1")); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled workflow W2 with id xyz1") + + if _, err = wf.ScheduleWorkflow(ctx, "W2", workflow.WithInstanceID("xyz2")); err != nil { + log.Fatal(err) + } + + log.Printf("Scheduled workflow W2 with id xyz2") +} + +// StartServer starts a HTTP or HTTP2 server +func StartServer(ctx context.Context, port int, handler http.Handler) { + // Create a listener + addr := fmt.Sprintf(":%d", port) + + log.Println("Starting server on ", addr) + + ln, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("Failed to create listener: %v", err) + } + + //nolint:gosec + server := &http.Server{ + Addr: addr, + 
Handler: handler, + } + + go func() { + // Wait for cancelation signal + <-ctx.Done() + log.Println("Shutdown signal received") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + server.Shutdown(ctx) + }() + + log.Printf("Server listening on %s", addr) + err = server.Serve(ln) + + if err != http.ErrServerClosed { + log.Fatalf("Failed to run server: %v", err) + } + + log.Println("Server shut down") +} + +func W1(ctx *workflow.WorkflowContext) (any, error) { + return nil, ctx.CreateTimer(time.Hour * 50).Await(nil) +} + +func W2(ctx *workflow.WorkflowContext) (any, error) { + return nil, ctx.CallActivity("A1").Await(nil) +} + +func A1(ctx workflow.ActivityContext) (any, error) { + <-ctx.Context().Done() + return nil, nil +} diff --git a/tests/apps/scheduler/go.mod b/tests/apps/scheduler/go.mod new file mode 100644 index 000000000..acd0a4009 --- /dev/null +++ b/tests/apps/scheduler/go.mod @@ -0,0 +1,29 @@ +module scheduler + +go 1.24.7 + +require ( + github.com/dapr/durabletask-go v0.10.0 + github.com/dapr/go-sdk v1.13.0 + github.com/dapr/kit v0.16.1 + google.golang.org/protobuf v1.36.6 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/dapr/dapr v1.16.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.36.0 // indirect + go.opentelemetry.io/otel/metric v1.36.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/grpc v1.73.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/tests/apps/scheduler/go.sum 
b/tests/apps/scheduler/go.sum new file mode 100644 index 000000000..deeb81574 --- /dev/null +++ b/tests/apps/scheduler/go.sum @@ -0,0 +1,71 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/dapr/dapr v1.16.0 h1:la2WLZM8Myr2Pq3cyrFjHKWDSPYLzGZCs3p502TwBjI= +github.com/dapr/dapr v1.16.0/go.mod h1:ln/mxvNOeqklaDmic4ppsxmnjl2D/oZGKaJy24IwaEY= +github.com/dapr/durabletask-go v0.10.0 h1:vfIivPl4JYd55xZTslDwhA6p6F8ipcNxBtMaupxArr8= +github.com/dapr/durabletask-go v0.10.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/go-sdk v1.13.0 h1:Qw2BmUonClQ9yK/rrEEaFL1PyDgq616RrvYj0CT67Lk= +github.com/dapr/go-sdk v1.13.0/go.mod h1:RsffVNZitDApmQqoS68tNKGMXDZUjTviAbKZupJSzts= +github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= +github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod 
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.36.0 
h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf 
v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tests/e2e/standalone/commands.go b/tests/e2e/standalone/commands.go index c75bc7171..fa1530ed8 100644 --- a/tests/e2e/standalone/commands.go +++ b/tests/e2e/standalone/commands.go @@ -219,3 +219,51 @@ func cmdVersion(output string, args ...string) (string, error) { return spawn.Command(common.GetDaprPath(), verArgs...) } + +func cmdSchedulerList(args ...string) (string, error) { + listArgs := []string{"scheduler", "list"} + + listArgs = append(listArgs, args...) + + return spawn.Command(common.GetDaprPath(), listArgs...) +} + +func cmdSchedulerGet(args ...string) (string, error) { + listArgs := []string{"scheduler", "get"} + + listArgs = append(listArgs, args...) + + return spawn.Command(common.GetDaprPath(), listArgs...) +} + +func cmdSchedulerDelete(args ...string) (string, error) { + deleteArgs := []string{"scheduler", "delete"} + + deleteArgs = append(deleteArgs, args...) + + return spawn.Command(common.GetDaprPath(), deleteArgs...) +} + +func cmdSchedulerDeleteAll(args ...string) (string, error) { + deleteArgs := []string{"scheduler", "delete-all"} + + deleteArgs = append(deleteArgs, args...) + + return spawn.Command(common.GetDaprPath(), deleteArgs...) +} + +func cmdSchedulerExport(args ...string) (string, error) { + exportArgs := []string{"scheduler", "export"} + + exportArgs = append(exportArgs, args...) 
+ + return spawn.Command(common.GetDaprPath(), exportArgs...) +} + +func cmdSchedulerImport(args ...string) (string, error) { + importArgs := []string{"scheduler", "import"} + + importArgs = append(importArgs, args...) + + return spawn.Command(common.GetDaprPath(), importArgs...) +} diff --git a/tests/e2e/standalone/scheduler_test.go b/tests/e2e/standalone/scheduler_test.go new file mode 100644 index 000000000..5549aba5c --- /dev/null +++ b/tests/e2e/standalone/scheduler_test.go @@ -0,0 +1,505 @@ +//go:build !windows && (e2e || template) + +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package standalone_test + +import ( + "encoding/json" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/dapr/cli/pkg/scheduler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestSchedulerList(t *testing.T) { + if isSlimMode() { + t.Skip("skipping scheduler tests in slim mode") + } + + cmdUninstall() + ensureDaprInstallation(t) + t.Cleanup(func() { + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/test-scheduler.yaml" + t.Cleanup(func() { + cmdStopWithAppID("test-scheduler") + waitAppsToBeStopped() + }) + args := []string{"-f", runFilePath} + + go func() { + o, err := cmdRun("", args...) 
+ t.Log(o) + t.Log(err) + }() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.Len(c, strings.Split(output, "\n"), 10) + }, time.Second*30, time.Millisecond*10) + + t.Run("short", func(t *testing.T) { + output, err := cmdSchedulerList() + require.NoError(t, err) + lines := strings.Split(output, "\n") + require.Len(t, lines, 10) + + require.Equal(t, []string{ + "NAME", + "BEGIN", + "COUNT", + "LAST", + "TRIGGER", + }, strings.Fields(lines[0])) + + expNames := []string{ + "actor/myactortype/actorid1/test1", + "actor/myactortype/actorid2/test2", + "app/test-scheduler/test1", + "app/test-scheduler/test2", + } + for i, line := range lines[1:5] { + assert.Equal(t, expNames[i], strings.Fields(line)[0]) + + assert.NotEmpty(t, strings.Fields(line)[1]) + + count, err := strconv.Atoi(strings.Fields(line)[2]) + require.NoError(t, err) + assert.Equal(t, 1, count) + + assert.NotEmpty(t, strings.Fields(line)[3]) + } + + expNames = []string{ + "activity/test-scheduler/xyz1::0::1", + "activity/test-scheduler/xyz2::0::1", + } + for i, line := range lines[5:7] { + assert.Equal(t, expNames[i], strings.Fields(line)[0]) + + assert.NotEmpty(t, strings.Fields(line)[1]) + + count, err := strconv.Atoi(strings.Fields(line)[2]) + require.NoError(t, err) + assert.Equal(t, 0, count) + } + + expNames = []string{ + "workflow/test-scheduler/abc1", + "workflow/test-scheduler/abc2", + } + for i, line := range lines[7:9] { + assert.True(t, strings.HasPrefix(strings.Fields(line)[0], expNames[i]), strings.Fields(line)[0]) + } + }) + + t.Run("wide", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "wide") + require.NoError(t, err) + lines := strings.Split(output, "\n") + require.Len(t, lines, 10) + + require.Equal(t, []string{ + "NAMESPACE", + "NAME", + "BEGIN", + "EXPIRATION", + "SCHEDULE", + "DUE", + "TIME", + "TTL", + "REPEATS", + "COUNT", + "LAST", + "TRIGGER", + }, strings.Fields(lines[0])) + }) + + 
t.Run("yaml", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "yaml") + require.NoError(t, err) + + var list []scheduler.ListOutputWide + require.NoError(t, yaml.Unmarshal([]byte(output), &list)) + assert.Len(t, list, 8) + }) + + t.Run("json", func(t *testing.T) { + output, err := cmdSchedulerList("-o", "json") + require.NoError(t, err) + + var list []scheduler.ListOutputWide + require.NoError(t, json.Unmarshal([]byte(output), &list)) + assert.Len(t, list, 8) + }) + + t.Run("filter", func(t *testing.T) { + output, err := cmdSchedulerList("-n", "foo") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 2) + + output, err = cmdSchedulerList("--filter", "all") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 10) + + output, err = cmdSchedulerList("--filter", "app") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 4) + + output, err = cmdSchedulerList("--filter", "actor") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 4) + + output, err = cmdSchedulerList("--filter", "workflow") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 4) + + output, err = cmdSchedulerList("--filter", "activity") + require.NoError(t, err) + assert.Len(t, strings.Split(output, "\n"), 4) + }) +} + +func TestSchedulerGet(t *testing.T) { + if isSlimMode() { + t.Skip("skipping scheduler tests in slim mode") + } + + cmdUninstall() + ensureDaprInstallation(t) + t.Cleanup(func() { + must(t, cmdUninstall, "failed to uninstall Dapr") + }) + + runFilePath := "../testdata/run-template-files/test-scheduler.yaml" + t.Cleanup(func() { + cmdStopWithAppID("test-scheduler") + waitAppsToBeStopped() + }) + args := []string{"-f", runFilePath} + + go func() { + o, err := cmdRun("", args...) 
+		t.Log(o)
+		t.Log(err)
+	}()
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(c, err)
+		assert.Len(c, strings.Split(output, "\n"), 10)
+	}, time.Second*30, time.Millisecond*10)
+
+	expNames := []string{
+		"actor/myactortype/actorid1/test1",
+		"actor/myactortype/actorid2/test2",
+		"app/test-scheduler/test1",
+		"app/test-scheduler/test2",
+		"activity/test-scheduler/xyz1::0::1",
+		"activity/test-scheduler/xyz2::0::1",
+	}
+
+	t.Run("short", func(t *testing.T) {
+		for _, name := range expNames {
+			output, err := cmdSchedulerGet(name)
+			require.NoError(t, err)
+			lines := strings.Split(output, "\n")
+			require.Len(t, lines, 3)
+
+			require.Equal(t, []string{
+				"NAME",
+				"BEGIN",
+				"COUNT",
+				"LAST",
+				"TRIGGER",
+			}, strings.Fields(lines[0]))
+		}
+	})
+
+	t.Run("wide", func(t *testing.T) {
+		for _, name := range expNames {
+			output, err := cmdSchedulerGet(name, "-o", "wide")
+			require.NoError(t, err)
+			lines := strings.Split(output, "\n")
+			require.Len(t, lines, 3)
+
+			require.Equal(t, []string{
+				"NAMESPACE",
+				"NAME",
+				"BEGIN",
+				"EXPIRATION",
+				"SCHEDULE",
+				"DUE",
+				"TIME",
+				"TTL",
+				"REPEATS",
+				"COUNT",
+				"LAST",
+				"TRIGGER",
+			}, strings.Fields(lines[0]))
+		}
+	})
+
+	t.Run("yaml", func(t *testing.T) {
+		for _, name := range expNames {
+			output, err := cmdSchedulerGet(name, "-o", "yaml")
+			require.NoError(t, err)
+
+			var list []scheduler.ListOutputWide
+			require.NoError(t, yaml.Unmarshal([]byte(output), &list))
+			assert.Len(t, list, 1)
+		}
+	})
+
+	t.Run("json", func(t *testing.T) {
+		for _, name := range expNames {
+			output, err := cmdSchedulerGet(name, "-o", "json")
+			require.NoError(t, err)
+
+			var list []scheduler.ListOutputWide
+			require.NoError(t, json.Unmarshal([]byte(output), &list))
+			assert.Len(t, list, 1)
+		}
+	})
+}
+
+func TestSchedulerDelete(t *testing.T) {
+	if isSlimMode() {
+		t.Skip("skipping scheduler tests in slim mode")
+	}
+
+	cmdUninstall()
+	ensureDaprInstallation(t)
+	
t.Cleanup(func() {
+		must(t, cmdUninstall, "failed to uninstall Dapr")
+	})
+
+	runFilePath := "../testdata/run-template-files/test-scheduler.yaml"
+	t.Cleanup(func() {
+		cmdStopWithAppID("test-scheduler")
+		waitAppsToBeStopped()
+	})
+	args := []string{"-f", runFilePath}
+
+	go func() {
+		o, err := cmdRun("", args...)
+		t.Log(o)
+		t.Log(err)
+	}()
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(c, err)
+		assert.Len(c, strings.Split(output, "\n"), 10)
+	}, time.Second*30, time.Millisecond*10)
+
+	output, err := cmdSchedulerList()
+	require.NoError(t, err)
+
+	_, err = cmdSchedulerDelete("actor/myactortype/actorid1/test1")
+	require.NoError(t, err)
+
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 9)
+
+	_, err = cmdSchedulerDelete(
+		"actor/myactortype/actorid2/test2",
+		"app/test-scheduler/test1",
+		"app/test-scheduler/test2",
+	)
+	require.NoError(t, err)
+
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 6)
+
+	_, err = cmdSchedulerDelete(
+		"activity/test-scheduler/xyz1::0::1",
+		"activity/test-scheduler/xyz2::0::1",
+	)
+	require.NoError(t, err)
+
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 4)
+
+	_, err = cmdSchedulerDelete(
+		strings.Fields(strings.Split(output, "\n")[1])[0],
+		strings.Fields(strings.Split(output, "\n")[2])[0],
+	)
+	require.NoError(t, err)
+
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 2)
+}
+
+func TestSchedulerDeleteAllAll(t *testing.T) {
+	if isSlimMode() {
+		t.Skip("skipping scheduler tests in slim mode")
+	}
+
+	cmdUninstall()
+	ensureDaprInstallation(t)
+	t.Cleanup(func() {
+		must(t, cmdUninstall, "failed to uninstall Dapr")
+	})
+
+	runFilePath := "../testdata/run-template-files/test-scheduler.yaml"
+	t.Cleanup(func() {
+		
cmdStopWithAppID("test-scheduler")
+		waitAppsToBeStopped()
+	})
+	args := []string{"-f", runFilePath}
+
+	go func() {
+		o, err := cmdRun("", args...)
+		t.Log(o)
+		t.Log(err)
+	}()
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(c, err)
+		assert.Len(c, strings.Split(output, "\n"), 10)
+	}, time.Second*30, time.Millisecond*10)
+
+	_, err := cmdSchedulerDeleteAll("all")
+	require.NoError(t, err)
+
+	output, err := cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 2)
+}
+
+func TestSchedulerDeleteAll(t *testing.T) {
+	if isSlimMode() {
+		t.Skip("skipping scheduler tests in slim mode")
+	}
+
+	cmdUninstall()
+	ensureDaprInstallation(t)
+	t.Cleanup(func() {
+		must(t, cmdUninstall, "failed to uninstall Dapr")
+	})
+
+	runFilePath := "../testdata/run-template-files/test-scheduler.yaml"
+	t.Cleanup(func() {
+		cmdStopWithAppID("test-scheduler")
+		waitAppsToBeStopped()
+	})
+	args := []string{"-f", runFilePath}
+
+	go func() {
+		o, err := cmdRun("", args...)
+		t.Log(o)
+		t.Log(err)
+	}()
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(c, err)
+		assert.Len(c, strings.Split(output, "\n"), 10)
+	}, time.Second*30, time.Millisecond*10)
+
+	_, err := cmdSchedulerDeleteAll("app/test-scheduler")
+	require.NoError(t, err)
+	output, err := cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 8)
+
+	_, err = cmdSchedulerDeleteAll("workflow/test-scheduler/abc1")
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 7)
+
+	_, err = cmdSchedulerDeleteAll("workflow/test-scheduler")
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 4)
+
+	_, err = cmdSchedulerDeleteAll("actor/myactortype/actorid1")
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 3)
+
+	_, err = cmdSchedulerDeleteAll("actor/myactortype")
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 2)
+}
+
+func TestSchedulerExportImport(t *testing.T) {
+	if isSlimMode() {
+		t.Skip("skipping scheduler tests in slim mode")
+	}
+
+	cmdUninstall()
+	ensureDaprInstallation(t)
+	t.Cleanup(func() {
+		must(t, cmdUninstall, "failed to uninstall Dapr")
+	})
+
+	runFilePath := "../testdata/run-template-files/test-scheduler.yaml"
+	t.Cleanup(func() {
+		cmdStopWithAppID("test-scheduler")
+		waitAppsToBeStopped()
+	})
+	args := []string{"-f", runFilePath}
+
+	go func() {
+		o, err := cmdRun("", args...)
+		t.Log(o)
+		t.Log(err)
+	}()
+
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		output, err := cmdSchedulerList()
+		require.NoError(c, err)
+		assert.Len(c, strings.Split(output, "\n"), 10)
+	}, time.Second*30, time.Millisecond*10)
+
+	f := filepath.Join(t.TempDir(), "foo")
+	_, err := cmdSchedulerExport("-o", f)
+	require.NoError(t, err)
+
+	_, err = cmdSchedulerDeleteAll("all")
+	require.NoError(t, err)
+	output, err := cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 2)
+
+	_, err = cmdSchedulerImport("-f", f)
+	require.NoError(t, err)
+	output, err = cmdSchedulerList()
+	require.NoError(t, err)
+	assert.Len(t, strings.Split(output, "\n"), 10)
+}
diff --git a/tests/e2e/testdata/run-template-files/test-scheduler.yaml b/tests/e2e/testdata/run-template-files/test-scheduler.yaml
new file mode 100644
index 000000000..d68bf746f
--- /dev/null
+++ b/tests/e2e/testdata/run-template-files/test-scheduler.yaml
@@ -0,0 +1,11 @@
+version: 1
+apps:
+- appID: test-scheduler
+  appDirPath: ../../../apps/scheduler/
+  appPort: 9095
+  daprGRPCPort: 3510
+  command: ["go", "run", "app.go"]
+  appLogDestination: console
+  daprdLogDestination: console
+  schedulerHostAddress: 127.0.0.1:50006
+  placementHostAddress: 127.0.0.1:50005
diff --git a/tests/e2e/upgrade/upgrade_test.go b/tests/e2e/upgrade/upgrade_test.go
index b3038257f..732da491c 100644
--- a/tests/e2e/upgrade/upgrade_test.go
+++ b/tests/e2e/upgrade/upgrade_test.go
@@ -30,7 +30,7 @@ type upgradePath struct {
 }
 
 const (
-	latestRuntimeVersion         = "1.16.0"
+	latestRuntimeVersion         = "1.16.1"
 	latestRuntimeVersionMinusOne = "1.15.11"
 	latestRuntimeVersionMinusTwo = "1.14.5"
 	dashboardVersion             = "0.15.0"
diff --git a/utils/utils.go b/utils/utils.go
index 483bf85f7..286da4289 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -15,6 +15,7 @@ package utils
 
 import (
 	"bufio"
+	"bytes"
 	"context"
 	"encoding/json"
 	"errors"
@@ -92,12 +93,16 @@ func PrintTable(csvContent string) {
 
 // WriteTable writes 
the csv table to writer. func WriteTable(writer io.Writer, csvContent string) { - table := tablewriter.NewWriter(writer) + var output bytes.Buffer + + table := tablewriter.NewWriter(&output) table.SetHeaderAlignment(tablewriter.ALIGN_LEFT) - table.SetBorder(false) table.SetHeaderLine(false) - table.SetRowLine(false) - table.SetCenterSeparator("") + table.SetBorders(tablewriter.Border{ + Top: false, + Bottom: false, + }) + table.SetTablePadding("") table.SetRowSeparator("") table.SetColumnSeparator("") table.SetAlignment(tablewriter.ALIGN_LEFT) @@ -116,6 +121,12 @@ func WriteTable(writer io.Writer, csvContent string) { } table.Render() + + b := bufio.NewScanner(&output) + for b.Scan() { + writer.Write(bytes.TrimLeft(b.Bytes(), " ")) + writer.Write([]byte("\n")) + } } func TruncateString(str string, maxLength int) string { @@ -430,3 +441,27 @@ func AttachJobObjectToProcess(pid string, proc *os.Process) { func GetJobObjectNameFromPID(pid string) string { return pid + "-" + windowsDaprAppProcJobName } + +func HumanizeDuration(d time.Duration) string { + if d == 0 { + return "" + } + + if d < 0 { + d = -d + } + switch { + case d < time.Microsecond: + return fmt.Sprintf("%dns", d.Nanoseconds()) + case d < time.Millisecond: + return fmt.Sprintf("%.1fµs", float64(d)/1e3) + case d < time.Second: + return fmt.Sprintf("%.1fms", float64(d)/1e6) + case d < time.Minute: + return fmt.Sprintf("%.2fs", d.Seconds()) + case d < time.Hour: + return fmt.Sprintf("%.1fm", d.Minutes()) + default: + return fmt.Sprintf("%.1fh", d.Hours()) + } +}