|
7 | 7 | from .exceptions import ValidationCodeException |
8 | 8 | from .qpid.proton._data import Described, symbol |
9 | 9 |
|
| 10 | +SQL_FILTER = "sql-filter" |
| 11 | +AMQP_SQL_FILTER = "amqp:sql-filter" |
10 | 12 | STREAM_FILTER_SPEC = "rabbitmq:stream-filter" |
11 | 13 | STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" |
12 | 14 | STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" |
@@ -245,20 +247,24 @@ def __init__( |
245 | 247 | if offset_specification is not None: |
246 | 248 | self._offset(offset_specification) |
247 | 249 |
|
248 | | - if filter_options is not None and filter_options.values is not None: |
| 250 | + if filter_options is None: |
| 251 | + return |
| 252 | + |
| 253 | + if filter_options.values is not None: |
249 | 254 | self._filter_values(filter_options.values) |
250 | 255 |
|
251 | | - if filter_options is not None and filter_options.match_unfiltered: |
| 256 | + if filter_options.match_unfiltered: |
252 | 257 | self._filter_match_unfiltered(filter_options.match_unfiltered) |
253 | 258 |
|
254 | | - if filter_options is not None and filter_options.message_properties is not None: |
| 259 | + if filter_options.message_properties is not None: |
255 | 260 | self._filter_message_properties(filter_options.message_properties) |
256 | | - if ( |
257 | | - filter_options is not None |
258 | | - and filter_options.application_properties is not None |
259 | | - ): |
| 261 | + |
| 262 | + if filter_options.application_properties is not None: |
260 | 263 | self._filter_application_properties(filter_options.application_properties) |
261 | 264 |
|
| 265 | + if filter_options.sql is not None and filter_options.sql != "": |
| 266 | + self._filter_sql(filter_options.sql) |
| 267 | + |
262 | 268 | def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: |
263 | 269 | """ |
264 | 270 | Set the offset specification for the stream. |
@@ -334,6 +340,18 @@ def _filter_application_properties( |
334 | 340 | Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) |
335 | 341 | ) |
336 | 342 |
|
| 343 | + def _filter_sql(self, sql: str) -> None: |
| 344 | + """ |
| 345 | + Set SQL filter for the stream. |
| 346 | +
|
| 347 | + Args: |
| 348 | + sql: SQL string to apply as a filter |
| 349 | + """ |
| 350 | + if sql != "": |
| 351 | + self._filter_set[symbol(SQL_FILTER)] = Described( |
| 352 | + symbol(AMQP_SQL_FILTER), sql |
| 353 | + ) |
| 354 | + |
337 | 355 | def filter_set(self) -> Dict[symbol, Described]: |
338 | 356 | """ |
339 | 357 | Get the current filter set configuration. |
|
0 commit comments