Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 7 additions & 101 deletions dubbod/planet/pkg/bootstrap/config_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,28 @@ package bootstrap

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
configaggregate "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/aggregate"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/gateway"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"k8s.io/apimachinery/pkg/api/errors"
"net/url"
"strings"

"github.com/apache/dubbo-kubernetes/api/networking/v1alpha3"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/crdclient"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/file"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/gateway"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/memory"
dubboCredentials "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/credentials"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/credentials/kube"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/adsc"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"github.com/apache/dubbo-kubernetes/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/url"
)

type ConfigSourceAddressScheme string
Expand Down Expand Up @@ -276,93 +269,6 @@ func (s *Server) initConfigController(args *PlanetArgs) error {
return nil
}

// verifyCert verifies given cert against TLS settings like SANs and CRL.
func (s *Server) verifyCert(certs [][]byte, tlsSettings *v1alpha3.ClientTLSSettings) error {
if len(certs) == 0 {
return fmt.Errorf("no certificates provided")
}
cert, err := x509.ParseCertificate(certs[0])
if err != nil {
return fmt.Errorf("failed to parse certificate: %w", err)
}

if len(tlsSettings.SubjectAltNames) > 0 {
sanMatchFound := false
for _, san := range cert.DNSNames {
if sanMatchFound {
break
}
for _, name := range tlsSettings.SubjectAltNames {
if san == name {
sanMatchFound = true
break
}
}
}
if !sanMatchFound {
return fmt.Errorf("no matching SAN found")
}
}

if len(tlsSettings.CaCrl) > 0 {
crlData := []byte(strings.TrimSpace(tlsSettings.CaCrl))
block, _ := pem.Decode(crlData)
if block != nil {
crlData = block.Bytes
}
crl, err := x509.ParseRevocationList(crlData)
if err != nil {
return fmt.Errorf("failed to parse CRL: %w", err)
}
for _, revokedCert := range crl.RevokedCertificateEntries {
if cert.SerialNumber.Cmp(revokedCert.SerialNumber) == 0 {
return fmt.Errorf("certificate is revoked")
}
}
}

return nil
}

// getTransportCredentials attempts to create credentials.TransportCredentials from ClientTLSSettings in mesh config
// Implemented only for SIMPLE_TLS mode
// TODO:
//
// Implement for MUTUAL_TLS/DUBBO_MUTUAL_TLS modes
func (s *Server) getTransportCredentials(args *PlanetArgs, tlsSettings *v1alpha3.ClientTLSSettings) (credentials.TransportCredentials, error) {
// TODO ValidateTLS

switch tlsSettings.GetMode() {
case v1alpha3.ClientTLSSettings_SIMPLE:
if len(tlsSettings.GetCredentialName()) > 0 {
rootCert, err := s.getRootCertFromSecret(tlsSettings.GetCredentialName(), args.Namespace)
if err != nil {
return nil, err
}
tlsSettings.CaCertificates = string(rootCert.Cert)
tlsSettings.CaCrl = string(rootCert.CRL)
}
if tlsSettings.GetInsecureSkipVerify().GetValue() || len(tlsSettings.GetCaCertificates()) == 0 {
return credentials.NewTLS(&tls.Config{
ServerName: tlsSettings.GetSni(),
}), nil
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM([]byte(tlsSettings.GetCaCertificates())) {
return nil, fmt.Errorf("failed to add ca certificate from configSource.tlsSettings to pool")
}
return credentials.NewTLS(&tls.Config{
ServerName: tlsSettings.GetSni(),
RootCAs: certPool,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
return s.verifyCert(rawCerts, tlsSettings)
},
}), nil
default:
return insecure.NewCredentials(), nil
}
}

