Skip to content

Commit d2facf6

Browse files
authored
confluent local services changes for Confluent Platform 8.0 (#3082)
1 parent 9603c4a commit d2facf6

33 files changed

+532
-140
lines changed

cmd/docs/main.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,6 @@ func main() {
2929
panic(err)
3030
}
3131

32-
tempCH, err := createTemporaryConfluentHome()
33-
if err != nil {
34-
panic(err)
35-
}
36-
defer func() {
37-
_ = os.RemoveAll(tempCH)
38-
}()
39-
4032
// Generate documentation for both subsets of commands: cloud and on-prem
4133
configs := []*config.Config{
4234
{CurrentContext: "Cloud", Contexts: map[string]*config.Context{"Cloud": {PlatformName: "https://confluent.cloud"}}},
@@ -107,22 +99,3 @@ func removeLineFromFile(line, file string) error {
10799

108100
return os.WriteFile(file, out, 0644)
109101
}
110-
111-
func createTemporaryConfluentHome() (string, error) {
112-
dir := filepath.Join(os.TempDir(), "ch")
113-
if err := os.Setenv("CONFLUENT_HOME", dir); err != nil {
114-
return "", err
115-
}
116-
117-
path := filepath.Join(dir, "share/java/confluent-control-center/control-center-0.0.0.jar")
118-
119-
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
120-
return "", err
121-
}
122-
123-
if err := os.WriteFile(path, []byte{}, 0644); err != nil {
124-
return "", err
125-
}
126-
127-
return dir, nil
128-
}

cmd/lint/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ var properNouns = []string{
169169
"Key Encryption Key",
170170
"KEK",
171171
"Kotlin",
172+
"KRaft Controller",
172173
"Ktor",
173174
"Kubernetes",
174175
"Node.js",
@@ -258,6 +259,7 @@ var vocabWords = []string{
258259
"keychain",
259260
"kms",
260261
"kotlin",
262+
"kraft",
261263
"ksql",
262264
"ksqldb",
263265
"ktor",

internal/command.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func NewConfluentCommand(cfg *config.Config) *cobra.Command {
119119
cmd.AddCommand(iam.New(cfg, prerunner))
120120
cmd.AddCommand(kafka.New(cfg, prerunner))
121121
cmd.AddCommand(ksql.New(cfg, prerunner))
122-
cmd.AddCommand(local.New(prerunner))
122+
cmd.AddCommand(local.New(cfg, prerunner))
123123
cmd.AddCommand(login.New(cfg, prerunner, ccloudClientFactory, mdsClientManager, loginCredentialsManager, loginOrganizationManager, authTokenHandler))
124124
cmd.AddCommand(logout.New(cfg, prerunner, authTokenHandler))
125125
cmd.AddCommand(network.New(prerunner))

internal/local/command.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/spf13/cobra"
55

66
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
7+
"github.com/confluentinc/cli/v4/pkg/config"
78
"github.com/confluentinc/cli/v4/pkg/local"
89
)
910

@@ -21,7 +22,7 @@ func NewLocalCommand(cmd *cobra.Command, prerunner pcmd.PreRunner) *command {
2122
}
2223
}
2324

24-
func New(prerunner pcmd.PreRunner) *cobra.Command {
25+
func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
2526
cmd := &cobra.Command{
2627
Use: "local",
2728
Short: "Manage a local Confluent Platform development environment.",
@@ -35,7 +36,7 @@ func New(prerunner pcmd.PreRunner) *cobra.Command {
3536

3637
c.AddCommand(NewCurrentCommand(prerunner))
3738
c.AddCommand(NewDestroyCommand(prerunner))
38-
c.AddCommand(NewServicesCommand(prerunner))
39+
c.AddCommand(NewServicesCommand(cfg, prerunner))
3940
c.AddCommand(NewVersionCommand(prerunner))
4041

4142
return cmd

internal/local/command_service.go

Lines changed: 115 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
"golang.org/x/text/cases"
1818
"golang.org/x/text/language"
1919

20+
"github.com/confluentinc/properties"
21+
2022
"github.com/confluentinc/cli/v4/pkg/cmd"
2123
"github.com/confluentinc/cli/v4/pkg/errors"
2224
"github.com/confluentinc/cli/v4/pkg/output"
2325
"github.com/confluentinc/cli/v4/pkg/spinner"
26+
"github.com/confluentinc/cli/v4/pkg/utils"
2427
)
2528

2629
func NewServiceCommand(service string, prerunner cmd.PreRunner) *cobra.Command {
@@ -132,6 +135,13 @@ func (c *command) runServiceStartCommand(cmd *cobra.Command, _ []string) error {
132135
}
133136

134137
for _, dependency := range services[service].startDependencies {
138+
compatible, err := c.isCompatibleService(dependency)
139+
if err != nil {
140+
return err
141+
}
142+
if !compatible {
143+
continue
144+
}
135145
if err := c.startService(dependency, ""); err != nil {
136146
return err
137147
}
@@ -187,6 +197,13 @@ func (c *command) runServiceStopCommand(cmd *cobra.Command, _ []string) error {
187197
}
188198

189199
for _, dependency := range services[service].stopDependencies {
200+
compatible, err := c.isCompatibleService(dependency)
201+
if err != nil {
202+
return err
203+
}
204+
if !compatible {
205+
continue
206+
}
190207
if err := c.stopService(dependency); err != nil {
191208
return err
192209
}
@@ -242,7 +259,12 @@ func NewServiceVersionCommand(service string, prerunner cmd.PreRunner) *cobra.Co
242259
func (c *command) runServiceVersionCommand(cmd *cobra.Command, _ []string) error {
243260
service := cmd.Parent().Name()
244261

245-
ver, err := c.ch.GetVersion(service)
262+
zookeeperMode, err := c.isZookeeperMode()
263+
if err != nil {
264+
return err
265+
}
266+
267+
ver, err := c.ch.GetVersion(service, zookeeperMode)
246268
if err != nil {
247269
return err
248270
}
@@ -252,7 +274,7 @@ func (c *command) runServiceVersionCommand(cmd *cobra.Command, _ []string) error
252274
}
253275

254276
func (c *command) startService(service, configFile string) error {
255-
if err := c.checkJavaVersion(service); err != nil {
277+
if err := c.checkJavaVersion(); err != nil {
256278
return err
257279
}
258280

@@ -286,7 +308,12 @@ func (c *command) startService(service, configFile string) error {
286308
}
287309

288310
func (c *command) configService(service, configFile string) error {
289-
port, err := c.ch.ReadServicePort(service)
311+
zookeeperMode, err := c.isZookeeperMode()
312+
if err != nil {
313+
return err
314+
}
315+
316+
port, err := c.ch.ReadServicePort(service, zookeeperMode)
290317
if err != nil {
291318
if err.Error() != "no port specified" {
292319
return err
@@ -297,7 +324,7 @@ func (c *command) configService(service, configFile string) error {
297324

298325
var data []byte
299326
if configFile == "" {
300-
data, err = c.ch.ReadServiceConfig(service)
327+
data, err = c.ch.ReadServiceConfig(service, zookeeperMode)
301328
} else {
302329
data, err = os.ReadFile(configFile)
303330
}
@@ -370,6 +397,11 @@ func (c *command) startProcess(service string) error {
370397
return err
371398
}
372399

400+
err = c.setupMetaProperties(service)
401+
if err != nil {
402+
return err
403+
}
404+
373405
start := exec.Command(scriptFile, configFile)
374406

375407
logFile, err := c.cc.GetLogFile(service)
@@ -675,7 +707,7 @@ func (c *command) checkOSVersion() error {
675707
return nil
676708
}
677709

678-
func (c *command) checkJavaVersion(service string) error {
710+
func (c *command) checkJavaVersion() error {
679711
java := filepath.Join(os.Getenv("JAVA_HOME"), "/bin/java")
680712
if os.Getenv("JAVA_HOME") == "" {
681713
out, err := exec.Command("which", "java").Output()
@@ -696,7 +728,7 @@ func (c *command) checkJavaVersion(service string) error {
696728
re := regexp.MustCompile(`.+ version "([\d._]+)"`)
697729
javaVersion := string(re.FindSubmatch(data)[1])
698730

699-
isValid, err := isValidJavaVersion(service, javaVersion)
731+
isValid, err := isValidJavaVersion(javaVersion)
700732
if err != nil {
701733
return err
702734
}
@@ -709,7 +741,7 @@ func (c *command) checkJavaVersion(service string) error {
709741
return nil
710742
}
711743

712-
func isValidJavaVersion(service, javaVersion string) (bool, error) {
744+
func isValidJavaVersion(javaVersion string) (bool, error) {
713745
// 1.8.0_152 -> 8.0_152 -> 8.0
714746
javaVersion = strings.TrimPrefix(javaVersion, "1.")
715747
javaVersion = strings.Split(javaVersion, "_")[0]
@@ -726,10 +758,6 @@ func isValidJavaVersion(service, javaVersion string) (bool, error) {
726758
return false, nil
727759
}
728760

729-
if service == "zookeeper" || service == "kafka" {
730-
return true, nil
731-
}
732-
733761
return true, nil
734762
}
735763

@@ -748,6 +776,8 @@ func writeServiceName(service string) string {
748776
switch service {
749777
case "kafka-rest":
750778
return "Kafka REST"
779+
case "kraft-controller":
780+
return "KRaft Controller"
751781
case "ksql-server":
752782
return "ksqlDB Server"
753783
case "zookeeper":
@@ -757,3 +787,77 @@ func writeServiceName(service string) string {
757787
return cases.Title(language.Und).String(service)
758788
}
759789
}
790+
791+
func (c *command) setupMetaProperties(service string) error {
792+
// Only KRaft Controller and Kafka need to set up meta.properties
793+
if service != "kraft-controller" && service != "kafka" {
794+
return nil
795+
}
796+
797+
// This step is only valid when running in KRaft mode
798+
zookeeperMode, err := c.isZookeeperMode()
799+
if err != nil {
800+
return err
801+
}
802+
if zookeeperMode {
803+
return nil
804+
}
805+
806+
dataDir, err := c.cc.GetDataDir(service)
807+
if err != nil {
808+
return err
809+
}
810+
var metaFile string
811+
switch service {
812+
case "kraft-controller":
813+
metaFile = filepath.Join(dataDir, "kraft-controller-logs", "meta.properties")
814+
case "kafka":
815+
metaFile = filepath.Join(dataDir, "kraft-broker-logs", "meta.properties")
816+
}
817+
if utils.FileExists(metaFile) { // formatting the properties file twice results in an error
818+
return nil
819+
}
820+
821+
kafkaStorage, err := c.ch.GetFile("bin", "kafka-storage")
822+
if err != nil {
823+
return err
824+
}
825+
826+
var uuid string
827+
var ok bool
828+
if service == "kraft-controller" {
829+
out, err := exec.Command(kafkaStorage, "random-uuid").Output()
830+
if err != nil {
831+
return err
832+
}
833+
uuid = strings.TrimSuffix(string(out), "\n")
834+
} else if service == "kafka" {
835+
// read the uuid from the controller meta.properties file since the broker needs to use the same id
836+
// this file should exist since the controller is a dependency of the broker, and hence started first
837+
controllerDataDir, err := c.cc.GetDataDir("kraft-controller")
838+
if err != nil {
839+
return err
840+
}
841+
controllerMetaFile := filepath.Join(controllerDataDir, "kraft-controller-logs", "meta.properties")
842+
controllerMetaProperties, err := properties.LoadFile(controllerMetaFile, properties.UTF8)
843+
if err != nil {
844+
return err
845+
}
846+
uuid, ok = controllerMetaProperties.Get("cluster.id")
847+
if !ok {
848+
return errors.New("unable to retrieve cluster id from KRaft controller meta.properties file")
849+
}
850+
}
851+
852+
configFile, err := c.cc.GetConfigFile(service)
853+
if err != nil {
854+
return err
855+
}
856+
857+
kafkaStorageArgs := []string{"format", "-t", uuid, "-c", configFile}
858+
if service == "kraft-controller" {
859+
kafkaStorageArgs = append(kafkaStorageArgs, "-s")
860+
}
861+
862+
return exec.Command(kafkaStorage, kafkaStorageArgs...).Run()
863+
}

internal/local/command_service_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ func TestIsValidJavaVersion(t *testing.T) {
6262
var isValid bool
6363
var err error
6464

65-
isValid, err = isValidJavaVersion("", "1.8.0_152")
65+
isValid, err = isValidJavaVersion("1.8.0_152")
6666
req.NoError(err)
6767
req.True(isValid)
6868

69-
isValid, err = isValidJavaVersion("", "9.0.4")
69+
isValid, err = isValidJavaVersion("9.0.4")
7070
req.NoError(err)
7171
req.False(isValid)
7272

73-
isValid, err = isValidJavaVersion("zookeeper", "13")
73+
isValid, err = isValidJavaVersion("13")
7474
req.NoError(err)
7575
req.True(isValid)
7676
}

0 commit comments

Comments
 (0)