Skip to content

Commit 3595362

Browse files
authored
feat: support enabling webook listener (#931)
Signed-off-by: arkbriar <arkbriar@gmail.com>
1 parent 7dbe57a commit 3595362

12 files changed

+281
-12
lines changed

apis/risingwave/v1alpha1/risingwave_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ type RisingWaveSpec struct {
114114
// with their IP addresses. This is useful when one wants to avoid the DNS resolution overhead and latency.
115115
EnableAdvertisingWithIP *bool `json:"enableAdvertisingWithIP,omitempty"`
116116

117+
// Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
118+
// in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.
119+
// +optional
120+
// +kubebuilder:default=false
121+
EnableWebhookListener *bool `json:"enableWebhookListener,omitempty"`
122+
117123
// Image for RisingWave component.
118124
Image string `json:"image"`
119125

apis/risingwave/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/risingwave.risingwavelabs.com_risingwaves.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31648,6 +31648,12 @@ spec:
3164831648
Flag to control whether to deploy in standalone mode or distributed mode. If standalone mode is used,
3164931649
spec.components will be ignored. Standalone mode can be turned on/off dynamically.
3165031650
type: boolean
31651+
enableWebhookListener:
31652+
default: false
31653+
description: |-
31654+
Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
31655+
in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.
31656+
type: boolean
3165131657
frontendServiceType:
3165231658
default: ClusterIP
3165331659
description: FrontendServiceType determines the service type of the

config/risingwave-operator-test.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31665,6 +31665,12 @@ spec:
3166531665
Flag to control whether to deploy in standalone mode or distributed mode. If standalone mode is used,
3166631666
spec.components will be ignored. Standalone mode can be turned on/off dynamically.
3166731667
type: boolean
31668+
enableWebhookListener:
31669+
default: false
31670+
description: |-
31671+
Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
31672+
in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.
31673+
type: boolean
3166831674
frontendServiceType:
3166931675
default: ClusterIP
3167031676
description: FrontendServiceType determines the service type of the

config/risingwave-operator.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31665,6 +31665,12 @@ spec:
3166531665
Flag to control whether to deploy in standalone mode or distributed mode. If standalone mode is used,
3166631666
spec.components will be ignored. Standalone mode can be turned on/off dynamically.
3166731667
type: boolean
31668+
enableWebhookListener:
31669+
default: false
31670+
description: |-
31671+
Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
31672+
in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.
31673+
type: boolean
3166831674
frontendServiceType:
3166931675
default: ClusterIP
3167031676
description: FrontendServiceType determines the service type of the

docs/general/api.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,19 @@ with their IP addresses. This is useful when one wants to avoid the DNS resoluti
628628
</tr>
629629
<tr>
630630
<td>
631+
<code>enableWebhookListener</code><br/>
632+
<em>
633+
bool
634+
</em>
635+
</td>
636+
<td>
637+
<em>(Optional)</em>
638+
<p>Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
639+
in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.</p>
640+
</td>
641+
</tr>
642+
<tr>
643+
<td>
631644
<code>image</code><br/>
632645
<em>
633646
string
@@ -4761,6 +4774,19 @@ with their IP addresses. This is useful when one wants to avoid the DNS resoluti
47614774
</tr>
47624775
<tr>
47634776
<td>
4777+
<code>enableWebhookListener</code><br/>
4778+
<em>
4779+
bool
4780+
</em>
4781+
</td>
4782+
<td>
4783+
<em>(Optional)</em>
4784+
<p>Flag to control whether to enable the webhook listener. If enabled, the webhook listener will be started
4785+
in the frontend nodes to receive the webhook events from external systems, e.g., GitHub.</p>
4786+
</td>
4787+
</tr>
4788+
<tr>
4789+
<td>
47644790
<code>image</code><br/>
47654791
<em>
47664792
string

pkg/consts/consts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ const (
114114
// Port names of components.
115115
const (
116116
PortService string = "service"
117+
PortHTTP string = "http"
117118
PortMetrics string = "metrics"
118119
PortDashboard string = "dashboard"
119120
)
@@ -127,6 +128,7 @@ const (
127128
ComputeMetricsPort int32 = 1222
128129
FrontendServicePort int32 = 4567
129130
FrontendMetricsPort int32 = 8080
131+
FrontendWebhookPort int32 = 4560
130132
CompactorServicePort int32 = 6660
131133
CompactorMetricsPort int32 = 1260
132134
)

pkg/factory/envs/risingwave.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ const (
5555
RWLicenseKeyPath = "RW_LICENSE_KEY_PATH"
5656
RWSecretStorePrivateKeyHex = "RW_SECRET_STORE_PRIVATE_KEY_HEX"
5757
RWResourceGroup = "RW_RESOURCE_GROUP"
58+
RWWebhookListenAddr = "RW_WEBHOOK_LISTEN_ADDR"
5859
)
5960

6061
// MinIO.

pkg/factory/risingwave_object_factory.go

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,10 @@ func (f *RisingWaveObjectFactory) envsForTLS() []corev1.EnvVar {
872872
return nil
873873
}
874874

875+
func (f *RisingWaveObjectFactory) isWebhookListenerEnabled() bool {
876+
return ptr.Deref(f.risingwave.Spec.EnableWebhookListener, false)
877+
}
878+
875879
func (f *RisingWaveObjectFactory) envsForFrontendArgs() []corev1.EnvVar {
876880
envVars := []corev1.EnvVar{
877881
{
@@ -900,6 +904,13 @@ func (f *RisingWaveObjectFactory) envsForFrontendArgs() []corev1.EnvVar {
900904
},
901905
}
902906

907+
if f.isWebhookListenerEnabled() {
908+
envVars = append(envVars, corev1.EnvVar{
909+
Name: envs.RWWebhookListenAddr,
910+
Value: fmt.Sprintf("0.0.0.0:%d", consts.FrontendWebhookPort),
911+
})
912+
}
913+
903914
return append(envVars, f.envsForTLS()...)
904915
}
905916

@@ -1912,7 +1923,7 @@ func newWorkloadObjectForComponentNodeGroup[T client.Object](f *RisingWaveObject
19121923
}
19131924

19141925
func (f *RisingWaveObjectFactory) portsForFrontendContainer() []corev1.ContainerPort {
1915-
return []corev1.ContainerPort{
1926+
ports := []corev1.ContainerPort{
19161927
{
19171928
Name: consts.PortService,
19181929
Protocol: corev1.ProtocolTCP,
@@ -1924,6 +1935,16 @@ func (f *RisingWaveObjectFactory) portsForFrontendContainer() []corev1.Container
19241935
ContainerPort: consts.FrontendMetricsPort,
19251936
},
19261937
}
1938+
1939+
if f.isWebhookListenerEnabled() {
1940+
ports = append(ports, corev1.ContainerPort{
1941+
Name: consts.PortHTTP,
1942+
Protocol: corev1.ProtocolTCP,
1943+
ContainerPort: consts.FrontendWebhookPort,
1944+
})
1945+
}
1946+
1947+
return ports
19271948
}
19281949

19291950
func (f *RisingWaveObjectFactory) isEmbeddedServingModeEnabled() bool {
@@ -2142,8 +2163,7 @@ func (f *RisingWaveObjectFactory) argsForStandaloneV2() []string {
21422163
}
21432164

21442165
func (f *RisingWaveObjectFactory) portsForStandaloneContainer() []corev1.ContainerPort {
2145-
// TODO: either expose each metrics port, or combine them into one.
2146-
return []corev1.ContainerPort{
2166+
ports := []corev1.ContainerPort{
21472167
{
21482168
Name: consts.PortMetrics,
21492169
Protocol: corev1.ProtocolTCP,
@@ -2160,6 +2180,16 @@ func (f *RisingWaveObjectFactory) portsForStandaloneContainer() []corev1.Contain
21602180
ContainerPort: consts.MetaDashboardPort,
21612181
},
21622182
}
2183+
2184+
if f.isWebhookListenerEnabled() {
2185+
ports = append(ports, corev1.ContainerPort{
2186+
Name: consts.PortHTTP,
2187+
Protocol: corev1.ProtocolTCP,
2188+
ContainerPort: consts.FrontendWebhookPort,
2189+
})
2190+
}
2191+
2192+
return ports
21632193
}
21642194

21652195
func (f *RisingWaveObjectFactory) setupStandaloneContainer(podSpec *corev1.PodSpec, container *corev1.Container) {
@@ -2223,6 +2253,13 @@ func (f *RisingWaveObjectFactory) setupStandaloneContainer(podSpec *corev1.PodSp
22232253
func(a, b *corev1.EnvVar) bool { return a.Name == b.Name })
22242254
}
22252255

2256+
if f.isWebhookListenerEnabled() {
2257+
container.Env = append(container.Env, corev1.EnvVar{
2258+
Name: envs.RWWebhookListenAddr,
2259+
Value: fmt.Sprintf("0.0.0.0:%d", consts.FrontendWebhookPort),
2260+
})
2261+
}
2262+
22262263
if vol, volMount := f.volumeAndVolumeMountForTLS(); vol != nil && volMount != nil {
22272264
// Add or override the volume and volume mount for TLS.
22282265
podSpec.Volumes = mergeListWhenKeyEquals(podSpec.Volumes, *vol, func(a, b *corev1.Volume) bool {
@@ -2280,7 +2317,7 @@ func (f *RisingWaveObjectFactory) NewStandaloneAdvancedStatefulSet() *kruiseapps
22802317

22812318
// NewStandaloneService creates a Service for standalone component.
22822319
func (f *RisingWaveObjectFactory) NewStandaloneService() *corev1.Service {
2283-
standaloneSvc := f.newService(consts.ComponentStandalone, corev1.ServiceTypeClusterIP, []corev1.ServicePort{
2320+
svcPorts := []corev1.ServicePort{
22842321
{
22852322
Name: consts.PortService,
22862323
Protocol: corev1.ProtocolTCP,
@@ -2299,7 +2336,18 @@ func (f *RisingWaveObjectFactory) NewStandaloneService() *corev1.Service {
22992336
Port: consts.MetaDashboardPort,
23002337
TargetPort: intstr.FromString(consts.PortDashboard),
23012338
},
2302-
})
2339+
}
2340+
2341+
if f.isWebhookListenerEnabled() {
2342+
svcPorts = append(svcPorts, corev1.ServicePort{
2343+
Name: consts.PortHTTP,
2344+
Protocol: corev1.ProtocolTCP,
2345+
Port: consts.FrontendWebhookPort,
2346+
TargetPort: intstr.FromString(consts.PortHTTP),
2347+
})
2348+
}
2349+
2350+
standaloneSvc := f.newService(consts.ComponentStandalone, corev1.ServiceTypeClusterIP, svcPorts)
23032351

23042352
return mustSetControllerReference(f.risingwave, standaloneSvc, f.scheme)
23052353
}
@@ -2355,6 +2403,15 @@ func (f *RisingWaveObjectFactory) NewFrontendService() *corev1.Service {
23552403
})
23562404
}
23572405

2406+
if f.isWebhookListenerEnabled() {
2407+
svcPorts = append(svcPorts, corev1.ServicePort{
2408+
Name: consts.PortHTTP,
2409+
Protocol: corev1.ProtocolTCP,
2410+
Port: consts.FrontendWebhookPort,
2411+
TargetPort: intstr.FromString(consts.PortHTTP),
2412+
})
2413+
}
2414+
23582415
frontendSvc := f.newService(consts.ComponentFrontend, f.risingwave.Spec.FrontendServiceType, svcPorts)
23592416

23602417
// Hijack selector if it's standalone mode.

pkg/factory/risingwave_object_factory_predicate_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,3 +1290,58 @@ func secretStorePredicates() []predicate[*corev1.PodTemplateSpec, secretStoreTes
12901290
},
12911291
}
12921292
}
1293+
1294+
func webhookPredicates() []predicate[*corev1.PodTemplateSpec, webhookTestcase] {
1295+
return []predicate[*corev1.PodTemplateSpec, webhookTestcase]{
1296+
{
1297+
Name: "env-contains",
1298+
Fn: func(podTemplate *corev1.PodTemplateSpec, tc webhookTestcase) bool {
1299+
if len(podTemplate.Spec.Containers) == 0 {
1300+
return false
1301+
}
1302+
return listContainsByKey(podTemplate.Spec.Containers[0].Env, tc.envs,
1303+
func(t *corev1.EnvVar) string { return t.Name }, deepEqual[corev1.EnvVar])
1304+
},
1305+
},
1306+
{
1307+
Name: "port-contains",
1308+
Fn: func(podTemplate *corev1.PodTemplateSpec, tc webhookTestcase) bool {
1309+
if len(podTemplate.Spec.Containers) == 0 {
1310+
return false
1311+
}
1312+
return listContainsByKey(podTemplate.Spec.Containers[0].Ports, tc.containerPorts,
1313+
func(t *corev1.ContainerPort) string { return t.Name }, deepEqual[corev1.ContainerPort])
1314+
},
1315+
},
1316+
{
1317+
Name: "env-not-contains",
1318+
Fn: func(podTemplate *corev1.PodTemplateSpec, tc webhookTestcase) bool {
1319+
if len(tc.unexpectedEnvs) == 0 {
1320+
return true
1321+
}
1322+
if len(podTemplate.Spec.Containers) == 0 {
1323+
return false
1324+
}
1325+
// Contains none of unexpected envs.
1326+
return !lo.ContainsBy(podTemplate.Spec.Containers[0].Env, func(item corev1.EnvVar) bool {
1327+
return lo.Contains(tc.unexpectedEnvs, item.Name)
1328+
})
1329+
},
1330+
},
1331+
{
1332+
Name: "port-not-contains",
1333+
Fn: func(podTemplate *corev1.PodTemplateSpec, tc webhookTestcase) bool {
1334+
if len(tc.unexpectedPorts) == 0 {
1335+
return true
1336+
}
1337+
if len(podTemplate.Spec.Containers) == 0 {
1338+
return false
1339+
}
1340+
// Contains none of the unexpected ports.
1341+
return !lo.ContainsBy(podTemplate.Spec.Containers[0].Ports, func(item corev1.ContainerPort) bool {
1342+
return lo.Contains(tc.unexpectedPorts, item.ContainerPort)
1343+
})
1344+
},
1345+
},
1346+
}
1347+
}

0 commit comments

Comments
 (0)