// getRootCertFromSecret fetches a map of keys and values from a secret with name in namespace
func (s *Server) getRootCertFromSecret(name, namespace string) (*dubboCredentials.CertInfo, error) {
secret, err := s.kubeClient.Kube().CoreV1().Secrets(namespace).Get(context.Background(), name, v1.GetOptions{})
Expand Down
46 changes: 1 addition & 45 deletions dubbod/planet/pkg/bootstrap/dubbo_ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@ package bootstrap
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
securityModel "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/security/model"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/cmd"
"github.com/apache/dubbo-kubernetes/dubbod/security/cmd"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/pki/ra"
caserver "github.com/apache/dubbo-kubernetes/dubbod/security/pkg/server/ca"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/server/ca/authenticate"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/util"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/log"
Expand All @@ -39,7 +35,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"path"
"strings"
"time"
)

Expand Down Expand Up @@ -108,24 +103,6 @@ func (s *Server) initCAServer(ca caserver.CertificateAuthority, opts *caOptions)
}

func (s *Server) RunCA(grpc *grpc.Server) {
iss := trustedIssuer.Get()
aud := audience.Get()

token, err := os.ReadFile(securityModel.ThirdPartyJwtPath)
if err == nil {
tok, err := detectAuthEnv(string(token))
if err != nil {
log.Warnf("Starting with invalid K8S JWT token: %v", err)
} else {
if iss == "" {
iss = tok.Iss
}
if len(tok.Aud) > 0 && len(aud) == 0 {
aud = tok.Aud[0]
}
}
}

s.caServer.Register(grpc)

log.Info("Dubbod CA has started")
Expand Down Expand Up @@ -351,27 +328,6 @@ func (s *Server) handleCACertsFileWatch() {
}
}

func detectAuthEnv(jwt string) (*authenticate.JwtPayload, error) {
jwtSplit := strings.Split(jwt, ".")
if len(jwtSplit) != 3 {
return nil, fmt.Errorf("invalid JWT parts: %s", jwt)
}
payload := jwtSplit[1]

payloadBytes, err := util.DecodeJwtPart(payload)
if err != nil {
return nil, fmt.Errorf("failed to decode jwt: %v", err.Error())
}

structuredPayload := &authenticate.JwtPayload{}
err = json.Unmarshal(payloadBytes, &structuredPayload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal jwt: %v", err.Error())
}

return structuredPayload, nil
}

