Skip to content

Commit 628e9a2

Browse files
authored
[skip secret scan] CLI-1494 Add schema registry management commands with srsdk (#1185)
* add schema-registry schema commands create, describe and delete * add schema-registry subject commands onPrem * add sr config commands for on-prem and cloud * add check compatibility commands for on-prem and cloud * update compatibility command example * clean up code and structure * clean up and add error catcher * exclude schema-registry short length rule * add examples and error catcher * Update main.go * address comments i * rename response var to resp * address comments ii * fix bad merge and conflicts * Update command_config_describe.go * address comments iii * add compatibility and config unit and integration tests * Update command_compatibility_test.go * Update command_compatibility_test.go * address comments iv * update test output files * address comments v * fix naming and descriptions * update test fixture files * address comments vi * Update command_subject_update_onprem.go * Update subject-update-help.golden * address comments, revert breaking changes * Update command_subject_update_onprem.go
1 parent dd937df commit 628e9a2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1088
-257
lines changed

internal/cmd/kafka/command_topic.go

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,16 @@
11
package kafka
22

33
import (
4-
"context"
5-
"encoding/binary"
6-
"encoding/json"
74
"fmt"
8-
"io/ioutil"
9-
"os"
10-
"path/filepath"
11-
"strconv"
125
"time"
136

147
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
15-
srsdk "github.com/confluentinc/schema-registry-sdk-go"
168
"github.com/spf13/cobra"
179

1810
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
1911
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
2012
"github.com/confluentinc/cli/internal/pkg/errors"
2113
"github.com/confluentinc/cli/internal/pkg/log"
22-
"github.com/confluentinc/cli/internal/pkg/output"
23-
"github.com/confluentinc/cli/internal/pkg/utils"
2414
)
2515

2616
const (
@@ -161,87 +151,3 @@ func (c *hasAPIKeyTopicCommand) validateTopic(client *ckafka.AdminClient, topic
161151
log.CliLogger.Tracef("validateTopic succeeded")
162152
return nil
163153
}
164-
165-
func registerSchemaWithAuth(cmd *cobra.Command, subject, schemaType, schemaPath string, refs []srsdk.SchemaReference, srClient *srsdk.APIClient, ctx context.Context) ([]byte, error) {
166-
schema, err := ioutil.ReadFile(schemaPath)
167-
if err != nil {
168-
return nil, err
169-
}
170-
171-
response, _, err := srClient.DefaultApi.Register(ctx, subject, srsdk.RegisterSchemaRequest{Schema: string(schema), SchemaType: schemaType, References: refs})
172-
if err != nil {
173-
return nil, err
174-
}
175-
176-
outputFormat, err := cmd.Flags().GetString(output.FlagName)
177-
if err != nil {
178-
return nil, err
179-
}
180-
if outputFormat == output.Human.String() {
181-
utils.Printf(cmd, errors.RegisteredSchemaMsg, response.Id)
182-
} else {
183-
err = output.StructuredOutput(outputFormat, &struct {
184-
Id int32 `json:"id" yaml:"id"`
185-
}{response.Id})
186-
if err != nil {
187-
return nil, err
188-
}
189-
}
190-
191-
metaInfo := []byte{0x0}
192-
schemaIdBuffer := make([]byte, 4)
193-
binary.BigEndian.PutUint32(schemaIdBuffer, uint32(response.Id))
194-
metaInfo = append(metaInfo, schemaIdBuffer...)
195-
return metaInfo, nil
196-
}
197-
198-
func readSchemaRefs(cmd *cobra.Command) ([]srsdk.SchemaReference, error) {
199-
var refs []srsdk.SchemaReference
200-
refPath, err := cmd.Flags().GetString("refs")
201-
if err != nil {
202-
return nil, err
203-
}
204-
if refPath != "" {
205-
refBlob, err := ioutil.ReadFile(refPath)
206-
if err != nil {
207-
return nil, err
208-
}
209-
err = json.Unmarshal(refBlob, &refs)
210-
if err != nil {
211-
return nil, err
212-
}
213-
}
214-
return refs, nil
215-
}
216-
217-
func storeSchemaReferences(refs []srsdk.SchemaReference, srClient *srsdk.APIClient, ctx context.Context) (map[string]string, error) {
218-
dir := filepath.Join(os.TempDir(), "ccloud-schema")
219-
if _, err := os.Stat(dir); os.IsNotExist(err) {
220-
err = os.Mkdir(dir, 0755)
221-
if err != nil {
222-
return nil, err
223-
}
224-
}
225-
226-
referencePathMap := map[string]string{}
227-
for _, ref := range refs {
228-
tempStorePath := filepath.Join(dir, ref.Name)
229-
if !fileExists(tempStorePath) {
230-
schema, _, err := srClient.DefaultApi.GetSchemaByVersion(ctx, ref.Subject, strconv.Itoa(int(ref.Version)), &srsdk.GetSchemaByVersionOpts{})
231-
if err != nil {
232-
return nil, err
233-
}
234-
err = os.MkdirAll(filepath.Dir(tempStorePath), 0755)
235-
if err != nil {
236-
return nil, err
237-
}
238-
err = ioutil.WriteFile(tempStorePath, []byte(schema.Schema), 0644)
239-
if err != nil {
240-
return nil, err
241-
}
242-
}
243-
referencePathMap[ref.Name] = tempStorePath
244-
}
245-
246-
return referencePathMap, nil
247-
}

internal/cmd/kafka/command_topic_consume_onprem.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (c *authenticatedTopicCommand) onPremConsume(cmd *cobra.Command, args []str
8585
if c.State == nil { // require log-in to use oauthbearer token
8686
return errors.NewErrorWithSuggestions(errors.NotLoggedInErrorMsg, errors.AuthTokenSuggestion)
8787
}
88-
srClient, ctx, err = sr.GetAPIClientWithToken(cmd, nil, c.Version, c.AuthToken())
88+
srClient, ctx, err = sr.GetSrApiClientWithToken(cmd, nil, c.Version, c.AuthToken())
8989
if err != nil {
9090
return err
9191
}

internal/cmd/kafka/command_topic_produce.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (c *hasAPIKeyTopicCommand) produce(cmd *cobra.Command, args []string) error
7979
if err != nil {
8080
return err
8181
}
82-
refs, err := readSchemaRefs(cmd)
82+
refs, err := sr.ReadSchemaRefs(cmd)
8383
if err != nil {
8484
return err
8585
}
@@ -243,12 +243,12 @@ func (c *hasAPIKeyTopicCommand) registerSchema(cmd *cobra.Command, valueFormat,
243243
}
244244
}
245245

246-
info, err := registerSchemaWithAuth(cmd, subject, schemaType, schemaPath, refs, srClient, ctx)
246+
info, err := sr.RegisterSchemaWithAuth(cmd, subject, schemaType, schemaPath, refs, srClient, ctx)
247247
if err != nil {
248248
return nil, nil, err
249249
}
250250
metaInfo = info
251-
referencePathMap, err = storeSchemaReferences(refs, srClient, ctx)
251+
referencePathMap, err = sr.StoreSchemaReferences(refs, srClient, ctx)
252252
if err != nil {
253253
return metaInfo, nil, err
254254
}

internal/cmd/kafka/command_topic_produce_onprem.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (c *authenticatedTopicCommand) onPremProduce(cmd *cobra.Command, args []str
9393
if err != nil {
9494
return err
9595
}
96-
refs, err := readSchemaRefs(cmd)
96+
refs, err := sr.ReadSchemaRefs(cmd)
9797
if err != nil {
9898
return err
9999
}
@@ -182,17 +182,16 @@ func (c *authenticatedTopicCommand) registerSchema(cmd *cobra.Command, valueForm
182182
if c.State == nil { // require log-in to use oauthbearer token
183183
return nil, nil, errors.NewErrorWithSuggestions(errors.NotLoggedInErrorMsg, errors.AuthTokenSuggestion)
184184
}
185-
srClient, ctx, err := sr.GetAPIClientWithToken(cmd, nil, c.Version, c.AuthToken())
185+
srClient, ctx, err := sr.GetSrApiClientWithToken(cmd, nil, c.Version, c.AuthToken())
186186
if err != nil {
187187
return nil, nil, err
188188
}
189189

190-
info, err := registerSchemaWithAuth(cmd, subject, schemaType, schemaPath, refs, srClient, ctx)
190+
metaInfo, err = sr.RegisterSchemaWithAuth(cmd, subject, schemaType, schemaPath, refs, srClient, ctx)
191191
if err != nil {
192192
return nil, nil, err
193193
}
194-
metaInfo = info
195-
referencePathMap, err = storeSchemaReferences(refs, srClient, ctx)
194+
referencePathMap, err = sr.StoreSchemaReferences(refs, srClient, ctx)
196195
if err != nil {
197196
return metaInfo, nil, err
198197
}

internal/cmd/kafka/confluent_kafka.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/google/uuid"
2121
"github.com/spf13/cobra"
2222

23+
sr "github.com/confluentinc/cli/internal/cmd/schema-registry"
2324
configv1 "github.com/confluentinc/cli/internal/pkg/config/v1"
2425
"github.com/confluentinc/cli/internal/pkg/errors"
2526
"github.com/confluentinc/cli/internal/pkg/form"
@@ -347,7 +348,7 @@ func (h *GroupHandler) RequestSchema(value []byte) (string, map[string]string, e
347348
tempStorePath := filepath.Join(h.Properties.SchemaPath, fmt.Sprintf("%s-%d.txt", h.Subject, schemaID))
348349
tempRefStorePath := filepath.Join(h.Properties.SchemaPath, fmt.Sprintf("%s-%d.ref", h.Subject, schemaID))
349350
var references []srsdk.SchemaReference
350-
if !fileExists(tempStorePath) || !fileExists(tempRefStorePath) {
351+
if !utils.FileExists(tempStorePath) || !utils.FileExists(tempRefStorePath) {
351352
// TODO: add handler for writing schema failure
352353
getSchemaOpts := srsdk.GetSchemaOpts{
353354
Subject: optional.NewString(h.Subject),
@@ -382,7 +383,7 @@ func (h *GroupHandler) RequestSchema(value []byte) (string, map[string]string, e
382383
}
383384

384385
// Store the references in temporary files
385-
referencePathMap, err := storeSchemaReferences(references, h.SrClient, h.Ctx)
386+
referencePathMap, err := sr.StoreSchemaReferences(references, h.SrClient, h.Ctx)
386387
if err != nil {
387388
return "", nil, err
388389
}

internal/cmd/kafka/utils.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,6 @@ func copyMap(inputMap map[string]string) map[string]string {
2727
return newMap
2828
}
2929

30-
func fileExists(filename string) bool {
31-
info, err := os.Stat(filename)
32-
if os.IsNotExist(err) {
33-
return false
34-
}
35-
return !info.IsDir()
36-
}
37-
3830
func toCreateTopicConfigs(topicConfigsMap map[string]string) []kafkarestv3.ConfigData {
3931
topicConfigs := make([]kafkarestv3.ConfigData, len(topicConfigsMap))
4032
i := 0

internal/cmd/schema-registry/command.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ func New(cfg *v1.Config, prerunner pcmd.PreRunner, srClient *srsdk.APIClient) *c
1919
c := pcmd.NewAuthenticatedCLICommand(cmd, prerunner)
2020

2121
c.AddCommand(newClusterCommand(cfg, prerunner, srClient))
22+
c.AddCommand(newCompatibilityCommand(cfg, prerunner, srClient))
23+
c.AddCommand(newConfigCommand(cfg, prerunner, srClient))
2224
c.AddCommand(newExporterCommand(prerunner, srClient))
23-
c.AddCommand(newSchemaCommand(prerunner, srClient))
24-
c.AddCommand(newSubjectCommand(prerunner, srClient))
25-
25+
c.AddCommand(newSchemaCommand(cfg, prerunner, srClient))
26+
c.AddCommand(newSubjectCommand(cfg, prerunner, srClient))
2627
return c.Command
2728
}
2829

internal/cmd/schema-registry/command_cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func newClusterCommand(cfg *v1.Config, prerunner pcmd.PreRunner, srClient *srsdk
3636

3737
c.AddCommand(c.newDescribeCommand(cfg))
3838
c.AddCommand(c.newEnableCommand(cfg))
39-
c.AddCommand(c.newListCommand())
39+
c.AddCommand(c.newListCommandOnPrem())
4040
c.AddCommand(c.newUpdateCommand(cfg))
4141

4242
return c.Command

internal/cmd/schema-registry/command_cluster_list.go renamed to internal/cmd/schema-registry/command_cluster_list_onprem.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ import (
1414

1515
const clusterType = "schema-registry-cluster"
1616

17-
func (c *clusterCommand) newListCommand() *cobra.Command {
17+
func (c *clusterCommand) newListCommandOnPrem() *cobra.Command {
1818
cmd := &cobra.Command{
1919
Use: "list",
2020
Short: "List registered Schema Registry clusters.",
2121
Long: "List Schema Registry clusters that are registered with the MDS cluster registry.",
2222
Args: cobra.NoArgs,
23-
RunE: pcmd.NewCLIRunE(c.list),
23+
RunE: pcmd.NewCLIRunE(c.onPremList),
2424
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireOnPremLogin},
2525
}
2626

@@ -30,7 +30,7 @@ func (c *clusterCommand) newListCommand() *cobra.Command {
3030
return cmd
3131
}
3232

33-
func (c *clusterCommand) list(cmd *cobra.Command, _ []string) error {
33+
func (c *clusterCommand) onPremList(cmd *cobra.Command, _ []string) error {
3434
ctx := context.WithValue(context.Background(), mds.ContextAccessToken, c.State.AuthToken)
3535
opts := &mds.ClusterRegistryListOpts{ClusterType: optional.NewString(clusterType)}
3636

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package schemaregistry
2+
3+
import (
4+
srsdk "github.com/confluentinc/schema-registry-sdk-go"
5+
"github.com/spf13/cobra"
6+
7+
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
8+
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
9+
)
10+
11+
type compatibilityCommand struct {
12+
*pcmd.AuthenticatedStateFlagCommand
13+
srClient *srsdk.APIClient
14+
}
15+
16+
func newCompatibilityCommand(cfg *v1.Config, prerunner pcmd.PreRunner, srClient *srsdk.APIClient) *cobra.Command {
17+
cmd := &cobra.Command{
18+
Use: "compatibility",
19+
Short: "Validate schema compatibility.",
20+
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLoginOrOnPremLogin},
21+
}
22+
23+
c := &compatibilityCommand{srClient: srClient}
24+
25+
if cfg.IsCloudLogin() {
26+
c.AuthenticatedStateFlagCommand = pcmd.NewAuthenticatedStateFlagCommand(cmd, prerunner)
27+
c.AddCommand(c.newValidateCommand())
28+
} else {
29+
c.AuthenticatedStateFlagCommand = pcmd.NewAuthenticatedWithMDSStateFlagCommand(cmd, prerunner)
30+
c.AddCommand(c.newValidateCommandOnPrem())
31+
}
32+
33+
return cmd
34+
}

0 commit comments

Comments
 (0)