@@ -34,10 +34,11 @@ public async Task TestSqlFilterFunctionalityAsync()
3434 IQueueSpecification q = _management . Queue ( _queueName ) . Stream ( ) . Queue ( ) ;
3535 await q . DeclareAsync ( ) ;
3636 TaskCompletionSource < IMessage > tcs =
37- new TaskCompletionSource < IMessage > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
37+ new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
3838 IConsumer consumer = await _connection . ConsumerBuilder ( )
3939 . Queue ( _queueName )
40- . Stream ( ) . Filter ( ) . Sql ( "properties.subject LIKE '%John%'" ) . Stream ( ) . Offset ( StreamOffsetSpecification . First )
40+ . Stream ( ) . Filter ( ) . Sql ( "properties.subject LIKE '%John%'" ) . Stream ( )
41+ . Offset ( StreamOffsetSpecification . First )
4142 . Builder ( ) . MessageHandler ( ( IContext ctx , IMessage msg ) =>
4243 {
4344 tcs . SetResult ( msg ) ;
@@ -61,10 +62,60 @@ public async Task TestSqlFilterFunctionalityAsync()
6162
6263 Assert . Equal ( "Test message for SQL filter" , tcs . Task . Result . BodyAsString ( ) ) ;
6364 Assert . Equal ( "John" , tcs . Task . Result . Subject ( ) ) ;
65+ Assert . Equal ( "Test message for SQL filter" , tcs . Task . Result . BodyAsString ( ) ) ;
6466 await consumer . CloseAsync ( ) ;
6567 await publisher . CloseAsync ( ) ;
6668 await q . DeleteAsync ( ) ;
6769 await _connection . CloseAsync ( ) ;
6870 }
71+
72+ [ SkippableTheory ]
73+ [ Trait ( "Category" , "SqlFilter" ) ]
74+ [ InlineData ( "myP" , "John" ) ]
75+ [ InlineData ( "myP" , "Doe" ) ]
76+ [ InlineData ( "user_id" , "Alice" ) ]
77+ [ InlineData ( "user_id" , "Bob" ) ]
78+ public async Task TestSqlFilterFunctionalityAsyncValues ( string property , string value )
79+ {
80+ Assert . NotNull ( _connection ) ;
81+ Assert . NotNull ( _management ) ;
82+
83+ // cast to AMQPConnection to use Skip.If
84+ var amqpConnection = ( _connection as AmqpConnection ) ;
85+ Skip . IfNot ( amqpConnection is { _featureFlags . IsSqlFeatureEnabled : true } ,
86+ "SQL filter is not supported by the connection." ) ;
87+ IQueueSpecification q = _management . Queue ( _queueName ) . Stream ( ) . Queue ( ) ;
88+ await q . DeclareAsync ( ) ;
89+ TaskCompletionSource < IMessage > tcs =
90+ new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
91+ IConsumer consumer = await _connection . ConsumerBuilder ( )
92+ . Queue ( _queueName )
93+ . Stream ( ) . Filter ( ) . Sql ( $ "{ property } LIKE '%{ value } '") . Stream ( )
94+ . Offset ( StreamOffsetSpecification . First )
95+ . Builder ( ) . MessageHandler ( ( IContext ctx , IMessage msg ) =>
96+ {
97+ tcs . SetResult ( msg ) ;
98+ // Here you would implement the logic to handle messages that match the SQL filter.
99+ // For example, you could validate that the message content matches expected SQL criteria.
100+ return Task . CompletedTask ;
101+ } )
102+ . BuildAndStartAsync ( ) ;
103+
104+ IPublisher publisher = await _connection . PublisherBuilder ( ) . Queue ( _queueName ) . BuildAsync ( ) ;
105+ await publisher . PublishAsync ( new AmqpMessage ( $ "NO")
106+ . Property ( property , "NO" ) ) ;
107+
108+ await publisher . PublishAsync ( new AmqpMessage ( $ "with property_{ property } value { value } ")
109+ . Property ( property , value ) ) ;
110+
111+ await tcs . Task . WaitAsync ( TimeSpan . FromSeconds ( 10 ) ) ;
112+ Assert . Equal ( $ "with property_{ property } value { value } ", tcs . Task . Result . BodyAsString ( ) ) ;
113+ Assert . Equal ( value , tcs . Task . Result . Property ( property ) ) ;
114+ await consumer . CloseAsync ( ) ;
115+ await publisher . CloseAsync ( ) ;
116+ await q . DeleteAsync ( ) ;
117+ await _connection . CloseAsync ( ) ;
118+
119+ }
69120 }
70121}
0 commit comments