Skip to content

Commit 749de74

Browse files
committed
add support for Pub/Sub via HTTP
1 parent 4b3e058 commit 749de74

File tree

6 files changed

+124
-4
lines changed

6 files changed

+124
-4
lines changed

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
templates_path = ['_templates']
6464

6565
# The suffix of source filenames.
66-
source_suffix = '.rst'
66+
source_suffix = {'.rst': 'restructuredtext'}
6767

6868
locale_dirs = ['locale/'] # path is example but recommended.
6969

docs/pubsub.rst

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,30 @@ Example directive:
2727
broker:
2828
type: mqtt
2929
url: mqtt://localhost:1883
30+
channel: messages/a/data # optional
31+
32+
HTTP
33+
----
34+
35+
Example directive:
36+
37+
.. code-block:: yaml
38+
39+
pubsub:
40+
broker:
41+
type: http
42+
url: https://ntfy.sh
43+
channel: messages/a/data # optional
3044
3145
.. note::
3246

33-
For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883``
47+
For any Pub/Sub endpoints requiring authentication, encode the ``url`` value as follows:
48+
49+
* ``mqtt://username:password@localhost:1883``
50+
* ``https://username:password@localhost``
51+
52+
.. note::
3453

54+
If no ``channel`` is defined, the relevant OGC API endpoint is used.
3555

3656
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html

pycsw/broker/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,6 @@ def load_client(def_: dict) -> BasePubSubClient:
4848

4949

5050
CLIENTS = {
51-
'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient'
51+
'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient',
52+
'http': 'pycsw.broker.http.HTTPPubSubClient'
5253
}

pycsw/broker/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def __init__(self, publisher_def: dict):
5050

5151
self.type = None
5252
self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}'
53+
self.channel = publisher_def.get('channel')
5354

5455
self.show_link = publisher_def.get('show_link', True)
5556
self.broker = publisher_def['url']

pycsw/broker/http.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <tomkralidis@gmail.com>
4+
#
5+
# Copyright (c) 2025 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
import logging
31+
32+
import requests
33+
34+
from pycsw.broker.base import BasePubSubClient
35+
36+
LOGGER = logging.getLogger(__name__)
37+
38+
39+
class HTTPPubSubClient(BasePubSubClient):
40+
"""HTTP client"""
41+
42+
def __init__(self, broker_url):
43+
"""
44+
Initialize object
45+
46+
:param publisher_def: provider definition
47+
48+
:returns: pycsw.pubsub.http.HTTPPubSubClient
49+
"""
50+
51+
super().__init__(broker_url)
52+
self.type = 'http'
53+
self.auth = None
54+
55+
msg = f'Initializing to broker {self.broker_safe_url} with id {self.client_id}' # noqa
56+
LOGGER.debug(msg)
57+
58+
if None not in [self.broker_url.username, self.broker_url.password]:
59+
LOGGER.debug('Setting credentials')
60+
self.auth = (
61+
self.broker_url.username,
62+
self.broker_url.password
63+
)
64+
65+
def connect(self) -> None:
66+
"""
67+
Connect to an HTTP broker
68+
69+
:returns: None
70+
"""
71+
72+
LOGGER.debug('No connection to HTTP')
73+
pass
74+
75+
def pub(self, channel: str, message: str, qos: int = 1) -> bool:
76+
"""
77+
Publish a message to a broker/channel
78+
79+
:param channel: `str` of topic
80+
:param message: `str` of message
81+
82+
:returns: `bool` of publish result
83+
"""
84+
85+
LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
86+
LOGGER.debug(f'Channel: {channel}')
87+
LOGGER.debug(f'Message: {message}')
88+
89+
url = f'{self.broker}/{channel}'
90+
91+
try:
92+
response = requests.post(url, auth=self.auth, json=message)
93+
response.raise_for_status()
94+
except Exception as err:
95+
LOGGER.debug(f'Message publishing failed: {err}')
96+
97+
def __repr__(self):
98+
return f'<HTTPPubSubClient> {self.broker_safe_url}'

pycsw/ogc/pubsub/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def publish_message(pubsub_client, action: str, collection: str = None,
4848
:returns: `bool` of whether message publishing was successful
4949
"""
5050

51-
channel = f'collections/{collection}'
51+
channel = pubsub_client.channel or f'collections/{collection}'
5252
type_ = f'org.ogc.api.collection.item.{action}'
5353

5454
if action in ['create', 'update']:

0 commit comments

Comments
 (0)