Skip to content

Commit 3d41529

Browse files
committed
introduce schema support & update source/sink apis
1 parent c8c1b97 commit 3d41529

17 files changed

+670
-322
lines changed

api/v1alpha1/common.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,44 @@ type SecretRef struct {
5151
Key string `json:"key,omitempty"`
5252
}
5353

54+
type SourceConf struct {
55+
Topics []string `json:"topics,omitempty"`
56+
TopicPattern string `json:"topicPattern,omitempty"`
57+
CustomSerdeSources map[string]string `json:"customSerdeSources,omitempty"`
58+
CustomSchemaSources map[string]string `json:"customSchemaSources,omitempty"`
59+
SourceSpecs map[string]ConsumerConfig `json:"sourceSpecs,omitempty"`
60+
}
61+
62+
type ConsumerConfig struct {
63+
SchemaType string `json:"schemaType,omitempty"`
64+
SerdeClassName string `json:"serdeClassname,omitempty"`
65+
IsRegexPattern bool `json:"isRegexPattern,omitempty"`
66+
SchemaProperties map[string]string `json:"schemaProperties,omitempty"`
67+
ConsumerProperties map[string]string `json:"consumerProperties,omitempty"`
68+
ReceiverQueueSize int32 `json:"receiverQueueSize,omitempty"`
69+
}
70+
71+
type SinkConf struct {
72+
Topic string `json:"topic,omitempty"`
73+
SinkSerdeClassName string `json:"sinkSerdeClassName,omitempty"`
74+
SinkSchemaType string `json:"sinkSchemaType,omitempty"`
75+
ProducerConf *ProducerConfig `json:"producerConf,omitempty"`
76+
CustomSchemaSinks map[string]string `json:"customSchemaSinks,omitempty"`
77+
}
78+
79+
type ProducerConfig struct {
80+
MaxPendingMessages int32 `json:"maxPendingMessages,omitempty"`
81+
MaxPendingMessagesAcrossPartitions int32 `json:"maxPendingMessagesAcrossPartitions,omitempty"`
82+
UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty"`
83+
}
84+
85+
type SubscribePosition string
86+
87+
const (
88+
Latest SubscribePosition = "latest"
89+
Earliest SubscribePosition = "earliest"
90+
)
91+
5492
type Component string
5593