func handleEvent(s *Server) {
log.Info("Update Dubbod cacerts")

Expand Down
2 changes: 0 additions & 2 deletions dubbod/planet/pkg/model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/protocol"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/network"
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -167,7 +166,6 @@ type DubboEndpoint struct {
LegacyClusterPortKey int
EndpointPort uint32
WorkloadName string
Network network.ID
Namespace string
// Specifies the hostname of the Pod, empty for vm workload.
HostName string
Expand Down
6 changes: 3 additions & 3 deletions dubbod/planet/pkg/networking/grpcgen/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (
recheckTLS = (tlsMode == networking.ClientTLSSettings_DUBBO_MUTUAL || tlsModeStr == "DUBBO_MUTUAL")
}
if hasTLS || recheckTLS {
log.Infof("applyDestinationRule: applying TLS to default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
log.Infof("applyDestinationRule: applying TLS to default cluster %s (DestinationRule has DUBBO_MUTUAL)", b.defaultClusterName)
b.applyTLSForCluster(defaultCluster, nil)
} else {
log.Debugf("applyDestinationRule: skipping TLS for default cluster %s (DestinationRule has no TrafficPolicy or TLS)", b.defaultClusterName)
Expand All @@ -270,7 +270,7 @@ func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (
core.HealthStatus_DEGRADED,
},
}
log.Infof("applyDestinationRule: applying TLS to newly generated default cluster %s (DestinationRule has ISTIO_MUTUAL)", b.defaultClusterName)
log.Infof("applyDestinationRule: applying TLS to newly generated default cluster %s (DestinationRule has DUBBO_MUTUAL)", b.defaultClusterName)
b.applyTLSForCluster(defaultCluster, nil)
return nil, defaultCluster // Return the newly generated default cluster
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (
}

// applyTLSForCluster attaches a gRPC-compatible TLS transport socket whenever the
// DestinationRule (or subset override) specifies ISTIO_MUTUAL/DUBBO_MUTUAL mode.
// DestinationRule (or subset override) specifies DUBBO_MUTUAL/DUBBO_MUTUAL mode.
func (b *clusterBuilder) applyTLSForCluster(c *cluster.Cluster, subset *networking.Subset) {
if c == nil || b.svc == nil {
return
Expand Down
1 change: 0 additions & 1 deletion dubbod/planet/pkg/networking/grpcgen/grpcgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, w *model.WatchedResou
}

// buildCommonTLSContext creates a TLS context that matches gRPC xDS expectations.
// It is adapted from Istio's buildCommonTLSContext implementation, but kept minimal:
// - Uses certificate provider "default" for workload certs and root CA
// - Does not configure explicit SAN matches (left to future hardening)
func buildCommonTLSContext() *tlsv3.CommonTlsContext {
Expand Down
7 changes: 3 additions & 4 deletions dubbod/planet/pkg/networking/grpcgen/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ func buildInboundListeners(node *model.Proxy, push *model.PushContext, names []s
log.Debugf("buildInboundListeners: listener %s, service=%s, isGatewayPod=%v, node.Type=%v, node.IsRouter()=%v",
name, si.Service.Attributes.Name, isGatewayPod, node.Type, node.IsRouter())

// - DestinationRule with ISTIO_MUTUAL only configures CLIENT-SIDE (outbound) mTLS
// - DestinationRule with DUBBO_MUTUAL only configures CLIENT-SIDE (outbound) mTLS
// - PeerAuthentication with STRICT configures SERVER-SIDE (inbound) mTLS
// Both are REQUIRED for mTLS to work. Server-side mTLS should ONLY be controlled by PeerAuthentication.
// Reference: https://istio.io/latest/blog/2021/proxyless-grpc/#enabling-mtls
mode := push.InboundMTLSModeForProxy(node, uint32(listenPort))
if mode == model.MTLSPermissive {
log.Warnf("buildInboundListeners: PERMISSIVE mTLS is not supported for proxyless gRPC; defaulting to plaintext on listener %s", name)
Expand Down Expand Up @@ -518,7 +517,7 @@ func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter l
routeName := clusterName

// For gRPC proxyless, outbound listeners MUST use ApiListener with RDS
// This is the correct pattern used by Istio for gRPC xDS clients
// This is the correct pattern used by Dubbo for gRPC xDS clients
// Using FilterChain with inline RouteConfig causes the gRPC client to remain in IDLE state
hcm := &hcmv3.HttpConnectionManager{
CodecType: hcmv3.HttpConnectionManager_AUTO,
Expand All @@ -543,7 +542,7 @@ func buildOutboundListeners(node *model.Proxy, push *model.PushContext, filter l
},
}

// Build outbound listener with ApiListener (Istio pattern)
// Build outbound listener with ApiListener
// gRPC xDS clients expect ApiListener for outbound, not FilterChain
ll := &listener.Listener{
Name: fullListenerName,
Expand Down
16 changes: 1 addition & 15 deletions dubbod/planet/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type controllerInterface interface {

var (
log = dubbolog.RegisterScope("controller", "kube controller debugging")
_ controllerInterface = &Controller{}
_ serviceregistry.Instance = &Controller{}
)

Expand Down Expand Up @@ -391,7 +390,7 @@ func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.S
c.opts.XDSUpdater.ServiceUpdate(shard, string(currConv.Hostname), ns, event)

// Note: Endpoint updates are handled separately by EndpointSlice events, not here.
// This matches Istio's behavior where service changes don't immediately update endpoints.
// service changes don't immediately update endpoints.
// EndpointSlice events will trigger EDSUpdate (with logPushType=true) which will properly
// log "Full push, new service" when a new endpoint shard is created.

Expand Down Expand Up @@ -546,16 +545,3 @@ func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*mod
}
return nil
}

func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID {
// 1. check the pod/workloadEntry label
if nw := labels["topology.dubbo.apache.org/network"]; nw != "" {
return network.ID(nw)
}
// 2. check the system namespace labels
if nw := c.networkFromSystemNamespace(); nw != "" {
return nw
}

return ""
}
Loading
Loading