Operator v1: Support multiple external listeners in Cluster CRD #455
paulzhang97 merged 5 commits into main
Conversation
AuthenticationMethod string
}

// Encode returns the listenerTemplateSpec as a string in the format as below:
Is there a reason to not use json/yaml.Marshal here? If the need is to avoid quoting of potentially templated values, you should be able to work around that with a custom type and omitempty:
type TemplatedString string
type TemplatedInt string

func (t TemplatedString) MarshalYAML() (interface{}, error) {
	// Format the value so it's not escaped here. It should be possible to
	// generate invalid JSON/YAML though I've not tried it myself...
	return string(t), nil
}
You've made me think about it again 😄. The issue here is how to wrap keys and values with single quotes ('). I still can't figure out how to do it.
We need to encode it like { 'name': 'listener-name', 'address':'0.0.0.0'} rather than { "name": "listener-name", "address": "0.0.0.0" }.
Why do we need to encode them with single quotes?
EDIT:
If that's the constraint, I feel we'd be better off regexing the quotes after JSON marshaling than maintaining all this custom serialization code. Something like: s/(^|[^\\])"/'/g should do it. You just need to be sure not to replace escaped quotes in strings, which is what the capture group there should do. Though a negative lookbehind would work as well, if you're familiar with them and Go's regex engine supports them. I'd like to better understand the why of single quotes before going down that path though ;)
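A minimal sketch of that suggestion in Go. Note that Go's RE2 engine does not support lookbehind, so the capture-group variant is the one that works; the `singleQuote` helper name is mine, not from the PR:

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// unescapedQuote matches a double quote that is not preceded by a backslash,
// keeping the preceding character (or start of string) in a capture group.
var unescapedQuote = regexp.MustCompile(`(^|[^\\])"`)

// singleQuote marshals v to JSON, then rewrites unescaped double quotes to
// single quotes, as proposed in the review comment.
func singleQuote(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return unescapedQuote.ReplaceAllString(string(b), `$1'`), nil
}

func main() {
	s, _ := singleQuote(map[string]string{"name": "listener-name"})
	fmt.Println(s) // {'name':'listener-name'}
}
```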
I haven't looked into why single quotes are needed. I feel like it has something to do with config.Set().
I like your proposal. I will give it a try.
Well, I made hybrid changes. See the last commit.
I failed to write MarshalJSON for allListenersTemplateSpec, since a list-typed field inside the struct needs to be encoded as e.g. "redpanda.kafka_api":"[{...},{...}]" (with double quotes around the list value; the value needs to be a string) instead of "redpanda.kafka_api":[{...},{...}]. I tried to pass the one without double quotes to Configurator; it does not work. I did not dig in much. Bottom line, I would not want to change how it works today, since it is how Cloudv2 configures additional listeners for Private Link today.
Any other ideas?
config.Set just calls yaml.Unmarshal (With some... interesting preprocessing). I see no reason why single quotes would be required.
// Set sets a field in pointer-to-struct p to a value, following yaml tags.
//
// Key: string containing the yaml field tags, e.g: 'rpk.admin_api'.
// Value: string representation of the value
func Set[T any](p *T, key, value string) error {
if key == "" {
return fmt.Errorf("key field must not be empty")
}
tags := strings.Split(key, ".")
for _, tag := range tags {
if _, _, err := splitTagIndex(tag); err != nil {
return err
}
}
finalTag := tags[len(tags)-1]
if len(tags) > 1 && (finalTag == "enabled" && tags[len(tags)-2] == "tls" || finalTag == "tls") {
switch value {
case "{}":
case "null":
case "true":
value = "{}"
case "false":
value = "null"
default:
// If the final tag is 'tls', it might be a value. So we continue
// and handle below.
if finalTag != "tls" {
return fmt.Errorf("%s must be true or {}", key)
}
}
if finalTag == "enabled" {
tags = tags[:len(tags)-1]
finalTag = tags[len(tags)-1]
}
}
field, other, err := getField(tags, "", reflect.ValueOf(p).Elem())
if err != nil {
return err
}
isOther := other != reflect.Value{}
// For Other fields, we need to wrap the value in key:value format when
// unmarshaling, and we forbid indexing.
if isOther {
if _, index, _ := splitTagIndex(finalTag); index >= 0 {
return fmt.Errorf("cannot index into unknown field %q", finalTag)
}
field = other
}
if !field.CanAddr() {
return errors.New("rpk bug, please describe how you encountered this at https://github.com/redpanda-data/redpanda/issues/new?assignees=&labels=kind%2Fbug&template=01_bug_report.md")
}
if isOther {
value = fmt.Sprintf("%s: %s", finalTag, value)
}
// If we cannot unmarshal, but our error looks like we are trying to
// unmarshal a single element into a slice, we index[0] into the slice
// and try unmarshaling again.
rawv := []byte(value)
if err := yaml.Unmarshal(rawv, field.Addr().Interface()); err != nil {
// First we try wrapped with [ and ].
if wrapped, ok := tryValueAsUnwrappedArray(field, value, err); ok {
if err := yaml.Unmarshal([]byte(wrapped), field.Addr().Interface()); err == nil {
return nil
}
}
// If that still fails, we try setting a slice value if the
// target is a slice.
if elem0, ok := tryValueAsSlice0(field, err); ok {
return yaml.Unmarshal(rawv, elem0.Addr().Interface())
}
return err
}
return nil
}

It also seems like it'd be reasonably easy to sidestep config.Set in favor of directly setting these values in the configurator. Something like:
res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP, hostIndexOffset), false)
if err != nil {
return err
}
var addlListeners []config.NamedAuthNSocketAddress
if err := yaml.Unmarshal([]byte(res), &addlListeners); err != nil {
	return err
}
nodeConfig.Redpanda.KafkaAPI = append(nodeConfig.Redpanda.KafkaAPI, addlListeners...)

It also appears that config.Set has been removed (or at least I can't find it) in newer versions of rpk 🤔
I see config.Set is still there.
I will take a look at replacing config.Set and see whether it is viable, since we would not want to change the way the ADDITIONAL_LISTENERS env var is set.
FYI, if using double quotes instead of single quotes, I get an error such as: invalid character 'n' after object key:value pair.
Update: it works if I use escaped double quotes as the outer string uses double quotes, like, "[{\"name\": \"mtls-kafka\", 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 7 }}.redpanda.com', 'port': {{39002 | add .Index}}}]"
I think I did it. Please check.
httpBasic := "http_basic"
mtls := "mtls_identity"

require.NoError(t, vectorizedv1alpha1.AddToScheme(scheme.Scheme))
Don't mutate global values. There's a package that exports a Scheme with both v1 and v2 CRs added that you can reference instead of building a new scheme here.
I have not found the pkg so far. Could you send a link to it?
}

// validateAuthNListeners checks whether listeners1 is equal to listeners2.
func validateAuthNListeners(t *testing.T, cfg1, cfg2 []config.NamedAuthNSocketAddress) {
Does require.ElementsMatch not work here?
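In the real test this would simply be `require.ElementsMatch(t, cfg1, cfg2)` from testify. As a dependency-free illustration of its semantics (order-insensitive multiset equality), with a stand-in for config.NamedAuthNSocketAddress:

```go
package main

import (
	"fmt"
	"reflect"
)

// namedAddr is a stand-in for config.NamedAuthNSocketAddress.
type namedAddr struct {
	Name    string
	Address string
	Port    int
}

// elementsMatch mirrors the semantics of testify's require.ElementsMatch:
// true when both slices contain the same elements, ignoring order.
func elementsMatch(a, b []namedAddr) bool {
	if len(a) != len(b) {
		return false
	}
	used := make([]bool, len(b))
outer:
	for _, x := range a {
		for j, y := range b {
			if !used[j] && reflect.DeepEqual(x, y) {
				used[j] = true
				continue outer
			}
		}
		return false
	}
	return true
}

func main() {
	a := []namedAddr{{"kafka", "0.0.0.0", 9092}, {"mtls", "0.0.0.0", 9094}}
	b := []namedAddr{{"mtls", "0.0.0.0", 9094}, {"kafka", "0.0.0.0", 9092}}
	fmt.Println(elementsMatch(a, b)) // true: same elements, different order
}
```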
// value1 = [{'name':'mtls-kafka','port':{{9094 | add .Index | add .HostIndexOffset}}}]
// value2 = [{'name':'sasl-kafka','port': {{9092 | add .Index | add .HostIndexOffset}}}]
// Concat value = [{'name':'mtls-kafka','port':{{9094 | add .Index | add .HostIndexOffset}},{'name':'pl2-kafka','port': {{9092 | add .Index | add .HostIndexOffset}}}]
func (a *allListenersTemplateSpec) Concat(spec1 map[string]string) (string, error) {
Again citing the above comment, you should be able to ditch all of this if you have a type that can correctly serialize templated values.
I know. I will think hard about whether we can leverage custom MarshalJSON functions.
	return fmt.Sprintf("--advertise-rpc-addr=$(POD_NAME).%s:%d", serviceFQDN, rpcAPIPort)
}

// TODO
RafalKorepta left a comment
Should the
redpanda-operator/operator/api/vectorized/v1alpha1/cluster_webhook.go
Lines 1249 to 1315 in 4a4452c
} else {
	i = slices.IndexFunc(*advListeners, func(l config.NamedSocketAddress) bool { return !strings.Contains(l.Address, "svc.cluster.local") })
	if i != -1 {
		externalAPICfg = &(*advListeners)[i]
	}
I'm not sure why we should look for any listener that does not contain svc.cluster.local. From my point of view, the user-exposed API is not explicit about which listener should be external if this code is looking for a listener by name or by whether its address includes the Kubernetes domain.
I agree that creating an advertised listener should not depend on the name or the actual address. But in reality, the Operator hard-codes the name of the only external listener as e.g. kafka-external.
Regarding looking for an address that does not contain svc.cluster.local: the code tries to find an external address first and, failing that, falls back to the next address, which can be a svc.cluster.local one, and uses it to set the address in an advertised listener if not already set. It just treats a non-svc.cluster.local address with higher priority.
for i := 0; i < len(listenerNames); i++ {
	m := slices.IndexFunc(additionalTLSCfgs, func(t config.ServerTLS) bool { return t.Name == listenerNames[i] })
	if m != -1 {
		cfg := additionalTLSCfgs[m]
		cfg.Enabled = true
		if cfg.CertFile == "" {
			cfg.CertFile = serverTLSCfg.CertFile
		}
		if cfg.KeyFile == "" {
			cfg.KeyFile = serverTLSCfg.KeyFile
		}
		if cfg.RequireClientAuth {
			if cfg.TruststoreFile == "" {
				cfg.TruststoreFile = serverTLSCfg.TruststoreFile
			}
		} else if cfg.TruststoreFile != "" {
			cfg.RequireClientAuth = true
		}
		if cfg.Other == nil {
			cfg.Other = serverTLSCfg.Other
		}
		*tlsCfgs = append(*tlsCfgs, cfg)
	} else {
		// additionalTLSCfgs does not have a config for the listener, use the default listener TLS config.
		*tlsCfgs = append(*tlsCfgs, config.ServerTLS{
			Name: listenerNames[i],
I don't understand this function. It gets the first TLS configuration that matches the default external listener name, or just the first TLS configuration. Then it overwrites the existing additional TLS configuration or creates a new TLS configuration, but not in the additional TLS list; rather, in the TLS configuration.
Why does this function mutate additionalTLSCfgs when tlsCfg already has some configuration? Please help me understand when this function should be used.
Each of additionalTLSCfgs might not have all the fields (CertFile, KeyFile, RequireClientAuth, TruststoreFile) set. The function tries to set each of the unset fields. It looks for the default external listener (or the first listener if there is none) and uses that listener's TLS configuration to fill in each of the fields.
I will add a comment.
operator/pkg/networking/ports.go

internalListener := rpCluster.InternalListener()
externalListener := rpCluster.ExternalListener()
externalListeners := rpCluster.AllKafkaAPIExternalListeners()
adminAPIInternal := rpCluster.AdminAPIInternal()
adminAPIExternal := rpCluster.AdminAPIExternal()
proxyAPIInternal := rpCluster.PandaproxyAPIInternal()
proxyAPIExternal := rpCluster.PandaproxyAPIExternal()
proxyAPIExternals := rpCluster.AllPandaproxyAPIExternalListeners()
The naming is confusing: on one hand, the external variant starts with All; on the other hand, it has no prefix at all.
Should rpCluster.AdminAPIExternal() be renamed to rpCluster.AllAdminAPIExternalListeners()?
I would keep it the same, as I think we only support one internal and one external Admin API. And I would not want to rename the existing function AdminAPIExternal, for backward compatibility.
// AllKafkaAPIExternalListeners returns all the Kafka external listeners.
func (r *Cluster) AllKafkaAPIExternalListeners() []KafkaAPI {
NIT: In my opinion ExternalListener function should be renamed to FirstExternalListener. Then AllKafkaAPIExternalListeners could become KafkaExternalListeners. The Prefix All seems obsolete.
We would not want to rename the existing function ExternalListener, since it can be used by other apps out in the world, as the Operator is open source.
I will create FirstExternalListener and have ExternalListener call it.
I will rename AllKafkaAPIExternalListeners.
Is there a plan to have new version of CRD so we can deprecate it?
Is there a plan to have new version of CRD so we can deprecate it?
Yes, but it will take a while.
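The rename-with-wrapper plan discussed above can be sketched as follows. The types here are simplified stand-ins, not the real vectorized v1alpha1 definitions, and the field names are assumptions:

```go
package main

import "fmt"

// Simplified stand-ins for the CRD types (field names are assumptions).
type ExternalConnectivityConfig struct{ Enabled bool }
type KafkaAPI struct {
	Port     int
	External ExternalConnectivityConfig
}
type Cluster struct{ KafkaAPI []KafkaAPI }

// FirstExternalListener returns the first Kafka listener marked external,
// or nil if there is none.
func (r *Cluster) FirstExternalListener() *KafkaAPI {
	for i := range r.KafkaAPI {
		if r.KafkaAPI[i].External.Enabled {
			return &r.KafkaAPI[i]
		}
	}
	return nil
}

// ExternalListener is kept so existing callers keep compiling.
//
// Deprecated: Prefer FirstExternalListener.
func (r *Cluster) ExternalListener() *KafkaAPI {
	return r.FirstExternalListener()
}

func main() {
	c := &Cluster{KafkaAPI: []KafkaAPI{
		{Port: 9092},
		{Port: 9094, External: ExternalConnectivityConfig{Enabled: true}},
	}}
	fmt.Println(c.ExternalListener().Port) // 9094
}
```

The `// Deprecated:` comment convention is what gopls and staticcheck use to flag callers of the old name.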
PortTemplate string `json:"portTemplate,omitempty"`
// NoPortExposure is an indication that tells the controller not to expose the node port
// in a Kubernetes service. This is useful when the port is exposed by other means.
NoPortExposure bool `json:"noPortExposure,omitempty"`
What "other means" would expose a port?
ExcludeFromService might be a more clear indicator of what this field controls.
The ports for Private Link listeners are exposed externally.
I will rename it.
SchemaRegistry *SchemaRegistryAPI `json:"schemaRegistry,omitempty"`
DeveloperMode bool `json:"developerMode,omitempty"`

AdditionalSchemaRegistry []SchemaRegistryAPI `json:"additionalSchemaRegistry,omitempty"`
What's the relationship between this field and SchemaRegistry? If it's being added to allow specifying multiple schema registry listeners, I'd strongly prefer us to follow the existing convention and add a SchemaRegistryAPI []SchemaRegistryAPI field here.
You can then add a helper method (or perform some UnmarshalJSON tricks) to return an aggregated list. It would also be good to rename SchemaRegistry to DeprecatedSchemaRegistry so it doesn't get referred to. Either way you go, the relationship and desired usage of both these fields should be denoted as comments.
I will rename AdditionalSchemaRegistry to SchemaRegistryAPI.
For backward compatibility, I don't think we can rename SchemaRegistry to DeprecatedSchemaRegistry. It would break any existing cluster CRs, as we would not be able to update the existing ones at the same time as taking a new version of the Operator.
I can add a marker indicating that it will be deprecated in e.g. v1beta1.
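The helper-method approach the reviewer suggested might look like the sketch below: keep the legacy singular field, add the new list, and expose one aggregated accessor so callers never touch the deprecated field directly. The types and field names here are simplified stand-ins, not the actual CRD definitions:

```go
package main

import "fmt"

// SchemaRegistryAPI is a simplified stand-in for the CRD listener type.
type SchemaRegistryAPI struct{ Port int }

type ClusterConfiguration struct {
	// SchemaRegistry is the legacy singular listener; kept for backward
	// compatibility, to be marked deprecated in a future CRD version.
	SchemaRegistry *SchemaRegistryAPI
	// SchemaRegistryAPI is the new multi-listener field.
	SchemaRegistryAPI []SchemaRegistryAPI
}

// AllSchemaRegistryListeners aggregates the legacy field and the new list so
// that callers never need to consult the deprecated field directly.
func (c *ClusterConfiguration) AllSchemaRegistryListeners() []SchemaRegistryAPI {
	var out []SchemaRegistryAPI
	if c.SchemaRegistry != nil {
		out = append(out, *c.SchemaRegistry)
	}
	return append(out, c.SchemaRegistryAPI...)
}

func main() {
	cfg := &ClusterConfiguration{
		SchemaRegistry:    &SchemaRegistryAPI{Port: 8081},
		SchemaRegistryAPI: []SchemaRegistryAPI{{Port: 30081}},
	}
	fmt.Println(len(cfg.AllSchemaRegistryListeners())) // 2
}
```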
host := r.Status.Nodes.SchemaRegistry.Internal
if r.IsSchemaRegistryExternallyAvailable() && r.IsSchemaRegistryTLSEnabled() {
	host = r.Status.Nodes.SchemaRegistry.External
	allSchemaRegistry := r.AllSchemaRegistryListeners()
This function appears to have exactly one consumer and your change has made the expected behavior very ambiguous. I think you'd be better served by removing this and related functions and instead replace it with a single function that returns a config.Schema struct suitable for consumption by Console.
From looking at the usages, it seems like you could wipe out a good chunk of these methods. In the cases where there are multiple consumers, I think you'll be better served by either inverting the behavior or by making these methods accept a name parameter.
As it is only called by Console, we don't need to change it. I will revert.
}

additionalListenerCfgNames := []string{"redpanda.kafka_api", "redpanda.advertised_kafka_api", "pandaproxy.pandaproxy_api", "pandaproxy.advertised_pandaproxy_api"}
structuredDecode := false
Rather than having a field that could decode in one of two ways, could we instead add a new field that takes precedence over the old one? This behavior will be very confusing to reason about, especially so if something ever results in an older version of the configuration running with a new version of the operator.
an older version of the configuration running with a new version of the operator.
That is what I would actually like to support. The new operator needs to be backward compatible. For example, when we take a new version of the operator, we should not have to make corresponding changes to the upper layer at the same time. The operator needs to work with existing clusters.
The operator needs to work with existing clusters.
The preferred way of doing so would be adding an optional field (Or envvar in this case) that the configurator will prefer, if it finds it. If it's not found, it will fallback to the previous behavior.
The operator itself would set both fields and the old field would be removed in a future release.
It's a tried and true way of maintaining backwards compatibility that has the bonus of being very obvious to those that did not implement it. Having a single field that may have one of two different formats, especially in a language like Go that doesn't have great enum support, is really confusing.
If it's not too big of a change (I don't think it is), would you mind making it before merging?
type TemplatedInt string

func (t TemplatedInt) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf(`"removequote%sremovequote"`, t)), nil
Please extract the removequote into a const and explain why it's necessary in a comment.
Good catch. I was debating whether to update, since cluster deployment will eventually fail if ports collide.
	return res
}

// ExternalListener returns an external listener. It will be deprecated in the future.
nit: I think now is the time to mark this as deprecated.
// ExternalListener returns the first external kafka listener.
// Deprecated: Prefer FirstExternalListener or KafkaAPIExternalListeners
if err != nil {
	return "", err
}
re := regexp.MustCompile(`"removequote|removequote"`)
Use the const here. It would also be preferable to pull this regex into a top-level var so it's only compiled once at init time.
If you want to keep it here and compile it on the fly, instead handle the error so we don't have to worry about accidental panics at runtime.
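The hoisting the reviewer asks for might look like this, reusing the removequote sentinel from the diff above (the `stripSentinels` helper name is mine):

```go
package main

import (
	"fmt"
	"regexp"
)

// removeQuote is the sentinel emitted by TemplatedInt.MarshalJSON.
const removeQuote = "removequote"

// Hoisted to package scope: the pattern is compiled exactly once at init
// time, and a bad pattern panics at startup rather than mid-reconcile.
var removeQuoteRe = regexp.MustCompile(`"` + removeQuote + `|` + removeQuote + `"`)

// stripSentinels removes the sentinel together with its adjacent quote,
// leaving the templated value unquoted.
func stripSentinels(s string) string {
	return removeQuoteRe.ReplaceAllString(s, "")
}

func main() {
	fmt.Println(stripSentinels(`{"port":"removequote{{ .Index }}removequote"}`))
}
```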
	continue
}
// Replace 'port': {{ ... }} with 'port': '{{ ... }}' for working with yaml.Unmarshal
re := regexp.MustCompile(`'port'\s*:\s*\{\{.*?\}\}`)
Same regex hoisting comment as above.
…atic configurations can be created in the base configmap
… and resolve comments
…additional listener configs in JSON format in Configurator
💚 All backports created successfully
Note: Successful backport PRs will be merged automatically after passing CI. Questions? Please refer to the Backport tool documentation.
We currently support only one external listener for each of the API endpoints (Kafka, Proxy, and Schema Registry) in the Cluster CRD. This PR adds support for multiple external listeners in the Cluster CRD.