5694
const (
@@ -88,10 +126,12 @@ const (
88126
NoAction ReconcileAction = "NoAction"
89127
)
90128

129+
type ProcessGuarantee string
130+
91131
const (
92-
AtleastOnce string = "atleast_once"
93-
AtmostOnce string = "atmost_once"
94-
EffectivelyOnce string = "effectively_once"
132+
AtleastOnce ProcessGuarantee = "atleast_once"
133+
AtmostOnce ProcessGuarantee = "atmost_once"
134+
EffectivelyOnce ProcessGuarantee = "effectively_once"
95135

96136
DefaultTenant string = "public"
97137
DefaultCluster string = "kubernetes"

api/v1alpha1/function_types.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,28 +30,35 @@ type FunctionSpec struct {
3030
// Important: Run "make" to regenerate code after modifying this file
3131
Name string `json:"name,omitempty"`
3232
ClassName string `json:"className,omitempty"`
33-
Tenant string `json:"tentant,omitempty"`
33+
Tenant string `json:"tenant,omitempty"`
3434
ClusterName string `json:"clusterName,omitempty"`
3535
SourceType string `json:"sourceType,omitempty"`
3636
SinkType string `json:"sinkType,omitempty"`
3737
Replicas *int32 `json:"replicas,omitempty"`
3838
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
39-
Sources []string `json:"sources,omitempty"`
40-
Sink string `json:"sink,omitempty"`
39+
Sources SourceConf `json:"sources,omitempty"`
40+
Sink SinkConf `json:"sink,omitempty"`
4141
LogTopic string `json:"logTopic,omitempty"`
4242
FuncConfig map[string]string `json:"funcConfig,omitempty"`
4343
Resources corev1.ResourceList `json:"resources,omitempty"`
4444
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
4545

46-
Timeout int32 `json:"timeout,omitempty"`
47-
AutoAck *bool `json:"autoAck,omitempty"`
48-
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
49-
ProcessingGuarantee string `json:"processingGuarantee,omitempty"`
50-
RetainOrdering bool `json:"retainOrdering,omitempty"`
51-
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
52-
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`
53-
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
54-
MaxPendingAsyncRequests *int32 `json:"maxPendingAsyncRequests,omitempty"`
46+
Timeout int32 `json:"timeout,omitempty"`
47+
AutoAck *bool `json:"autoAck,omitempty"`
48+
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
49+
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
50+
RetainOrdering bool `json:"retainOrdering,omitempty"`
51+
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
52+
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`
53+
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
54+
MaxPendingAsyncRequests *int32 `json:"maxPendingAsyncRequests,omitempty"`
55+
56+
RuntimeFlags string `json:"runtimeFlags,omitempty"`
57+
SubscriptionName string `json:"subscriptionName,omitempty"`
58+
CleanupSubscription bool `json:"cleanupSubscription,omitempty"`
59+
SubscriptionPosition SubscribePosition `json:"subscriptionPosition,omitempty"`
60+
61+
// TODO: windowconfig, customRuntimeOptions?
5562

5663
Messaging `json:",inline"`
5764
Runtime `json:",inline"`

api/v1alpha1/function_webhook.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,7 @@ func (r *Function) ValidateCreate() error {
106106
}
107107
}
108108

109-
// TODO: verify topic names are valid
110-
if r.Spec.Sources == nil {
111-
return errors.New("no source topics specified")
112-
}
109+
// TODO: verify source conf
113110

114111
// TODO: allow 0 replicas, currently hpa's min value has to be 1
115112
if *r.Spec.Replicas == 0 {

api/v1alpha1/sink_types.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
2525
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
2626

27-
// SinkSpec defines the desired state of Sink
27+
// SinkSpec defines the desired state of Topic
2828
type SinkSpec struct {
2929
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
3030
// Important: Run "make" to regenerate code after modifying this file
@@ -36,27 +36,29 @@ type SinkSpec struct {
3636
SinkType string `json:"sinkType,omitempty"`
3737
Replicas *int32 `json:"replicas,omitempty"`
3838
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
39-
Sources []string `json:"sources,omitempty"`
40-
LogTopic string `json:"logTopic,omitempty"`
39+
Sources SourceConf `json:"sources,omitempty"`
4140
SinkConfig map[string]string `json:"sinkConfig,omitempty"`
4241
Resources corev1.ResourceList `json:"resources,omitempty"`
4342
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
4443

45-
Timeout int32 `json:"timeout,omitempty"`
46-
AutoAck *bool `json:"autoAck,omitempty"`
47-
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
48-
ProcessingGuarantee string `json:"processingGuarantee,omitempty"`
49-
RetainOrdering bool `json:"retainOrdering,omitempty"`
50-
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
51-
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`
52-
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
53-
MaxPendingAsyncRequests *int32 `json:"maxPendingAsyncRequests,omitempty"`
44+
Timeout int32 `json:"timeout,omitempty"`
45+
NegativeAckRedeliveryDelayMs int32 `json:"negativeAckRedeliveryDelayMs,omitempty"`
46+
AutoAck *bool `json:"autoAck,omitempty"`
47+
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
48+
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
49+
RetainOrdering bool `json:"retainOrdering,omitempty"`
50+
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`
51+
52+
RuntimeFlags string `json:"runtimeFlags,omitempty"`
53+
SubscriptionName string `json:"subscriptionName,omitempty"`
54+
CleanupSubscription bool `json:"cleanupSubscription,omitempty"`
55+
SubscriptionPosition SubscribePosition `json:"subscriptionPosition,omitempty"`
5456

5557
Messaging `json:",inline"`
5658
Runtime `json:",inline"`
5759
}
5860

59-
// SinkStatus defines the observed state of Sink
61+
// SinkStatus defines the observed state of Topic
6062
type SinkStatus struct {
6163
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
6264
// Important: Run "make" to regenerate code after modifying this file
@@ -69,7 +71,7 @@ type SinkStatus struct {
6971
// +kubebuilder:subresource:status
7072
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector
7173

72-
// Sink is the Schema for the sinks API
74+
// Topic is the Schema for the sinks API
7375
type Sink struct {
7476
metav1.TypeMeta `json:",inline"`
7577
metav1.ObjectMeta `json:"metadata,omitempty"`
@@ -80,7 +82,7 @@ type Sink struct {
8082

8183
// +kubebuilder:object:root=true
8284

83-
// SinkList contains a list of Sink
85+
// SinkList contains a list of Topic
8486
type SinkList struct {
8587
metav1.TypeMeta `json:",inline"`
8688
metav1.ListMeta `json:"metadata,omitempty"`

api/v1alpha1/sink_webhook.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,6 @@ func (r *Sink) Default() {
7171
r.Spec.Tenant = DefaultTenant
7272
}
7373

74-
if r.Spec.MaxPendingAsyncRequests == nil {
75-
maxPending := int32(1000)
76-
r.Spec.MaxPendingAsyncRequests = &maxPending
77-
}
78-
79-
if r.Spec.ForwardSourceMessageProperty == nil {
80-
trueVal := true
81-
r.Spec.ForwardSourceMessageProperty = &trueVal
82-
}
83-
8474
if r.Spec.Resources.Cpu() == nil {
8575
r.Spec.Resources.Cpu().Set(int64(1))
8676
}
@@ -105,10 +95,7 @@ func (r *Sink) ValidateCreate() error {
10595
}
10696
}
10797

108-
// TODO: verify topic names are valid
109-
if r.Spec.Sources == nil {
110-
return errors.New("no source topics specified")
111-
}
98+
// TODO: verify inputConf
11299

113100
// TODO: allow 0 replicas, currently hpa's min value has to be 1
114101
if *r.Spec.Replicas == 0 {
@@ -135,14 +122,6 @@ func (r *Sink) ValidateCreate() error {
135122
return errors.New("dead letter topic is set but max message retry is set to infinity")
136123
}
137124

138-
if r.Spec.RetainKeyOrdering && r.Spec.ProcessingGuarantee == EffectivelyOnce {
139-
return errors.New("when effectively once processing guarantee is specified, retain Key ordering cannot be set")
140-
}
141-
142-
if r.Spec.RetainKeyOrdering && r.Spec.RetainOrdering {
143-
return errors.New("only one of retain ordering or retain key ordering can be set")
144-
}
145-
146125
if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil {
147126
return errors.New("must specify a runtime from java, python or golang")
148127
}

api/v1alpha1/source_types.go

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,20 @@ import (
2828
type SourceSpec struct {
2929
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
3030
// Important: Run "make" to regenerate code after modifying this file
31-
Name string `json:"name,omitempty"`
32-
ClassName string `json:"className,omitempty"`
33-
Tenant string `json:"tentant,omitempty"`
34-
ClusterName string `json:"clusterName,omitempty"`
35-
SourceType string `json:"sourceType,omitempty"`
36-
SinkType string `json:"sinkType,omitempty"`
37-
Replicas *int32 `json:"replicas,omitempty"`
38-
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
39-
Sink string `json:"sink,omitempty"`
40-
SchemaType string `json:"schemaType,omitempty"`
41-
LogTopic string `json:"logTopic,omitempty"`
42-
SourceConfig map[string]string `json:"sourceConfig,omitempty"`
43-
Resources corev1.ResourceList `json:"resources,omitempty"`
44-
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
45-
46-
Timeout int32 `json:"timeout,omitempty"`
47-
AutoAck *bool `json:"autoAck,omitempty"`
48-
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
49-
ProcessingGuarantee string `json:"processingGuarantee,omitempty"`
50-
RetainOrdering bool `json:"retainOrdering,omitempty"`
51-
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
52-
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`
53-
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
54-
MaxPendingAsyncRequests *int32 `json:"maxPendingAsyncRequests,omitempty"`
31+
Name string `json:"name,omitempty"`
32+
ClassName string `json:"className,omitempty"`
33+
Tenant string `json:"tenant,omitempty"`
34+
ClusterName string `json:"clusterName,omitempty"`
35+
SourceType string `json:"sourceType,omitempty"`
36+
SinkType string `json:"sinkType,omitempty"`
37+
Replicas *int32 `json:"replicas,omitempty"`
38+
MaxReplicas *int32 `json:"maxReplicas,omitempty"` // if provided, turn on autoscaling
39+
Sink SinkConf `json:"sink,omitempty"`
40+
SourceConfig map[string]string `json:"sourceConfig,omitempty"`
41+
Resources corev1.ResourceList `json:"resources,omitempty"`
42+
SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"`
43+
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
44+
RuntimeFlags string `json:"runtimeFlags,omitempty"`
5545

5646
Messaging `json:",inline"`
5747
Runtime `json:",inline"`

api/v1alpha1/source_webhook.go

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ func (r *Source) Default() {
5050
r.Spec.Replicas = &zeroVal
5151
}
5252

53-
if r.Spec.AutoAck == nil {
54-
trueVal := true
55-
r.Spec.AutoAck = &trueVal
56-
}
53+
//if r.Spec.AutoAck == nil {
54+
// trueVal := true
55+
// r.Spec.AutoAck = &trueVal
56+
//}
5757

5858
if r.Spec.ProcessingGuarantee == "" {
5959
r.Spec.ProcessingGuarantee = AtleastOnce
@@ -71,16 +71,6 @@ func (r *Source) Default() {
7171
r.Spec.Tenant = DefaultTenant
7272
}
7373

74-
if r.Spec.MaxPendingAsyncRequests == nil {
75-
maxPending := int32(1000)
76-
r.Spec.MaxPendingAsyncRequests = &maxPending
77-
}
78-
79-
if r.Spec.ForwardSourceMessageProperty == nil {
80-
trueVal := true
81-
r.Spec.ForwardSourceMessageProperty = &trueVal
82-
}
83-
8474
if r.Spec.Resources.Cpu() == nil {
8575
r.Spec.Resources.Cpu().Set(int64(1))
8676
}
@@ -104,10 +94,7 @@ func (r *Source) ValidateCreate() error {
10494
}
10595
}
10696

107-
// TODO: verify topic names are valid
108-
if r.Spec.Sink == "" {
109-
return errors.New("no sink topics specified")
110-
}
97+
// TODO: verify inputConf
11198

11299
// TODO: allow 0 replicas, currently hpa's min value has to be 1
113100
if *r.Spec.Replicas == 0 {
@@ -122,26 +109,6 @@ func (r *Source) ValidateCreate() error {
122109
return errors.New("resource request is invalid. each resource value must be positive")
123110
}
124111

125-
if r.Spec.Timeout != 0 && r.Spec.ProcessingGuarantee != AtleastOnce {
126-
return errors.New("message timeout can only be set for AtleastOnce processing guarantee")
127-
}
128-
129-
if r.Spec.MaxMessageRetry > 0 && r.Spec.ProcessingGuarantee == EffectivelyOnce {
130-
return errors.New("MaxMessageRetries and Effectively once are not compatible")
131-
}
132-
133-
if r.Spec.MaxMessageRetry <= 0 && r.Spec.DeadLetterTopic != "" {
134-
return errors.New("dead letter topic is set but max message retry is set to infinity")
135-
}
136-
137-
if r.Spec.RetainKeyOrdering && r.Spec.ProcessingGuarantee == EffectivelyOnce {
138-
return errors.New("when effectively once processing guarantee is specified, retain Key ordering cannot be set")
139-
}
140-
141-
if r.Spec.RetainKeyOrdering && r.Spec.RetainOrdering {
142-
return errors.New("only one of retain ordering or retain key ordering can be set")
143-
}
144-
145112
if r.Spec.Java == nil && r.Spec.Python == nil && r.Spec.Golang == nil {
146113
return errors.New("must specify a runtime from java, python or golang")
147114
}

0 commit comments

Comments
 (0)