Skip to content

Commit f47b8ae

Browse files
authored
[CLI-3426] CLI support for Flink CCN routing (#3044)
1 parent 65af2c4 commit f47b8ae

File tree

77 files changed

+950
-107
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+950
-107
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ require (
3737
github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0
3838
github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0
3939
github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0
40-
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.12.0
40+
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.14.0
4141
github.com/confluentinc/ccloud-sdk-go-v2/networking-access-point v0.5.0
4242
github.com/confluentinc/ccloud-sdk-go-v2/networking-dnsforwarder v0.4.0
4343
github.com/confluentinc/ccloud-sdk-go-v2/networking-gateway v0.2.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0 h1:jIXXhGi+Xn+XYFCErnMvd035Q
232232
github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0/go.mod h1:ufn9In8kDsyJ7Nru2ygpAaWdGw7DSDTOTtDhQVSmZjs=
233233
github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0 h1:TWwZHdfo2XNKrnGOuxXx4LF8WgahqqDC47Ap51L4thM=
234234
github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0/go.mod h1:odGsHChrn2l+jaOvx4Gib5//U4a3Id79wstQVkNh8v0=
235-
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.12.0 h1:eaIjlpKyjW6WgOKqR2/v1oITgK/m/eg4PdIrmIN8fbc=
236-
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.12.0/go.mod h1:jL9lLHYwFKzCJE5Fh62UdRqJCMJ9T/xg5QOdnQRYIUg=
235+
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.14.0 h1:btDFGijvzpWaKLKztc7S9YenbtGTsHgjs4+fNuyYceI=
236+
github.com/confluentinc/ccloud-sdk-go-v2/networking v0.14.0/go.mod h1:6BDrwG6OfSnT++Yr3mXStvmy4Jb8uMtdfKK7sZO/X+M=
237237
github.com/confluentinc/ccloud-sdk-go-v2/networking-access-point v0.5.0 h1:q2sfstoFDF/ZUs0MapOzalwIjp4UjL6CK+cMNZmdzNM=
238238
github.com/confluentinc/ccloud-sdk-go-v2/networking-access-point v0.5.0/go.mod h1:HV1xGUwTsGEU3Mgvc+7Ya/0HRpUO69L2rqqxO7LeWMc=
239239
github.com/confluentinc/ccloud-sdk-go-v2/networking-dnsforwarder v0.4.0 h1:LT8trYIkMZ7EFM7nZZOkmH12B7AgOEvZYZjw86nazD0=

internal/flink/command.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
4545
cmd.AddCommand(c.newArtifactCommand())
4646
cmd.AddCommand(c.newComputePoolCommand())
4747
cmd.AddCommand(c.newConnectivityTypeCommand())
48+
cmd.AddCommand(c.newEndpointCommand())
4849
cmd.AddCommand(c.newRegionCommand())
4950
cmd.AddCommand(c.newShellCommand(prerunner))
5051
cmd.AddCommand(c.newStatementCommand())

internal/flink/command_connectivity_type_use.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,20 @@ func (c *command) newUseCommand() *cobra.Command {
1212
return &cobra.Command{
1313
Use: "use <region-access>",
1414
Short: "Select a Flink connectivity type.",
15-
Long: "Select a Flink connectivity type for the current environment as \"public\" or \"private\". If unspecified, the CLI will default to the connectivity type that was set at the organization level.",
15+
Long: "Select a Flink connectivity type for the current environment as \"public\" or \"private\". If unspecified, the CLI will default to public connectivity type.",
1616
Args: cobra.MatchAll(cobra.ExactArgs(1), cobra.OnlyValidArgs),
1717
ValidArgs: fields,
1818
RunE: c.ConnectivityTypeUse,
1919
}
2020
}
21+
2122
func (c *command) ConnectivityTypeUse(_ *cobra.Command, args []string) error {
23+
warning := errors.NewWarningWithSuggestions(
24+
`This command still works to select the connectivity type and set a public or private endpoint for Flink dataplane client.`,
25+
`\nAlternatively, you can run "confluent flink endpoint list" and "confluent flink endpoint use" to view and specify an active endpoint for Flink dataplane client, including CCN endpoints.`,
26+
)
27+
output.ErrPrint(true, warning.DisplayWarningWithSuggestions())
28+
2229
if err := c.Context.SetCurrentFlinkAccessType(args[0]); err != nil {
2330
return err
2431
}

internal/flink/command_endpoint.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package flink
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
6+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
7+
)
8+
9+
const (
10+
publicFlinkEndpointType = "PUBLIC"
11+
privateFlinkEndpointType = "PRIVATE"
12+
)
13+
14+
type flinkEndpointOut struct {
15+
IsCurrent bool `human:"Current" serialized:"is_current"`
16+
Endpoint string `human:"Endpoint" serialized:"endpoint"`
17+
Cloud string `human:"Cloud" serialized:"cloud"`
18+
Region string `human:"Region" serialized:"region"`
19+
Type string `human:"Type" serialized:"type"`
20+
}
21+
22+
func (c *command) newEndpointCommand() *cobra.Command {
23+
cmd := &cobra.Command{
24+
Use: "endpoint",
25+
Short: "Manage Flink endpoint.",
26+
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
27+
}
28+
29+
cmd.AddCommand(c.newEndpointListCommand())
30+
cmd.AddCommand(c.newEndpointUseCommand())
31+
cmd.AddCommand(c.newEndpointUnsetCommand())
32+
33+
return cmd
34+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package flink
2+
3+
import (
4+
"fmt"
5+
"sort"
6+
"strings"
7+
8+
"github.com/spf13/cobra"
9+
10+
networkingprivatelinkv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-privatelink/v1"
11+
12+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
13+
"github.com/confluentinc/cli/v4/pkg/errors"
14+
"github.com/confluentinc/cli/v4/pkg/examples"
15+
"github.com/confluentinc/cli/v4/pkg/output"
16+
)
17+
18+
type CloudRegionKey struct {
19+
cloud string
20+
region string
21+
}
22+
23+
func (c *command) newEndpointListCommand() *cobra.Command {
24+
cmd := &cobra.Command{
25+
Use: "list",
26+
RunE: c.endpointList,
27+
Short: "List Flink endpoint.",
28+
Example: examples.BuildExampleString(
29+
examples.Example{
30+
Text: "List the available Flink endpoints with current cloud provider and region.",
31+
Code: "confluent flink endpoint list",
32+
},
33+
),
34+
}
35+
36+
pcmd.AddContextFlag(cmd, c.CLICommand)
37+
pcmd.AddOutputFlag(cmd)
38+
39+
return cmd
40+
}
41+
42+
func (c *command) endpointList(cmd *cobra.Command, _ []string) error {
43+
// Get the current Flink cloud and region
44+
cloud := c.Context.GetCurrentFlinkCloudProvider()
45+
region := c.Context.GetCurrentFlinkRegion()
46+
if cloud == "" || region == "" {
47+
return errors.NewErrorWithSuggestions(
48+
"Current Flink cloud provider or region is empty",
49+
"Run `confluent flink region use --cloud <cloud> --region <region>` to set the Flink cloud provider and region first.",
50+
)
51+
}
52+
cloud = strings.ToUpper(cloud)
53+
54+
environmentId, err := c.Context.EnvironmentId()
55+
if err != nil {
56+
return err
57+
}
58+
59+
list := output.NewList(cmd)
60+
flinkRegions, err := c.V2Client.ListFlinkRegions(cloud, region)
61+
if err != nil {
62+
return fmt.Errorf("unable to list Flink endpoint, failed to list Flink regions: %w", err)
63+
}
64+
results := make([]*flinkEndpointOut, 0, len(flinkRegions)*2)
65+
66+
// 1 - List all the public endpoints based optionally on cloud(upper case) and region(lower case)
67+
for _, flinkRegion := range flinkRegions {
68+
results = append(results, &flinkEndpointOut{
69+
IsCurrent: flinkRegion.GetHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(),
70+
Endpoint: flinkRegion.GetHttpEndpoint(),
71+
Cloud: flinkRegion.GetCloud(),
72+
Region: flinkRegion.GetRegionName(),
73+
Type: publicFlinkEndpointType,
74+
})
75+
}
76+
77+
// 2 - List all the private endpoints based on the presence of "READY" PrivateLinkAttachments as filter
78+
// Note the `cloud` and `region` parameters have to be `nil` instead of empty slice in case of no filter
79+
platts, err := c.V2Client.ListPrivateLinkAttachments(environmentId, nil, nil, nil, []string{"READY"})
80+
if err != nil {
81+
return fmt.Errorf("unable to list Flink endpoint, failed to list private link attachments: %w", err)
82+
}
83+
84+
filterKeyMap := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts)
85+
86+
for _, flinkRegion := range flinkRegions {
87+
key := CloudRegionKey{
88+
cloud: flinkRegion.GetCloud(),
89+
region: flinkRegion.GetRegionName(),
90+
}
91+
92+
if _, ok := filterKeyMap[key]; ok {
93+
results = append(results, &flinkEndpointOut{
94+
IsCurrent: flinkRegion.GetPrivateHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(),
95+
Endpoint: flinkRegion.GetPrivateHttpEndpoint(),
96+
Cloud: flinkRegion.GetCloud(),
97+
Region: flinkRegion.GetRegionName(),
98+
Type: privateFlinkEndpointType,
99+
})
100+
}
101+
}
102+
103+
// 3 - List all the CCN endpoint with the list of "READY" network domains
104+
// Note the cloud and region have to be empty slice instead of `nil` in case of no filter
105+
networks, err := c.V2Client.ListNetworks(environmentId, nil, []string{cloud}, []string{region}, nil, []string{"READY"}, nil)
106+
if err != nil {
107+
return fmt.Errorf("unable to list Flink endpoint, failed to list networks: %w", err)
108+
}
109+
110+
for _, network := range networks {
111+
suffix := network.Status.GetEndpointSuffix()
112+
endpoint := fmt.Sprintf("https://flink%s", suffix)
113+
results = append(results, &flinkEndpointOut{
114+
IsCurrent: endpoint == c.Context.GetCurrentFlinkEndpoint(),
115+
Endpoint: endpoint,
116+
Cloud: network.Spec.GetCloud(),
117+
Region: network.Spec.GetRegion(),
118+
Type: privateFlinkEndpointType,
119+
})
120+
}
121+
122+
// Sort the results order by cloud, region, type and endpoint
123+
sort.Slice(results, func(i, j int) bool {
124+
if results[i].Cloud != results[j].Cloud {
125+
return results[i].Cloud < results[j].Cloud
126+
}
127+
if results[i].Region != results[j].Region {
128+
return results[i].Region < results[j].Region
129+
}
130+
if results[i].Type != results[j].Type {
131+
return results[i].Type < results[j].Type
132+
}
133+
return results[i].Endpoint < results[j].Endpoint
134+
})
135+
136+
for _, result := range results {
137+
list.Add(&flinkEndpointOut{
138+
IsCurrent: result.IsCurrent,
139+
Endpoint: result.Endpoint,
140+
Cloud: result.Cloud,
141+
Region: result.Region,
142+
Type: result.Type,
143+
})
144+
}
145+
146+
// Disable the default sort to use the custom sort above
147+
list.Sort(false)
148+
return list.Print()
149+
}
150+
151+
// buildCloudRegionKeyFilterMapFromPrivateLinkAttachments creates a map of unique cloud/region pairs from PrivateLinkAttachments.
152+
// This function helps deduplicate scenarios where users have multiple private link attachments in the same cloud region.
153+
// Each unique combination of cloud and region is represented as a CloudRegionKey in the returned map.
154+
//
155+
// Parameters:
156+
// - platts: A slice of NetworkingV1PrivateLinkAttachment objects to process
157+
//
158+
// Returns:
159+
// - A map with CloudRegionKey as keys and boolean 'true' as values for each unique cloud/region combination
160+
func buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment) map[CloudRegionKey]bool {
161+
result := make(map[CloudRegionKey]bool, len(platts))
162+
for _, platt := range platts {
163+
if platt.Spec.GetCloud() == "" || platt.Spec.GetRegion() == "" {
164+
continue
165+
}
166+
compositeKey := CloudRegionKey{
167+
cloud: platt.Spec.GetCloud(),
168+
region: platt.Spec.GetRegion(),
169+
}
170+
result[compositeKey] = true
171+
}
172+
return result
173+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package flink
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
networkingprivatelinkv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-privatelink/v1"
8+
)
9+
10+
func TestBuildCloudRegionKeyFilterMapFromPrivateLinkAttachments(t *testing.T) {
11+
// Helper function to create a private link attachment with specified cloud and region
12+
createPlatt := func(cloud, region string) networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment {
13+
spec := networkingprivatelinkv1.NewNetworkingV1PrivateLinkAttachmentSpec()
14+
spec.SetCloud(cloud)
15+
spec.SetRegion(region)
16+
17+
platt := networkingprivatelinkv1.NewNetworkingV1PrivateLinkAttachment()
18+
platt.SetSpec(*spec)
19+
return *platt
20+
}
21+
22+
tests := []struct {
23+
name string
24+
platts []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment
25+
expected map[CloudRegionKey]bool
26+
}{
27+
{
28+
name: "Empty slice",
29+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{},
30+
expected: map[CloudRegionKey]bool{},
31+
},
32+
{
33+
name: "Single attachment",
34+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{
35+
createPlatt("AWS", "us-east-1"),
36+
},
37+
expected: map[CloudRegionKey]bool{
38+
{cloud: "AWS", region: "us-east-1"}: true,
39+
},
40+
},
41+
{
42+
name: "Multiple unique attachments",
43+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{
44+
createPlatt("AWS", "us-east-1"),
45+
createPlatt("GCP", "us-central1"),
46+
createPlatt("AZURE", "eastus"),
47+
},
48+
expected: map[CloudRegionKey]bool{
49+
{cloud: "AWS", region: "us-east-1"}: true,
50+
{cloud: "GCP", region: "us-central1"}: true,
51+
{cloud: "AZURE", region: "eastus"}: true,
52+
},
53+
},
54+
{
55+
name: "Duplicate cloud/region combinations",
56+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{
57+
createPlatt("AWS", "us-east-1"),
58+
createPlatt("AWS", "us-east-1"),
59+
createPlatt("AWS", "us-west-1"),
60+
},
61+
expected: map[CloudRegionKey]bool{
62+
{cloud: "AWS", region: "us-east-1"}: true,
63+
{cloud: "AWS", region: "us-west-1"}: true,
64+
},
65+
},
66+
{
67+
name: "Empty cloud or region values are skipped",
68+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{
69+
createPlatt("", "us-east-1"),
70+
createPlatt("AWS", ""),
71+
createPlatt("", ""),
72+
createPlatt("GCP", "us-central1"),
73+
},
74+
expected: map[CloudRegionKey]bool{
75+
{cloud: "GCP", region: "us-central1"}: true,
76+
},
77+
},
78+
{
79+
name: "Mix of valid and invalid entries",
80+
platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{
81+
createPlatt("AWS", "us-east-1"),
82+
createPlatt("", "eu-west-1"),
83+
createPlatt("AZURE", "eastus"),
84+
createPlatt("GCP", ""),
85+
},
86+
expected: map[CloudRegionKey]bool{
87+
{cloud: "AWS", region: "us-east-1"}: true,
88+
{cloud: "AZURE", region: "eastus"}: true,
89+
},
90+
},
91+
}
92+
93+
for _, tt := range tests {
94+
t.Run(tt.name, func(t *testing.T) {
95+
got := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(tt.platts)
96+
if !reflect.DeepEqual(got, tt.expected) {
97+
t.Errorf("buildCloudRegionKeyFilterMapFromPrivateLinkAttachments() = %v, expected %v", got, tt.expected)
98+
}
99+
})
100+
}
101+
}

0 commit comments

Comments
 (0)