Skip to content

Commit 33e5fdd

Browse files
authored
check if the version is rabbitmq 4.2 (#61) for the SQL filters
Closes #58 Update CI with RabbitMQ version to 4.2-rc Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 6dd64ee commit 33e5fdd

File tree

3 files changed

+22
-2
lines changed

3 files changed

+22
-2
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1-management-alpine
10+
readonly rabbitmq_image=rabbitmq:4.2-rc-management-alpine
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-go-client'

pkg/rabbitmqamqp/amqp_types.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ type StreamFilterOptions struct {
171171

172172
// Filter the data based on Message Properties
173173
Properties *amqp.MessageProperties
174+
175+
// SQLFilter
176+
SQL string
174177
}
175178

176179
/*
@@ -202,6 +205,10 @@ func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {
202205
var filters []amqp.LinkFilter
203206

204207
filters = append(filters, sco.Offset.toLinkFilter())
208+
if sco.StreamFilterOptions != nil && !isStringNilOrEmpty(&sco.StreamFilterOptions.SQL) {
209+
210+
}
211+
205212
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Values != nil {
206213
var l []any
207214
for _, f := range sco.StreamFilterOptions.Values {
@@ -287,11 +294,22 @@ func (sco *StreamConsumerOptions) id() string {
287294
}
288295

289296
func (sco *StreamConsumerOptions) validate(available *featuresAvailable) error {
290-
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Properties != nil {
297+
if sco.StreamFilterOptions == nil {
298+
return nil
299+
}
300+
301+
if sco.StreamFilterOptions.Properties != nil {
291302
if !available.is41OrMore {
292303
return fmt.Errorf("stream consumer with properties filter is not supported. You need RabbitMQ 4.1 or later")
293304
}
294305
}
306+
307+
if !isStringNilOrEmpty(&sco.StreamFilterOptions.SQL) {
308+
if !available.is42rMore {
309+
return fmt.Errorf("stream consumer with SQL filter is not supported. You need RabbitMQ 4.2 or later")
310+
}
311+
return nil
312+
}
295313
return nil
296314
}
297315

pkg/rabbitmqamqp/features_available.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func (v Version) Compare(other Version) int {
2626
type featuresAvailable struct {
2727
is4OrMore bool
2828
is41OrMore bool
29+
is42rMore bool
2930
isRabbitMQ bool
3031
}
3132

@@ -45,6 +46,7 @@ func (f *featuresAvailable) ParseProperties(properties map[string]any) error {
4546

4647
f.is4OrMore = isVersionGreaterOrEqual(version, "4.0.0")
4748
f.is41OrMore = isVersionGreaterOrEqual(version, "4.1.0")
49+
f.is42rMore = isVersionGreaterOrEqual(version, "4.2.0")
4850
f.isRabbitMQ = strings.EqualFold(properties["product"].(string), "RabbitMQ")
4951
return nil
5052
}

0 commit comments

Comments
 (0)