Skip to content

Commit f8172da

Browse files
Add ProcessGuarantee to WindowConfig (#792)
* Add `ProcessGuarantee` to WindowConfig * Fix error * Fix ci
1 parent 3c92372 commit f8172da

File tree

8 files changed

+82
-12
lines changed

8 files changed

+82
-12
lines changed

.ci/helm.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,22 @@ function ci::verify_crypto_function() {
337337
function ci::send_test_data() {
338338
inputtopic=$1
339339
inputmessage=$2
340-
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}"
340+
count=$3
341+
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}"
341342
return 0
342343
}
343344

345+
function ci::verify_backlog() {
346+
topic=$1
347+
sub=$2
348+
expected=$3
349+
BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats $topic | grep msgBacklog)
350+
if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then
351+
return 0
352+
fi
353+
return 1
354+
}
355+
344356
function ci::verify_exclamation_function() {
345357
inputtopic=$1
346358
outputtopic=$2

.ci/tests/integration/cases/logging-window-function/manifests.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ spec:
3636
windowConfig:
3737
windowLengthCount: 10
3838
slidingIntervalCount: 5
39+
processingGuarantee: ATLEAST_ONCE
40+
# the processingGuarantee should be manual for window function
41+
# see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319
42+
processingGuarantee: manual
3943
subscriptionPosition: earliest
4044
---
4145
apiVersion: v1

.ci/tests/integration/cases/logging-window-function/verify.sh

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,46 @@ if [ $? -ne 0 ]; then
4343
exit 1
4444
fi
4545

46-
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1)
46+
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1)
4747
if [ $? -ne 0 ]; then
4848
echo "$verify_java_result"
4949
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
5050
exit 1
5151
fi
5252

53+
sleep 3
54+
55+
# the 3 messages will not be processed, so backlog should be 3
56+
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 3 2>&1)
57+
if [ $? -ne 0 ]; then
58+
echo "$verify_backlog_result"
59+
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
60+
exit 1
61+
fi
62+
63+
# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
64+
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1)
65+
if [ $? -ne 0 ]; then
66+
echo "$verify_java_result"
67+
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
68+
exit 1
69+
fi
70+
71+
# there is a bug in upstream that messages don't get ack if the function return null
72+
# should be fixed by: https://github.com/apache/pulsar/pull/23618
73+
#sleep 3
74+
#
75+
#verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 0 2>&1)
76+
#if [ $? -ne 0 ]; then
77+
# echo "$verify_backlog_result"
78+
# kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
79+
# exit 1
80+
#fi
81+
5382
verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l)
5483
if [ $verify_log_result -ne 0 ]; then
5584
sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;)
56-
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
85+
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
5786
if [ $verify_log_topic_result -ne 0 ]; then
5887
echo "e2e-test: ok" | yq eval -
5988
else

api/compute/v1alpha1/common.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ const (
407407
Manual ProcessGuarantee = "manual"
408408
)
409409

410+
// WindowProcessGuarantee enum type
411+
// +kubebuilder:validation:Enum=ATLEAST_ONCE;ATMOST_ONCE
412+
type WindowProcessGuarantee string
413+
410414
// LogTopicAgent enum type
411415
// +kubebuilder:validation:Enum=runtime;sidecar
412416
type LogTopicAgent string
@@ -533,15 +537,16 @@ type LogConfig struct {
533537
}
534538

535539
type WindowConfig struct {
536-
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
537-
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
538-
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
539-
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
540-
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
541-
LateDataTopic string `json:"lateDataTopic,omitempty"`
542-
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
543-
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
544-
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
540+
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
541+
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
542+
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
543+
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
544+
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
545+
LateDataTopic string `json:"lateDataTopic,omitempty"`
546+
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
547+
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
548+
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
549+
ProcessingGuarantee WindowProcessGuarantee `json:"processingGuarantee,omitempty"`
545550
}
546551

547552
type VPASpec struct {

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3786,6 +3786,11 @@ spec:
37863786
maxLagMs:
37873787
format: int64
37883788
type: integer
3789+
processingGuarantee:
3790+
enum:
3791+
- ATLEAST_ONCE
3792+
- ATMOST_ONCE
3793+
type: string
37893794
slidingIntervalCount:
37903795
format: int32
37913796
type: integer

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3805,6 +3805,11 @@ spec:
38053805
maxLagMs:
38063806
format: int64
38073807
type: integer
3808+
processingGuarantee:
3809+
enum:
3810+
- ATLEAST_ONCE
3811+
- ATMOST_ONCE
3812+
type: string
38083813
slidingIntervalCount:
38093814
format: int32
38103815
type: integer

config/crd/bases/compute.functionmesh.io_functionmeshes.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3786,6 +3786,11 @@ spec:
37863786
maxLagMs:
37873787
format: int64
37883788
type: integer
3789+
processingGuarantee:
3790+
enum:
3791+
- ATLEAST_ONCE
3792+
- ATMOST_ONCE
3793+
type: string
37893794
slidingIntervalCount:
37903795
format: int32
37913796
type: integer

config/crd/bases/compute.functionmesh.io_functions.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3783,6 +3783,11 @@ spec:
37833783
maxLagMs:
37843784
format: int64
37853785
type: integer
3786+
processingGuarantee:
3787+
enum:
3788+
- ATLEAST_ONCE
3789+
- ATMOST_ONCE
3790+
type: string
37863791
slidingIntervalCount:
37873792
format: int32
37883793
type: integer

0 commit comments

Comments
 (0)