diff --git a/components/cluster/command/template.go b/components/cluster/command/template.go index e51acfcb93..ac354cd436 100644 --- a/components/cluster/command/template.go +++ b/components/cluster/command/template.go @@ -43,6 +43,7 @@ type LocalTemplate struct { PDServers []string // pd_servers in yaml template TiDBServers []string // tidb_servers in yaml template TiKVServers []string // tikv_servers in yaml template + TiKVWorkerServers []string // tikv_worker_servers in yaml template TiFlashServers []string // tiflash_servers in yaml template MonitoringServers []string // monitoring_servers in yaml template GrafanaServers []string // grafana_servers in yaml template @@ -131,6 +132,7 @@ func newTemplateCmd() *cobra.Command { cmd.Flags().StringSliceVar(&localOpt.PDServers, "pd-servers", []string{"127.0.0.1"}, "List of PD servers") cmd.Flags().StringSliceVar(&localOpt.TiDBServers, "tidb-servers", []string{"127.0.0.1"}, "List of TiDB servers") cmd.Flags().StringSliceVar(&localOpt.TiKVServers, "tikv-servers", []string{"127.0.0.1"}, "List of TiKV servers") + cmd.Flags().StringSliceVar(&localOpt.TiKVWorkerServers, "tikv-worker-servers", nil, "List of TiKV worker servers") cmd.Flags().StringSliceVar(&localOpt.TiFlashServers, "tiflash-servers", nil, "List of TiFlash servers") cmd.Flags().StringSliceVar(&localOpt.MonitoringServers, "monitoring-servers", []string{"127.0.0.1"}, "List of monitor servers") cmd.Flags().StringSliceVar(&localOpt.GrafanaServers, "grafana-servers", []string{"127.0.0.1"}, "List of grafana servers") diff --git a/embed/examples/cluster/local.tpl b/embed/examples/cluster/local.tpl index f6f017fbb4..968841fa52 100644 --- a/embed/examples/cluster/local.tpl +++ b/embed/examples/cluster/local.tpl @@ -43,6 +43,12 @@ tikv_servers: - host: {{ . }} {{- end }} {{ end }} +{{ if .TiKVWorkerServers -}} +tikv_worker_servers: +{{- range .TiKVWorkerServers }} + - host: {{ . }} +{{- end }} +{{ end }} {{- if .TiFlashServers }} tiflash_servers: {{- range .TiFlashServers }} diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index ef76764f49..88012217aa 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -145,6 +145,23 @@ scrape_configs: - targets: {{- range .TiKVStatusAddrs}} - '{{.}}' +{{- end}} +{{- if .TiKVWorkerAddrs}} + - job_name: "tikv-worker" + honor_labels: true # don't overwrite job & instance labels +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .TiKVWorkerAddrs}} + - '{{.}}' +{{- end}} {{- end}} - job_name: "pd" honor_labels: true # don't overwrite job & instance labels @@ -377,6 +394,14 @@ scrape_configs: {{- end}} labels: group: 'tikv' +{{- if .TiKVWorkerAddrs}} + - targets: + {{- range .TiKVWorkerAddrs}} + - '{{.}}' + {{- end}} + labels: + group: 'tikv-worker' +{{- end}} - targets: {{- range .PDAddrs}} - '{{.}}' @@ -511,4 +536,4 @@ scrape_configs: {{- if .RemoteConfig}} {{.RemoteConfig}} -{{- end}} \ No newline at end of file +{{- end}} diff --git a/embed/templates/scripts/run_tikv-worker.sh.tpl b/embed/templates/scripts/run_tikv-worker.sh.tpl new file mode 100644 index 0000000000..be34ae6c50 --- /dev/null +++ b/embed/templates/scripts/run_tikv-worker.sh.tpl @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! +cd "{{.DeployDir}}" || exit 1 + +{{- if and .NumaNode .NumaCores}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} -C {{.NumaCores}} bin/tikv-worker \ +{{- else if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} bin/tikv-worker \ +{{- else}} +exec bin/tikv-worker \ +{{- end}} + --addr "{{.Addr}}" \ + --pd-endpoints "{{.PD}}" \ + --config conf/tikv-worker.toml \ + --log-file "{{.LogDir}}/tikv_worker.log" 2>> "{{.LogDir}}/tikv_worker_stderr.log" + diff --git a/pkg/cluster/manager/patch.go b/pkg/cluster/manager/patch.go index b38d78077c..6ac4ddc9a4 100644 --- a/pkg/cluster/manager/patch.go +++ b/pkg/cluster/manager/patch.go @@ -147,13 +147,18 @@ func checkPackage(specManager *spec.SpecManager, name string, inst spec.Instance } ver := inst.CalculateVersion(metadata.GetBaseMeta().Version) - repo, err := clusterutil.NewRepository(nodeOS, arch) - if err != nil { - return err - } - entry, err := repo.ComponentBinEntry(inst.ComponentSource(), ver) - if err != nil { - return err + var entry string + if inst.ComponentName() == spec.ComponentTiKVWorker { + entry = spec.ComponentTiKVWorker + } else { + repo, err := clusterutil.NewRepository(nodeOS, arch) + if err != nil { + return err + } + entry, err = repo.ComponentBinEntry(inst.ComponentSource(), ver) + if err != nil { + return err + } } checksum, err := utils.Checksum(packagePath) diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index c1a0603926..0aa1ec601f 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -360,6 +360,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddTiKV(kv.Host, uint64(kv.StatusPort)) } } + if servers, found := topoHasField("TiKVWorkerServers"); found { + for i := 0; i < servers.Len(); i++ { + worker := servers.Index(i).Interface().(*TiKVWorkerSpec) + uniqueHosts.Insert(worker.Host) + cfig.AddTiKVWorker(worker.Host, uint64(worker.Port)) + } + } if servers, found := topoHasField("TiDBServers"); found { for i := 0; i < servers.Len(); i++ { db := servers.Index(i).Interface().(*TiDBSpec) diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index f20052afe3..3ebb05410a 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -120,6 +120,7 @@ type ( ServerConfigs struct { TiDB map[string]any `yaml:"tidb"` TiKV map[string]any `yaml:"tikv"` + TiKVWorker map[string]any `yaml:"tikv_worker"` PD map[string]any `yaml:"pd"` TSO map[string]any `yaml:"tso"` Scheduling map[string]any `yaml:"scheduling"` @@ -140,6 +141,7 @@ type ( ComponentVersions struct { TiDB string `yaml:"tidb,omitempty"` TiKV string `yaml:"tikv,omitempty"` + TiKVWorker string `yaml:"tikv_worker,omitempty"` TiFlash string `yaml:"tiflash,omitempty"` PD string `yaml:"pd,omitempty"` TSO string `yaml:"tso,omitempty"` @@ -162,15 +164,16 @@ type ( // ComponentSources represents the source of components ComponentSources struct { - TiDB string `yaml:"tidb,omitempty" validate:"tidb:editable"` - TiKV string `yaml:"tikv,omitempty" validate:"tikv:editable"` - TiFlash string `yaml:"tiflash,omitempty" validate:"tiflash:editable"` - PD string `yaml:"pd,omitempty" validate:"pd:editable"` - Dashboard string `yaml:"tidb_dashboard,omitempty" validate:"tidb_dashboard:editable"` - Pump string `yaml:"pump,omitempty" validate:"pump:editable"` - Drainer string `yaml:"drainer,omitempty" validate:"drainer:editable"` - CDC string `yaml:"cdc,omitempty" validate:"cdc:editable"` - TiKVCDC string `yaml:"kvcdc,omitempty" validate:"kvcdc:editable"` + TiDB string `yaml:"tidb,omitempty" validate:"tidb:editable"` + TiKV string `yaml:"tikv,omitempty" validate:"tikv:editable"` + TiKVWorker string `yaml:"tikv_worker,omitempty" validate:"tikv_worker:editable"` + TiFlash string `yaml:"tiflash,omitempty" validate:"tiflash:editable"` + PD string `yaml:"pd,omitempty" validate:"pd:editable"` + Dashboard string `yaml:"tidb_dashboard,omitempty" validate:"tidb_dashboard:editable"` + Pump string `yaml:"pump,omitempty" validate:"pump:editable"` + Drainer string `yaml:"drainer,omitempty" validate:"drainer:editable"` + CDC string `yaml:"cdc,omitempty" validate:"cdc:editable"` + TiKVCDC string `yaml:"kvcdc,omitempty" validate:"kvcdc:editable"` } // Specification represents the specification of topology.yaml @@ -182,6 +185,7 @@ type ( ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` TiDBServers []*TiDBSpec `yaml:"tidb_servers"` TiKVServers []*TiKVSpec `yaml:"tikv_servers"` + TiKVWorkerServers []*TiKVWorkerSpec `yaml:"tikv_worker_servers,omitempty"` TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` TiProxyServers []*TiProxySpec `yaml:"tiproxy_servers"` PDServers []*PDSpec `yaml:"pd_servers"` @@ -570,6 +574,7 @@ func (s *Specification) Merge(that Topology) Topology { ComponentVersions: s.ComponentVersions.Merge(spec.ComponentVersions), TiDBServers: append(s.TiDBServers, spec.TiDBServers...), TiKVServers: append(s.TiKVServers, spec.TiKVServers...), + TiKVWorkerServers: append(s.TiKVWorkerServers, spec.TiKVWorkerServers...), PDServers: append(s.PDServers, spec.PDServers...), DashboardServers: append(s.DashboardServers, spec.DashboardServers...), TiFlashServers: append(s.TiFlashServers, spec.TiFlashServers...), @@ -595,6 +600,7 @@ func (v *ComponentVersions) Merge(that ComponentVersions) ComponentVersions { return ComponentVersions{ TiDB: utils.Ternary(that.TiDB != "", that.TiDB, v.TiDB).(string), TiKV: utils.Ternary(that.TiKV != "", that.TiKV, v.TiKV).(string), + TiKVWorker: utils.Ternary(that.TiKVWorker != "", that.TiKVWorker, v.TiKVWorker).(string), PD: utils.Ternary(that.PD != "", that.PD, v.PD).(string), TSO: utils.Ternary(that.TSO != "", that.TSO, v.TSO).(string), Scheduling: utils.Ternary(that.Scheduling != "", that.Scheduling, v.Scheduling).(string), @@ -812,7 +818,7 @@ func (s *Specification) ComponentsByStopOrder() (comps []Component) { // ComponentsByStartOrder return component in the order need to start. func (s *Specification) ComponentsByStartOrder() (comps []Component) { - // "pd", "tso", "scheduling", "resource-manager", "dashboard", "tiproxy", "tikv", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" + // "pd", "tso", "scheduling", "resource-manager", "dashboard", "tiproxy", "tikv", "tikv-worker", "pump", "tidb", "tiflash", "drainer", "cdc", "tikv-cdc", "prometheus", "grafana", "alertmanager" comps = append(comps, &PDComponent{s}) comps = append(comps, &TSOComponent{s}) comps = append(comps, &SchedulingComponent{s}) @@ -821,6 +827,7 @@ func (s *Specification) ComponentsByStartOrder() (comps []Component) { comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) + comps = append(comps, &TiKVWorkerComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) comps = append(comps, &TiFlashComponent{s}) @@ -840,7 +847,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen // Ref: https://github.com/pingcap/tiup/issues/2166 cdcUpgradeBeforePDTiKVTiDB := tidbver.TiCDCUpgradeBeforePDTiKVTiDB(curVer) - // "tiflash", <"cdc">, "pd", "tso", "scheduling", "resource-manager","router", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" + // "tiflash", <"cdc">, "pd", "tso", "scheduling", "resource-manager","router", "dashboard", "tiproxy", "tikv", "tikv-worker", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" comps = append(comps, &TiFlashComponent{s}) if cdcUpgradeBeforePDTiKVTiDB { comps = append(comps, &CDCComponent{s}) @@ -853,6 +860,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) + comps = append(comps, &TiKVWorkerComponent{s}) comps = append(comps, &PumpComponent{s}) comps = append(comps, &TiDBComponent{s}) comps = append(comps, &DrainerComponent{s}) diff --git a/pkg/cluster/spec/spec_test.go b/pkg/cluster/spec/spec_test.go index c40814541c..d986d42ebe 100644 --- a/pkg/cluster/spec/spec_test.go +++ b/pkg/cluster/spec/spec_test.go @@ -32,6 +32,7 @@ func TestDefaultDataDir(t *testing.T) { // Test with without global DataDir. topo := new(Specification) topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22}) + topo.TiKVWorkerServers = append(topo.TiKVWorkerServers, &TiKVWorkerSpec{Host: "1.1.1.2", Port: 22}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) topo.TiKVCDCServers = append(topo.TiKVCDCServers, &TiKVCDCSpec{Host: "3.3.3.3", Port: 22}) data, err := yaml.Marshal(topo) @@ -43,6 +44,7 @@ func TestDefaultDataDir(t *testing.T) { require.NoError(t, err) require.Equal(t, "data", topo.GlobalOptions.DataDir) require.Equal(t, "data", topo.TiKVServers[0].DataDir) + require.Equal(t, "data", topo.TiKVWorkerServers[0].DataDir) require.Equal(t, "data", topo.CDCServers[0].DataDir) require.Equal(t, "data", topo.TiKVCDCServers[0].DataDir) @@ -54,6 +56,7 @@ func TestDefaultDataDir(t *testing.T) { require.NoError(t, err) require.Equal(t, "data", topo.GlobalOptions.DataDir) require.Equal(t, "data", topo.TiKVServers[0].DataDir) + require.Equal(t, "data", topo.TiKVWorkerServers[0].DataDir) require.Equal(t, "data", topo.CDCServers[0].DataDir) require.Equal(t, "data", topo.TiKVCDCServers[0].DataDir) @@ -62,6 +65,8 @@ func TestDefaultDataDir(t *testing.T) { topo.GlobalOptions.DataDir = "/global_data" topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.1", Port: 22}) topo.TiKVServers = append(topo.TiKVServers, &TiKVSpec{Host: "1.1.1.2", Port: 33, DataDir: "/my_data"}) + topo.TiKVWorkerServers = append(topo.TiKVWorkerServers, &TiKVWorkerSpec{Host: "1.1.1.3", Port: 22}) + topo.TiKVWorkerServers = append(topo.TiKVWorkerServers, &TiKVWorkerSpec{Host: "1.1.1.4", Port: 33, DataDir: "/my_worker_data"}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.3", Port: 22}) topo.CDCServers = append(topo.CDCServers, &CDCSpec{Host: "2.3.3.4", Port: 22, DataDir: "/cdc_data"}) topo.TiKVCDCServers = append(topo.TiKVCDCServers, &TiKVCDCSpec{Host: "3.3.3.3", Port: 22}) @@ -76,6 +81,9 @@ func TestDefaultDataDir(t *testing.T) { require.Equal(t, "/global_data/tikv-22", topo.TiKVServers[0].DataDir) require.Equal(t, "/my_data", topo.TiKVServers[1].DataDir) + require.Equal(t, "/global_data/tikv-worker-22", topo.TiKVWorkerServers[0].DataDir) + require.Equal(t, "/my_worker_data", topo.TiKVWorkerServers[1].DataDir) + require.Equal(t, "/global_data/cdc-22", topo.CDCServers[0].DataDir) require.Equal(t, "/cdc_data", topo.CDCServers[1].DataDir) diff --git a/pkg/cluster/spec/tikv_worker.go b/pkg/cluster/spec/tikv_worker.go new file mode 100644 index 0000000000..ce8d6fbfb3 --- /dev/null +++ b/pkg/cluster/spec/tikv_worker.go @@ -0,0 +1,258 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiKVWorkerSpec represents the TiKV-worker topology specification in topology.yaml. +type TiKVWorkerSpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + Patched bool `yaml:"patched,omitempty"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + Port int `yaml:"port" default:"19000"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Source string `yaml:"source,omitempty" validate:"source:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + NumaCores string `yaml:"numa_cores,omitempty" validate:"numa_cores:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Status queries current status of the instance. +func (s *TiKVWorkerSpec) Status(_ context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + return statusByHost(s.GetManageHost(), s.Port, "/healthz", timeout, tlsCfg) +} + +// Role returns the component role of the instance. +func (s *TiKVWorkerSpec) Role() string { + return ComponentTiKVWorker +} + +// SSH returns the host and SSH port of the instance. +func (s *TiKVWorkerSpec) SSH() (string, int) { + host := s.Host + if s.ManageHost != "" { + host = s.ManageHost + } + return host, s.SSHPort +} + +// GetMainPort returns the main port of the instance. +func (s *TiKVWorkerSpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance. +func (s *TiKVWorkerSpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available. +func (s *TiKVWorkerSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// TiKVWorkerComponent represents TiKV-worker component. +type TiKVWorkerComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *TiKVWorkerComponent) Name() string { + return ComponentTiKVWorker +} + +// Role implements Component interface. +func (c *TiKVWorkerComponent) Role() string { + return ComponentTiKVWorker +} + +// Source implements Component interface. +func (c *TiKVWorkerComponent) Source() string { + if source := c.Topology.ComponentSources.TiKVWorker; source != "" { + return source + } + return ComponentTiKVWorker +} + +// CalculateVersion implements Component interface. +func (c *TiKVWorkerComponent) CalculateVersion(clusterVersion string) string { + if version := c.Topology.ComponentVersions.TiKVWorker; version != "" { + return version + } + return clusterVersion +} + +// SetVersion implements Component interface. +func (c *TiKVWorkerComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiKVWorker = version +} + +// Instances implements Component interface. +func (c *TiKVWorkerComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.TiKVWorkerServers)) + for _, s := range c.Topology.TiKVWorkerServers { + ins = append(ins, &TiKVWorkerInstance{BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + Port: s.Port, + SSHP: s.SSHPort, + Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: s.NumaCores, + Ports: []int{s.Port}, + Dirs: []string{ + s.DeployDir, + s.DataDir, + }, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) + }, + Component: c, + }, c.Topology}) + } + return ins +} + +// TiKVWorkerInstance represents the TiKV-worker instance. +type TiKVWorkerInstance struct { + BaseInstance + topo Topology +} + +// ScaleConfig deploy temporary config on scaling. +func (i *TiKVWorkerInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + orig := i.topo + defer func() { i.topo = orig }() + i.topo = mustBeClusterTopo(topo) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) +} + +// InitConfig implements Instance interface. +func (i *TiKVWorkerInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*TiKVWorkerSpec) + + pds := make([]string, 0, len(topo.PDServers)) + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + + cfg := &scripts.TiKVWorkerScript{ + Addr: utils.JoinHostPort(i.GetHost(), spec.Port), + PD: strings.Join(pds, ","), + DeployDir: paths.Deploy, + LogDir: paths.Log, + NumaNode: spec.NumaNode, + NumaCores: spec.NumaCores, + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_tikv-worker_%s_%d.sh", i.GetHost(), i.GetPort())) + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_tikv-worker.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { + return err + } + + globalConfig := topo.ServerConfigs.TiKVWorker + + var err error + spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths) + if err != nil { + return err + } + if err := i.MergeServerConfig(ctx, e, globalConfig, spec.Config, paths); err != nil { + return err + } + + if len(pds) == 0 { + return errors.New("tikv-worker requires at least one PD server") + } + + return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths) +} + +// setTLSConfig sets TLS config for TiKV-worker. +func (i *TiKVWorkerInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + if enableTLS { + if configs == nil { + configs = make(map[string]any) + } + configs["security.ca-path"] = fmt.Sprintf("%s/tls/%s", paths.Deploy, TLSCACert) + configs["security.cert-path"] = fmt.Sprintf("%s/tls/%s.crt", paths.Deploy, i.Role()) + configs["security.key-path"] = fmt.Sprintf("%s/tls/%s.pem", paths.Deploy, i.Role()) + return configs, nil + } + + // delete TLS configs + if configs == nil { + return nil, nil + } + for _, key := range []string{ + "security.ca-path", + "security.cert-path", + "security.key-path", + } { + delete(configs, key) + } + return configs, nil +} diff --git a/pkg/cluster/spec/validate.go b/pkg/cluster/spec/validate.go index cae8861e06..0e9a27355c 100644 --- a/pkg/cluster/spec/validate.go +++ b/pkg/cluster/spec/validate.go @@ -919,6 +919,7 @@ func (s *Specification) validateTLSEnabled() error { ComponentResourceManager, ComponentTiDB, ComponentTiKV, + ComponentTiKVWorker, ComponentTiFlash, ComponentTiProxy, ComponentPump, diff --git a/pkg/cluster/task/copy_component.go b/pkg/cluster/task/copy_component.go index ab87035bcd..6348531268 100644 --- a/pkg/cluster/task/copy_component.go +++ b/pkg/cluster/task/copy_component.go @@ -19,7 +19,9 @@ import ( "github.com/pingcap/tiup/pkg/cluster/spec" "github.com/pingcap/tiup/pkg/environment" + logprinter "github.com/pingcap/tiup/pkg/logger/printer" "github.com/pingcap/tiup/pkg/repository" + "github.com/pingcap/tiup/pkg/utils" ) // CopyComponent is used to copy all files related the specific version a component @@ -51,9 +53,16 @@ func (c *CopyComponent) Execute(ctx context.Context) error { // Copy to remote server srcPath := c.srcPath + useDefaultSrcPath := srcPath == "" if srcPath == "" { srcPath = spec.PackagePath(c.component, c.version, c.os, c.arch) } + if useDefaultSrcPath && c.component == spec.ComponentTiKVWorker && utils.IsNotExist(srcPath) { + if logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); ok { + logger.Warnf("Skip copying %s:%s because package was not found at %s", c.component, c.version, srcPath) + } + return nil + } install := &InstallPackage{ srcPath: srcPath, diff --git a/pkg/cluster/task/download.go b/pkg/cluster/task/download.go index c177318b57..3f907fee0d 100644 --- a/pkg/cluster/task/download.go +++ b/pkg/cluster/task/download.go @@ -17,8 +17,11 @@ import ( "context" "fmt" + "github.com/pingcap/errors" operator "github.com/pingcap/tiup/pkg/cluster/operation" + "github.com/pingcap/tiup/pkg/cluster/spec" "github.com/pingcap/tiup/pkg/environment" + "github.com/pingcap/tiup/pkg/logger/printer" "github.com/pingcap/tiup/pkg/repository" ) @@ -42,7 +45,7 @@ func NewDownloader(component string, os string, arch string, version string) *Do } // Execute implements the Task interface -func (d *Downloader) Execute(_ context.Context) error { +func (d *Downloader) Execute(ctx context.Context) error { // If the version is not specified, the last stable one will be used if d.version == "" { env := environment.GlobalEnv() @@ -55,7 +58,17 @@ func (d *Downloader) Execute(_ context.Context) error { } d.version = string(ver) } - return operator.Download(d.component, d.os, d.arch, d.version) + err := operator.Download(d.component, d.os, d.arch, d.version) + if err == nil { + return nil + } + if d.component == spec.ComponentTiKVWorker && (errors.Cause(err) == repository.ErrUnknownComponent || errors.Cause(err) == repository.ErrUnknownVersion) { + if logger, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); ok { + logger.Warnf("Skip downloading %s:%s (%s/%s): %s", d.component, d.version, d.os, d.arch, err.Error()) + } + return nil + } + return err } // Rollback implements the Task interface diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index e5b0fc105e..3d68886555 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -65,6 +65,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.TiKVServers = tikvServers + tikvWorkerServers := make([]*spec.TiKVWorkerSpec, 0) + for i, instance := range (&spec.TiKVWorkerComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + tikvWorkerServers = append(tikvWorkerServers, topo.TiKVWorkerServers[i]) + } + newMeta.Topology.TiKVWorkerServers = tikvWorkerServers + pdServers := make([]*spec.PDSpec, 0) for i, instance := range (&spec.PDComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index f20b30acce..647a15a11a 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -34,6 +34,7 @@ type PrometheusConfig struct { TiDBStatusAddrs []string TiProxyStatusAddrs []string TiKVStatusAddrs []string + TiKVWorkerAddrs []string PDAddrs []string TSOAddrs []string SchedulingAddrs []string @@ -97,6 +98,12 @@ func (c *PrometheusConfig) AddTiKV(ip string, port uint64) *PrometheusConfig { return c } +// AddTiKVWorker add a tikv-worker address. +func (c *PrometheusConfig) AddTiKVWorker(ip string, port uint64) *PrometheusConfig { + c.TiKVWorkerAddrs = append(c.TiKVWorkerAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddPD add a PD address func (c *PrometheusConfig) AddPD(ip string, port uint64) *PrometheusConfig { c.PDAddrs = append(c.PDAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/tikv_worker.go b/pkg/cluster/template/scripts/tikv_worker.go new file mode 100644 index 0000000000..bee09f55ae --- /dev/null +++ b/pkg/cluster/template/scripts/tikv_worker.go @@ -0,0 +1,56 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// TiKVWorkerScript represents the data to generate TiKV-worker start script. +type TiKVWorkerScript struct { + Addr string + PD string + + DeployDir string + LogDir string + + NumaNode string + NumaCores string +} + +// ConfigToFile writes the script content to the target file. +func (c *TiKVWorkerScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_tikv-worker.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + + tmpl, err := template.New("TiKVWorker").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +}