Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions src/operator/controllers/webhook_traffic/network_policy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
"github.com/otterize/intents-operator/src/shared/serviceidresolver/serviceidentity"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -33,6 +34,7 @@ const (
ReasonPatchingWebhookTrafficNetpol = "PatchingNetworkPolicyForWebhook"
ReasonPatchingWebhookTrafficNetpolFailed = "PatchingNetworkPolicyForWebhookFailed"
ReasonPatchingWebhookTrafficNetpolSuccess = "PatchingNetworkPolicyForWebhookSucceeded"
ReasonWebhookPortNotFoundOnServiceError = "WebhookPortNotFoundOnServiceError"
)

type WebhookClientConfigurationWithMeta struct {
Expand Down Expand Up @@ -168,7 +170,7 @@ func (n *NetworkPolicyHandler) reduceWebhooksNetpols(ctx context.Context, webhoo
// At this point we want to create the network policy, because the configuration is either set to "always" or
// that it is set to "if blocked by otterize" and the service is blocked by otterize
// TODO: do we also need to create netpol for Otterize?
netpol, err := n.buildNetworkPolicy(ctx, webhookClientConfig.webhookName, service)
netpol, err := n.buildNetworkPolicy(ctx, webhookClientConfig.webhookName, webhookClientConfig.clientConfiguration.Service, service)
if err != nil {
return make(NetworkPolicyWithMetaByName), errors.Wrap(err)
}
Expand All @@ -192,6 +194,9 @@ func (n *NetworkPolicyHandler) reduceWebhooksNetpols(ctx context.Context, webhoo
func (n *NetworkPolicyHandler) isServiceBlockedByOtterize(ctx context.Context, service *corev1.Service) (bool, error) {
endpoints := &corev1.Endpoints{}
err := n.client.Get(ctx, types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, endpoints)
if err != nil && k8serrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, errors.Wrap(err)
}
Expand All @@ -200,10 +205,8 @@ func (n *NetworkPolicyHandler) isServiceBlockedByOtterize(ctx context.Context, s

for _, address := range endpointsAddresses {
pod, err := n.getAffectedPod(ctx, address)
if k8sErr := &(k8serrors.StatusError{}); errors.As(err, &k8sErr) {
if k8serrors.IsNotFound(k8sErr) {
continue
}
if k8serrors.IsNotFound(err) {
continue
}

if err != nil {
Expand Down Expand Up @@ -307,7 +310,7 @@ func (n *NetworkPolicyHandler) getWebhookService(ctx context.Context, webhookSer
return service, true, nil
}

func (n *NetworkPolicyHandler) buildNetworkPolicy(ctx context.Context, webhookName string, service *corev1.Service) (v1.NetworkPolicy, error) {
func (n *NetworkPolicyHandler) buildNetworkPolicy(ctx context.Context, webhookName string, webhookService *admissionv1.ServiceReference, service *corev1.Service) (v1.NetworkPolicy, error) {
policyName := fmt.Sprintf("webhook-%s-access-to-%s", strings.ToLower(webhookName), strings.ToLower(service.Name))
rule := v1.NetworkPolicyIngressRule{}

Expand Down Expand Up @@ -352,23 +355,43 @@ func (n *NetworkPolicyHandler) buildNetworkPolicy(ctx context.Context, webhookNa
},
}

netpolPorts := make([]v1.NetworkPolicyPort, 0, 2*len(service.Spec.Ports))

for _, servicePort := range service.Spec.Ports {
// Add the port
netpolPorts = append(netpolPorts,
v1.NetworkPolicyPort{
Port: lo.ToPtr(intstr.IntOrString{IntVal: servicePort.Port, Type: intstr.Int}),
Protocol: lo.ToPtr(servicePort.Protocol),
})
// Add the target port
if servicePort.TargetPort.IntVal != 0 || servicePort.TargetPort.StrVal != "" {
netpolPorts = append(netpolPorts,
v1.NetworkPolicyPort{
Port: lo.ToPtr(servicePort.TargetPort),
Protocol: lo.ToPtr(servicePort.Protocol),
})
}
netpolPorts := make([]v1.NetworkPolicyPort, 0, 2)
if webhookService.Port != nil {
// If the webhook defines a port - lets use it
netpolPorts = append(netpolPorts, v1.NetworkPolicyPort{
Port: lo.ToPtr(intstr.IntOrString{IntVal: *webhookService.Port, Type: intstr.Int}),
Protocol: lo.ToPtr(corev1.ProtocolTCP),
})
} else {
// Otherwise, use the defaukt webhook port - 443
netpolPorts = append(netpolPorts, v1.NetworkPolicyPort{
Port: lo.ToPtr(intstr.IntOrString{IntVal: 443, Type: intstr.Int}),
Protocol: lo.ToPtr(corev1.ProtocolTCP),
})
}

// Find the webhook's-service's-port in the service
servicePort, servicePortFound := lo.Find(service.Spec.Ports, func(item corev1.ServicePort) bool {
return item.Port == netpolPorts[0].Port.IntVal
})

if !servicePortFound {
logrus.WithFields(logrus.Fields{
"WebhookName": webhookName,
"ServiceName": service.Name,
"ServiceNamespace": service.Namespace,
"ServicePort": netpolPorts[0].Port.IntVal,
}).Warning("Webhook service port not found on service resource")
n.RecordWarningEventf(service, ReasonWebhookPortNotFoundOnServiceError, "Webhook %s is defined to work on port %d", webhookName, netpolPorts[0].Port.IntVal)
return v1.NetworkPolicy{}, errors.New("Webhook port was not found on service")
}

// Add the service's target port if exists
if servicePort.TargetPort.IntVal != 0 || servicePort.TargetPort.StrVal != "" {
netpolPorts = append(netpolPorts, v1.NetworkPolicyPort{
Port: lo.ToPtr(servicePort.TargetPort),
Protocol: lo.ToPtr(servicePort.Protocol),
})
}

newPolicy.Spec.Ingress[0].Ports = netpolPorts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,101 @@ func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_Se
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpolSuccess)
}

func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_ServiceHasNoTargetPortThanPort_CreatePolicy() {
s.handler = NewNetworkPolicyHandler(s.Client, &runtime.Scheme{}, automate_third_party_network_policy.Always, 32, false)
s.handler.InjectRecorder(s.Recorder)

s.webhookService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: TestServiceName,
Namespace: TestNamespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"Taylor": "Swift",
},
Ports: []corev1.ServicePort{
{
Port: TestServicePort,
Protocol: corev1.ProtocolTCP,
},
},
},
}

s.mockForReturningValidatingWebhook()
s.mockReturningWebhookService()
//s.mockServiceIsBlockedByOtterize(make([]v1.NetworkPolicy, 0))
s.mockGetControlPlaneIPs()
s.mockGetExistingOtterizeWebhooksNetpols([]v1.NetworkPolicy{})

netpolMatcher := NewNetworkPolicyMatcher([]int32{TestServicePort}, s.handler.allowAllIncomingTraffic, nil)
s.Client.EXPECT().Create(gomock.Any(), gomock.All(netpolMatcher)).Return(nil)
err := s.handler.HandleAll(context.Background())
s.Require().NoError(err)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpol)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpolSuccess)
}

func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_WebhookDoesNotDefinePort_CreatePolicyWithDefaultPort() {
s.handler = NewNetworkPolicyHandler(s.Client, &runtime.Scheme{}, automate_third_party_network_policy.Always, 32, false)
s.handler.InjectRecorder(s.Recorder)

s.validatingWebhook = ValidatingWebhookConfiguration.DeepCopy()
s.validatingWebhook.Webhooks[0].ClientConfig.Service.Port = nil

s.webhookService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: TestServiceName,
Namespace: TestNamespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"Taylor": "Swift",
},
Ports: []corev1.ServicePort{
{
Port: 443,
Protocol: corev1.ProtocolTCP,
},
},
},
}

s.mockForReturningValidatingWebhook()
s.mockReturningWebhookService()
//s.mockServiceIsBlockedByOtterize(make([]v1.NetworkPolicy, 0))
s.mockGetControlPlaneIPs()
s.mockGetExistingOtterizeWebhooksNetpols([]v1.NetworkPolicy{})

netpolMatcher := NewNetworkPolicyMatcher([]int32{443}, s.handler.allowAllIncomingTraffic, nil)
s.Client.EXPECT().Create(gomock.Any(), gomock.All(netpolMatcher)).Return(nil)
err := s.handler.HandleAll(context.Background())
s.Require().NoError(err)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpol)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpolSuccess)
}

func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_WebhookPortNotFoundOnService_ExpectErrors() {
s.handler = NewNetworkPolicyHandler(s.Client, &runtime.Scheme{}, automate_third_party_network_policy.Always, 32, false)
s.handler.InjectRecorder(s.Recorder)

s.validatingWebhook = ValidatingWebhookConfiguration.DeepCopy()
s.validatingWebhook.Webhooks[0].ClientConfig.Service.Port = nil

s.mockForReturningValidatingWebhook()
s.mockReturningWebhookService()
//s.mockServiceIsBlockedByOtterize(make([]v1.NetworkPolicy, 0))
s.mockGetControlPlaneIPs()
//s.mockGetExistingOtterizeWebhooksNetpols([]v1.NetworkPolicy{})

//netpolMatcher := NewNetworkPolicyMatcher([]int32{443}, s.handler.allowAllIncomingTraffic, nil)
//s.Client.EXPECT().Create(gomock.Any(), gomock.All(netpolMatcher)).Return(nil)
err := s.handler.HandleAll(context.Background())
s.Require().Error(err, "Webhook port was not found on service")
s.ExpectEvent(ReasonWebhookPortNotFoundOnServiceError)
}

func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_WebhookNameTooLong_CreatePolicy() {
s.handler = NewNetworkPolicyHandler(s.Client, &runtime.Scheme{}, automate_third_party_network_policy.Always, 32, false)
s.handler.InjectRecorder(s.Recorder)
Expand All @@ -427,6 +522,46 @@ func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleAlways_We
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpolSuccess)
}

func (s *NetworkPolicyHandlerTestSuite) TestNetworkPolicyHandler_HandleIfBlockedByOtterize_ServiceIsBlockedByOtterize_TwoWebhooksToSameServiceDifferentPorts_CreatingOneWebhookPolicy() {
secondPort := int32(1432)
s.validatingWebhook.Webhooks = append(s.validatingWebhook.Webhooks,
admissionv1.ValidatingWebhook{
Name: "Second",
ClientConfig: admissionv1.WebhookClientConfig{
Service: &admissionv1.ServiceReference{
Name: TestServiceName,
Namespace: TestNamespace,
Port: lo.ToPtr(secondPort),
},
},
})
s.webhookService.Spec.Ports = append(s.webhookService.Spec.Ports, corev1.ServicePort{
Port: secondPort,
Protocol: corev1.ProtocolTCP,
})

s.mockForReturningValidatingWebhook()

// Called once for "First" webhook
s.mockReturningWebhookService()
s.mockServiceIsBlockedByOtterize(OtterizeIngressNetpols)
s.mockGetControlPlaneIPs()

// Called second time for "Second"" webhook
s.mockReturningWebhookService()
s.mockServiceIsBlockedByOtterize(OtterizeIngressNetpols)
s.mockGetControlPlaneIPs()

s.mockGetExistingOtterizeWebhooksNetpols([]v1.NetworkPolicy{})

netpolMatcher := NewNetworkPolicyMatcher([]int32{secondPort, TestServicePort}, s.handler.allowAllIncomingTraffic, nil)
s.Client.EXPECT().Create(gomock.Any(), gomock.All(netpolMatcher)).Return(nil)
err := s.handler.HandleAll(context.Background())
s.Require().NoError(err)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpol)
s.ExpectEvent(ReasonCreatingWebhookTrafficNetpolSuccess)
}

func (s *NetworkPolicyHandlerTestSuite) mockForReturningValidatingWebhook() {
s.Client.EXPECT().List(
gomock.Any(), gomock.Eq(&admissionv1.ValidatingWebhookConfigurationList{}),
Expand Down
3 changes: 1 addition & 2 deletions src/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/otterize/intents-operator/src/shared/k8sconf"
"github.com/otterize/intents-operator/src/shared/operator_cloud_client"
"github.com/otterize/intents-operator/src/shared/operatorconfig"
"github.com/otterize/intents-operator/src/shared/operatorconfig/automate_third_party_network_policy"
"github.com/otterize/intents-operator/src/shared/operatorconfig/enforcement"
"github.com/otterize/intents-operator/src/shared/reconcilergroup"
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
Expand Down Expand Up @@ -240,7 +239,7 @@ func main() {

webhooksTrafficNetworkHandler := webhook_traffic.NewNetworkPolicyHandler(mgr.GetClient(),
scheme,
automate_third_party_network_policy.Off,
enforcementConfig.GetAutomateAllowWebhookTraffic(),
viper.GetInt(operatorconfig.ControlPlaneIPv4CidrPrefixLength),
viper.GetBool(operatorconfig.WebhookTrafficAllowAllKey))
webhookTrafficReconcilerManager := webhook_traffic.NewWebhookTrafficReconcilerManager(mgr.GetClient(), webhooksTrafficNetworkHandler)
Expand Down
15 changes: 14 additions & 1 deletion src/shared/operator_cloud_client/status_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ func getAllowExternalTrafficConfig() graphqlclient.AllowExternalTrafficPolicy {
}
}

func getAutomateAllowWebhookTrafficConfig() graphqlclient.AutomateThirdPartyNetworkPolicy {
switch enforcement.GetConfig().AutomateAllowWebhookTraffic {
case automate_third_party_network_policy.Always:
return graphqlclient.AutomateThirdPartyNetworkPolicyAlways
case automate_third_party_network_policy.Off:
return graphqlclient.AutomateThirdPartyNetworkPolicyOff
case automate_third_party_network_policy.IfBlockedByOtterize:
return graphqlclient.AutomateThirdPartyNetworkPolicyIfBlockedByOtterize
default:
return ""
}
}

func uploadConfiguration(ctx context.Context, client CloudClient, mgr manager.Manager) {
ingressConfigIdentities := operatorconfig.GetIngressControllerServiceIdentities()
externallyManagedPolicyWorkloadIdentities := operatorconfig.GetExternallyManagedPoliciesServiceIdentities()
Expand Down Expand Up @@ -143,6 +156,7 @@ func uploadConfiguration(ctx context.Context, client CloudClient, mgr manager.Ma
AllowExternalTrafficPolicy: getAllowExternalTrafficConfig(), // The server expect for AllowExternalTrafficPolicy because of backwards compatibility
AutomateThirdPartyNetworkPolicies: getAutomateThirdPartyNetworkPoliciesConfig(),
PrometheusServerConfigs: getPrometheusServiceIdentities(),
AutomateAllowWebhookTraffic: getAutomateAllowWebhookTrafficConfig(),
}

configInput.IngressControllerConfig = lo.Map(ingressConfigIdentities, func(identity serviceidentity.ServiceIdentity, _ int) graphqlclient.IngressControllerConfigInput {
Expand All @@ -165,7 +179,6 @@ func uploadConfiguration(ctx context.Context, client CloudClient, mgr manager.Ma
configInput.AutomatedThirdPartyPolicyTypes = []graphqlclient.AutomatedThirdPartyPolicyTypes{
graphqlclient.AutomatedThirdPartyPolicyTypesExternalTraffic,
graphqlclient.AutomatedThirdPartyPolicyTypesMetricsTraffic,
graphqlclient.AutomatedThirdPartyPolicyTypesWebhookTraffic,
}

client.ReportIntentsOperatorConfiguration(timeoutCtx, configInput)
Expand Down
22 changes: 22 additions & 0 deletions src/shared/operatorconfig/enforcement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
EnforcedNamespaces *goset.Set[string]
ExcludedStrictModeNamespaces *goset.Set[string]
AutomateThirdPartyNetworkPolicies automate_third_party_network_policy.Enum
AutomateAllowWebhookTraffic automate_third_party_network_policy.Enum
PrometheusServiceIdentities []serviceidentity.ServiceIdentity
}

Expand All @@ -46,10 +47,29 @@ func (c Config) GetAutomateThirdPartyNetworkPolicy() automate_third_party_networ
}
}

func (c Config) GetAutomateAllowWebhookTraffic() automate_third_party_network_policy.Enum {
switch c.AutomateAllowWebhookTraffic {
case automate_third_party_network_policy.Off:
return automate_third_party_network_policy.Off
case automate_third_party_network_policy.Always:
if !c.EnforcementDefaultState {
// We don't want to create network policies for third parties when enforcement is disabled.
// However, if one uses shadow mode we can still block third party traffic to his protected services
// therefore we should return automate_third_party_network_policy.IfBlockedByOtterize
return automate_third_party_network_policy.IfBlockedByOtterize
}
return automate_third_party_network_policy.Always
default:
return automate_third_party_network_policy.IfBlockedByOtterize
}
}

const (
ActiveEnforcementNamespacesKey = "active-enforcement-namespaces" // When using the "shadow enforcement" mode, namespaces in this list will be treated as if the enforcement were active
AutomateThirdPartyNetworkPoliciesKey = "automate-third-party-network-policies" // Whether to automatically create network policies for external traffic & metrics collection traffic
AutomateThirdPartyNetworkPoliciesDefault = string(automate_third_party_network_policy.IfBlockedByOtterize)
AutomateAllowWebhookTrafficKey = "automate-allow-webhook-traffic" // Whether to automatically create network policies for webhook services
AutomateAllowWebhookTrafficDefault = string(automate_third_party_network_policy.IfBlockedByOtterize)
EnforcementDefaultStateKey = "enforcement-default-state" // Sets the default state of the If true, always enforces. If false, can be overridden using ProtectedService.
EnforcementDefaultStateDefault = true
EnableNetworkPolicyKey = "enable-network-policy-creation" // Whether to enable Intents network policy creation
Expand Down Expand Up @@ -88,6 +108,7 @@ func init() {
viper.SetDefault(EnableGCPPolicyKey, EnableGCPPolicyDefault)
viper.SetDefault(EnableAzurePolicyKey, EnableAzurePolicyDefault)
viper.SetDefault(AutomateThirdPartyNetworkPoliciesKey, AutomateThirdPartyNetworkPoliciesDefault)
viper.SetDefault(AutomateAllowWebhookTrafficKey, AutomateAllowWebhookTrafficDefault)
viper.SetDefault(EnableStrictModeIntentsKey, EnableStrictModeIntentsDefault)
}

Expand Down Expand Up @@ -121,6 +142,7 @@ func GetConfig() Config {
EnforcedNamespaces: goset.FromSlice(viper.GetStringSlice(ActiveEnforcementNamespacesKey)),
ExcludedStrictModeNamespaces: goset.FromSlice(viper.GetStringSlice(ActiveEnforcementNamespacesKey)),
AutomateThirdPartyNetworkPolicies: automate_third_party_network_policy.Enum(viper.GetString(AutomateThirdPartyNetworkPoliciesKey)),
AutomateAllowWebhookTraffic: automate_third_party_network_policy.Enum(viper.GetString(AutomateAllowWebhookTrafficKey)),
PrometheusServiceIdentities: GetPrometheusServiceIdentities(),
}
}
Expand Down
Loading
Loading