File tree Expand file tree Collapse file tree 1 file changed +5
-1
lines changed Expand file tree Collapse file tree 1 file changed +5
-1
lines changed Original file line number Diff line number Diff line change @@ -118,6 +118,8 @@ type IOffsetSpecification interface {
118118 toLinkFilter () amqp.LinkFilter
119119}
120120
121+ const sqlFilter = "sql-filter"
122+ const amqpSqlFilter = "amqp:sql-filter"
121123const rmqStreamFilter = "rabbitmq:stream-filter"
122124const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"
123125const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"
@@ -206,7 +208,9 @@ func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {
206208
207209 filters = append (filters , sco .Offset .toLinkFilter ())
208210 if sco .StreamFilterOptions != nil && ! isStringNilOrEmpty (& sco .StreamFilterOptions .SQL ) {
209-
211+ l := map [string ]any {}
212+ l [amqpSqlFilter ] = sco .StreamFilterOptions .SQL
213+ filters = append (filters , amqp .NewLinkFilter (sqlFilter , 0 , l ))
210214 }
211215
212216 if sco .StreamFilterOptions != nil && sco .StreamFilterOptions .Values != nil {
You can’t perform that action at this time.
0 commit comments