From 2487c598f6a823db7ca252b0a4527b78836cb2f0 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 16 May 2025 15:04:27 -0400 Subject: [PATCH 1/7] Add channel name validation --- dispatcherd/brokers/pg_notify.py | 36 +++++++-- tests/integration/brokers/test_pg_notify.py | 83 +++++++++++++++++++++ tests/unit/brokers/test_pg_notify.py | 10 +++ 3 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 tests/unit/brokers/test_pg_notify.py diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index 488576f4..60e1c269 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -3,6 +3,7 @@ import threading import time import uuid +import re from typing import Any, AsyncGenerator, Callable, Coroutine, Iterator, Optional, Union import psycopg @@ -37,6 +38,26 @@ def create_connection(**config) -> psycopg.Connection: # type: ignore[no-untype return connection +class DispatcherdInvalidChannel(psycopg.errors.SyntaxError): + pass + + +def validate_channel_name(channel_name: str) -> None: + """Raise an exception if the channel name can not be reliably used. + + This might happen due to a number of reasons. + The notify logic uses the channel name as a parameter, which is good for + security reasons, but imposes a character length constraint. + """ + if len(channel_name.encode('utf-8')) > 63: + raise DispatcherdInvalidChannel(f'Channel name is too long chars={len(channel_name.encode("utf-8"))}') + + # # Organic failure from postgres, from the LISTEN query + # # psycopg.errors.SyntaxError: syntax error at or near "-" + # if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', channel_name): + # raise DispatcherdInvalidChannel("Channel name must start with a letter or underscore, and contain only letters, digits, or underscores.") + + class Broker(BrokerProtocol): NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);' @@ -122,14 +143,17 @@ def generate_self_check_channel_name(cls) -> str: def get_publish_channel(self, channel: Optional[str] = None) -> str: "Handle default for the publishing channel for calls to publish_message, shared sync and async" if channel is not None: - return channel + return_channel = channel elif self.default_publish_channel is not None: - return self.default_publish_channel + return_channel = self.default_publish_channel elif len(self.user_channels) == 1: # de-facto default channel, because there is only 1 - return self.channels[0] + return_channel = self.channels[0] + else: + raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config') - raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config') + validate_channel_name(return_channel) + return return_channel def __str__(self) -> str: return 'pg_notify-broker' @@ -159,7 +183,9 @@ def get_listen_query(self, channel: str) -> psycopg.sql.Composed: This uses the psycopg utilities which ensure correct escaping so SQL injection is not possible. Return value is a valid argument for cursor.execute() """ - return psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel)) + # Postgres does not allow parameters for identifiers, so this limits what channel we can accept + validate_channel_name(channel) + return psycopg.sql.SQL("LISTEN {}").format(psycopg.sql.Identifier(channel)) def get_unlisten_query(self) -> psycopg.sql.SQL: """Stops listening on all channels for current session, see pg_notify docs""" diff --git a/tests/integration/brokers/test_pg_notify.py b/tests/integration/brokers/test_pg_notify.py index d31cb98a..a6ab52d5 100644 --- a/tests/integration/brokers/test_pg_notify.py +++ b/tests/integration/brokers/test_pg_notify.py @@ -3,6 +3,8 @@ import pytest +import psycopg + from dispatcherd.brokers.pg_notify import Broker, acreate_connection, create_connection @@ -106,3 +108,84 @@ async def test_async_connection_from_config_reuse(conn_config): assert conn is conn2 assert conn is not await acreate_connection(**conn_config) + + + +VALID_CHANNEL_NAMES = [ + 'foobar', + 'foobar🔥', + 'foo-bar', + '-foo-bar', + 'a' * 63 # just under the limit +] + + +BAD_CHANNEL_NAMES = [ + '🔥' * 22, # under 64 but expanded unicode is over + 'a' * 64, # over the limit of 63 + 'a' * 120 +] + + +class TestChannelSanitization: + def test_valid_channel(self, conn_config): + broker = Broker(config=conn_config) + broker.publish_message(channel='foobar', message='foobar') + + @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) + def test_psycopg_valid_sanity_check(self, channel_name, conn_config): + """Sanity check that postgres itself will accept valid names for listening""" + conn = psycopg.connect(**conn_config, autocommit=True) + conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) + # conn.execute(f"LISTEN {channel_name}") + conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'foo')) + + @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) + def test_psycopg_error_sanity_check(self, channel_name, conn_config): + """Sanity check that postgres itself will raise an error for the known invalid names""" + conn = psycopg.connect(**conn_config, autocommit=True) + with pytest.raises(psycopg.DatabaseError): + conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) + # conn.execute(f"LISTEN {channel_name}") + conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'foo')) + + @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) + def test_valid_channel_publish(self, channel_name, conn_config): + broker = Broker(config=conn_config) + broker.publish_message(channel=channel_name, message='foobar') + + @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) + def test_invalid_channel_publish(self, channel_name, conn_config): + broker = Broker(config=conn_config) + with pytest.raises(psycopg.DatabaseError): + broker.publish_message(channel=channel_name, message='foobar') + + @pytest.fixture + def can_receive_notification(self, conn_config): + def _rf(channel_name): + conn = psycopg.connect(**conn_config, autocommit=True) + try: + conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) + # conn.execute(f"LISTEN {psycopg.sql.Identifier(channel_name)};") + conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'this is a test message')) + except Exception: + return False # did not work + gen = conn.notifies(timeout=0.001) + try: + for notify in gen: + print(notify) + gen.close() + return True + else: + return False + finally: + gen.close() + return _rf + + @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) + def test_can_receive_over_valid_channels(self, can_receive_notification, channel_name): + assert can_receive_notification(channel_name) + + @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) + def test_can_not_receive_over_invalid_channels(self, can_receive_notification, channel_name): + assert not can_receive_notification(channel_name) diff --git a/tests/unit/brokers/test_pg_notify.py b/tests/unit/brokers/test_pg_notify.py new file mode 100644 index 00000000..d3f5db44 --- /dev/null +++ b/tests/unit/brokers/test_pg_notify.py @@ -0,0 +1,10 @@ +import pytest + +from dispatcherd.brokers.pg_notify import Broker + + +def test_invalid_characters(conn_config): + broker = Broker(config=conn_config) + with pytest.raises(Exception): + broker.publish_message(channel='foo-bar', message='message') + From aa820f7d26be8c37d1cb574c678f4ef733caa65b Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 16 May 2025 15:05:13 -0400 Subject: [PATCH 2/7] Remove comments --- dispatcherd/brokers/pg_notify.py | 5 ----- tests/integration/brokers/test_pg_notify.py | 3 --- 2 files changed, 8 deletions(-) diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index 60e1c269..249eeff7 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -52,11 +52,6 @@ def validate_channel_name(channel_name: str) -> None: if len(channel_name.encode('utf-8')) > 63: raise DispatcherdInvalidChannel(f'Channel name is too long chars={len(channel_name.encode("utf-8"))}') - # # Organic failure from postgres, from the LISTEN query - # # psycopg.errors.SyntaxError: syntax error at or near "-" - # if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', channel_name): - # raise DispatcherdInvalidChannel("Channel name must start with a letter or underscore, and contain only letters, digits, or underscores.") - class Broker(BrokerProtocol): NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);' diff --git a/tests/integration/brokers/test_pg_notify.py b/tests/integration/brokers/test_pg_notify.py index a6ab52d5..8a052b68 100644 --- a/tests/integration/brokers/test_pg_notify.py +++ b/tests/integration/brokers/test_pg_notify.py @@ -137,7 +137,6 @@ def test_psycopg_valid_sanity_check(self, channel_name, conn_config): """Sanity check that postgres itself will accept valid names for listening""" conn = psycopg.connect(**conn_config, autocommit=True) conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) - # conn.execute(f"LISTEN {channel_name}") conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'foo')) @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) @@ -146,7 +145,6 @@ def test_psycopg_error_sanity_check(self, channel_name, conn_config): conn = psycopg.connect(**conn_config, autocommit=True) with pytest.raises(psycopg.DatabaseError): conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) - # conn.execute(f"LISTEN {channel_name}") conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'foo')) @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) @@ -166,7 +164,6 @@ def _rf(channel_name): conn = psycopg.connect(**conn_config, autocommit=True) try: conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) - # conn.execute(f"LISTEN {psycopg.sql.Identifier(channel_name)};") conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'this is a test message')) except Exception: return False # did not work From f30ed3aec99a19a7cba1e0b4151f7d01b61a0dfc Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 16 May 2025 15:05:36 -0400 Subject: [PATCH 3/7] Fix linters --- dispatcherd/brokers/pg_notify.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index 249eeff7..cfd01811 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -3,7 +3,6 @@ import threading import time import uuid -import re from typing import Any, AsyncGenerator, Callable, Coroutine, Iterator, Optional, Union import psycopg From b3d4f2d6b275666a8f5b02ae817681b02825889a Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 16 May 2025 15:06:43 -0400 Subject: [PATCH 4/7] Remove abandoned file --- tests/unit/brokers/test_pg_notify.py | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 tests/unit/brokers/test_pg_notify.py diff --git a/tests/unit/brokers/test_pg_notify.py b/tests/unit/brokers/test_pg_notify.py deleted file mode 100644 index d3f5db44..00000000 --- a/tests/unit/brokers/test_pg_notify.py +++ /dev/null @@ -1,10 +0,0 @@ -import pytest - -from dispatcherd.brokers.pg_notify import Broker - - -def test_invalid_characters(conn_config): - broker = Broker(config=conn_config) - with pytest.raises(Exception): - broker.publish_message(channel='foo-bar', message='message') - From 54f133a44534f44fd059bef9e2f85e719983ffa5 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 19 May 2025 15:02:53 -0400 Subject: [PATCH 5/7] Add blank string to invalid list --- tests/integration/brokers/test_pg_notify.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration/brokers/test_pg_notify.py b/tests/integration/brokers/test_pg_notify.py index 8a052b68..15d97ed3 100644 --- a/tests/integration/brokers/test_pg_notify.py +++ b/tests/integration/brokers/test_pg_notify.py @@ -121,9 +121,10 @@ async def test_async_connection_from_config_reuse(conn_config): BAD_CHANNEL_NAMES = [ - '🔥' * 22, # under 64 but expanded unicode is over + 'a' + '🔥' * 22, # under 64 but expanded unicode is over 'a' * 64, # over the limit of 63 - 'a' * 120 + 'a' * 120, + '' ] @@ -170,7 +171,7 @@ def _rf(channel_name): gen = conn.notifies(timeout=0.001) try: for notify in gen: - print(notify) + assert notify.payload == 'this is a test message' gen.close() return True else: From 8800d7242b64635347aafe9fc2078e340e16a8fa Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 19 May 2025 15:10:00 -0400 Subject: [PATCH 6/7] Split sanity and code tests --- tests/integration/brokers/test_pg_notify.py | 48 ++++++++++++++------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/tests/integration/brokers/test_pg_notify.py b/tests/integration/brokers/test_pg_notify.py index 15d97ed3..19194459 100644 --- a/tests/integration/brokers/test_pg_notify.py +++ b/tests/integration/brokers/test_pg_notify.py @@ -128,11 +128,11 @@ async def test_async_connection_from_config_reuse(conn_config): ] -class TestChannelSanitization: - def test_valid_channel(self, conn_config): - broker = Broker(config=conn_config) - broker.publish_message(channel='foobar', message='foobar') +class TestChannelSanitizationPostgresSanity: + """These do not test dispatcherd itself, but give a reference by testing psycopg and postgres + These tests validate that the valid and bad channel name lists are, in fact, bad and valid. + """ @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) def test_psycopg_valid_sanity_check(self, channel_name, conn_config): """Sanity check that postgres itself will accept valid names for listening""" @@ -148,17 +148,6 @@ def test_psycopg_error_sanity_check(self, channel_name, conn_config): conn.execute(psycopg.sql.SQL("LISTEN {};").format(psycopg.sql.Identifier(channel_name))) conn.execute(Broker.NOTIFY_QUERY_TEMPLATE, (channel_name, 'foo')) - @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) - def test_valid_channel_publish(self, channel_name, conn_config): - broker = Broker(config=conn_config) - broker.publish_message(channel=channel_name, message='foobar') - - @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) - def test_invalid_channel_publish(self, channel_name, conn_config): - broker = Broker(config=conn_config) - with pytest.raises(psycopg.DatabaseError): - broker.publish_message(channel=channel_name, message='foobar') - @pytest.fixture def can_receive_notification(self, conn_config): def _rf(channel_name): @@ -187,3 +176,32 @@ def test_can_receive_over_valid_channels(self, can_receive_notification, channel @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) def test_can_not_receive_over_invalid_channels(self, can_receive_notification, channel_name): assert not can_receive_notification(channel_name) + + +class TestChannelSanitizationPostgres: + """These tests verify that we do early validation + + Specifically, this means that dispatcherd will not let you listen to a channel you can not send to + and that you can not send to a channel you can not listen to""" + + @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) + def test_valid_channel_publish(self, channel_name, conn_config): + broker = Broker(config=conn_config) + broker.publish_message(channel=channel_name, message='foobar') + + @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) + def test_invalid_channel_publish(self, channel_name, conn_config): + broker = Broker(config=conn_config) + with pytest.raises(psycopg.DatabaseError): + broker.publish_message(channel=channel_name, message='foobar') + + @pytest.mark.parametrize('channel_name', VALID_CHANNEL_NAMES) + def test_valid_channel_listen(self, channel_name, conn_config): + broker = Broker(config=conn_config, channels=[channel_name]) + broker.process_notify(max_messages=0) + + @pytest.mark.parametrize('channel_name', BAD_CHANNEL_NAMES) + def test_invalid_channel_listen(self, channel_name, conn_config): + with pytest.raises(psycopg.DatabaseError): + broker = Broker(config=conn_config, channels=[channel_name]) + broker.process_notify(max_messages=0) From 1e3200256509360e71c7df4307f157ea04e8b6fb Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 19 May 2025 16:44:53 -0400 Subject: [PATCH 7/7] Add logic change missing --- dispatcherd/brokers/pg_notify.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index cfd01811..233bddb9 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -51,6 +51,9 @@ def validate_channel_name(channel_name: str) -> None: if len(channel_name.encode('utf-8')) > 63: raise DispatcherdInvalidChannel(f'Channel name is too long chars={len(channel_name.encode("utf-8"))}') + if not channel_name: + raise DispatcherdInvalidChannel(f'Received blank channel name {channel_name}. PG notify channel name can not be blank.') + class Broker(BrokerProtocol): NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);' @@ -120,6 +123,10 @@ def __init__( server_channels.append(self.self_check_channel) self.channels = server_channels + # Raise an early error if any of the channel names are invalid + for channel in self.channels: + validate_channel_name(channel) + self.default_publish_channel = default_publish_channel self.self_check_status = BrokerSelfCheckStatus.IDLE self.last_self_check_message_time = time.monotonic()