Skip to content

Commit c64c673

Browse files
committed
provide a way to create credential
Signed-off-by: Emelia Lei <[email protected]>
1 parent c1ec741 commit c64c673

File tree

9 files changed

+169
-3
lines changed

9 files changed

+169
-3
lines changed

src/blazingmq/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from . import exceptions
1717
from . import session_events
1818
from ._about import __version__
19+
from ._authncb import BasicAuthnCredentialCb
1920
from ._enums import AckStatus
2021
from ._enums import CompressionAlgorithmType
2122
from ._enums import PropertyType
@@ -34,6 +35,7 @@
3435
__all__ = [
3536
"Ack",
3637
"AckStatus",
38+
"BasicAuthnCredentialCb",
3739
"BasicHealthMonitor",
3840
"CompressionAlgorithmType",
3941
"Error",

src/blazingmq/_authncb.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright 2019-2023 Bloomberg Finance L.P.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
from typing import Callable, Optional, Tuple
18+
from ._ext import FakeAuthnCredentialCb
19+
20+
CredentialTuple = Tuple[str, bytes]
21+
22+
23+
class BasicAuthnCredentialCb:
24+
"""Wrap a Python callable returning (mechanism:str, data:bytes) or None."""
25+
26+
def __init__(self, callback: Callable[[], Optional[CredentialTuple]]):
27+
if not callable(callback):
28+
raise TypeError("callback must be callable")
29+
self._authncb = FakeAuthnCredentialCb(callback)
30+
31+
def __repr__(self) -> str:
32+
return "BasicAuthnCredentialCb(...)"

src/blazingmq/_ext.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class FakeHostHealthMonitor:
3737
def set_healthy(self) -> None: ...
3838
def set_unhealthy(self) -> None: ...
3939

40+
class FakeAuthnCredentialCb:
41+
def __init__(self, callback: Callable[[], Optional[tuple[str, bytes]]]) -> None: ...
42+
4043
class Session:
4144
def __init__(
4245
self,
@@ -53,6 +56,7 @@ class Session:
5356
timeouts: Timeouts = Timeouts(),
5457
monitor_host_health: bool = False,
5558
fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None,
59+
fake_authn_credential_cb: Optional[FakeAuthnCredentialCb] = None,
5660
) -> None: ...
5761
def stop(self) -> None: ...
5862
def open_queue_sync(

src/blazingmq/_ext.pyx

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import weakref
2121
from bsl cimport optional
2222
from bsl cimport pair
2323
from bsl cimport shared_ptr
24+
from bsl cimport vector
25+
from bsl cimport string
2426
from bsl.bsls cimport TimeInterval
2527
from cpython.ceval cimport PyEval_InitThreads
2628
from libcpp cimport bool as cppbool
2729

2830
from bmq.bmqa cimport ManualHostHealthMonitor
2931
from bmq.bmqt cimport AckResult
32+
from bmq.bmqt cimport AuthnCredential
3033
from bmq.bmqt cimport CompressionAlgorithmType
3134
from bmq.bmqt cimport HostHealthState
3235
from bmq.bmqt cimport PropertyType
@@ -153,6 +156,38 @@ cdef class FakeHostHealthMonitor:
153156
self._monitor.get().setState(HostHealthState.e_UNHEALTHY)
154157

155158

159+
cdef class FakeAuthnCredentialCb:
160+
cdef object _callback # Store the Python callable
161+
162+
def __cinit__(self, callback):
163+
if not callable(callback):
164+
raise TypeError("callback must be callable")
165+
self._callback = callback
166+
167+
# This method will be called by C++ code
168+
# Returns None for no credential, or (mechanism, data) tuple
169+
cdef object get_credential_data(self):
170+
try:
171+
result = self._callback()
172+
if result is None:
173+
return None
174+
175+
if not isinstance(result, tuple) or len(result) != 2:
176+
raise ValueError("callback must return (str, bytes) or None")
177+
178+
mechanism, data = result
179+
if not isinstance(mechanism, str) or not isinstance(data, bytes):
180+
raise ValueError("callback must return (str, bytes) or None")
181+
182+
# Return as-is, let C++ side handle conversion
183+
return result
184+
185+
except Exception as e:
186+
# Log error or handle as needed
187+
LOGGER.exception("Error in authentication credential callback")
188+
return None
189+
190+
156191
cdef class Session:
157192
cdef object __weakref__
158193
cdef NativeSession* _session
@@ -173,6 +208,7 @@ cdef class Session:
173208
timeouts: _timeouts.Timeouts = _timeouts.Timeouts(),
174209
monitor_host_health: bool = False,
175210
fake_host_health_monitor: FakeHostHealthMonitor = None,
211+
fake_authn_credential_cb: FakeAuthnCredentialCb = None,
176212
_mock: Optional[object] = None,
177213
) -> None:
178214
cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp
@@ -224,6 +260,7 @@ cdef class Session:
224260
session_cb,
225261
message_cb,
226262
ack_cb,
263+
fake_authn_credential_cb,
227264
c_broker_uri,
228265
c_script_name,
229266
COMPRESSION_ALGO_FROM_PY_MAPPING[message_compression_algorithm],

src/blazingmq/_session.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from ._messages import Message
3636
from ._messages import MessageHandle
3737
from ._monitors import BasicHealthMonitor
38+
from ._authncb import BasicAuthnCredentialCb
3839
from ._timeouts import Timeouts
3940
from ._typing import PropertyTypeDict
4041
from ._typing import PropertyValueDict
@@ -418,6 +419,7 @@ def __init__(
418419
),
419420
timeout: Union[Timeouts, float] = DEFAULT_TIMEOUT,
420421
host_health_monitor: Union[BasicHealthMonitor, None] = (DefaultMonitor()),
422+
authn_credential_cb: Optional[BasicAuthnCredentialCb] = None,
421423
num_processing_threads: Optional[int] = None,
422424
blob_buffer_size: Optional[int] = None,
423425
channel_high_watermark: Optional[int] = None,
@@ -433,6 +435,7 @@ def __init__(
433435

434436
monitor_host_health = host_health_monitor is not None
435437
fake_host_health_monitor = getattr(host_health_monitor, "_monitor", None)
438+
fake_authn_credential_cb = getattr(authn_credential_cb, "_authncb", None)
436439

437440
self._has_no_on_message = on_message is None
438441

@@ -459,6 +462,7 @@ def __init__(
459462
timeouts=_validate_timeouts(timeout),
460463
monitor_host_health=monitor_host_health,
461464
fake_host_health_monitor=fake_host_health_monitor,
465+
fake_authn_credential_cb=fake_authn_credential_cb,
462466
)
463467

464468
@classmethod

src/cpp/pybmq_session.cpp

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <pybmq_session.h>
1717

18+
#include <pybmq_gilacquireguard.h>
1819
#include <pybmq_gilreleaseguard.h>
1920
#include <pybmq_messageutils.h>
2021
#include <pybmq_mocksession.h>
@@ -77,13 +78,14 @@ Session::Session(
7778
PyObject* py_session_event_callback,
7879
PyObject* py_message_event_callback,
7980
PyObject* py_ack_event_callback,
81+
PyObject* fake_authn_credential_cb,
8082
const char* broker_uri,
8183
const char* script_name,
8284
bmqt::CompressionAlgorithmType::Enum message_compression_type,
8385
bsl::optional<int> num_processing_threads,
8486
bsl::optional<int> blob_buffer_size,
8587
bsl::optional<int> channel_high_watermark,
86-
bsl::optional<bsl::pair<int, int> > event_queue_watermarks,
88+
bsl::optional<bsl::pair<int, int>> event_queue_watermarks,
8789
const bsls::TimeInterval& stats_dump_interval,
8890
const bsls::TimeInterval& connect_timeout,
8991
const bsls::TimeInterval& disconnect_timeout,
@@ -144,6 +146,74 @@ Session::Session(
144146
event_queue_watermarks.value().second);
145147
}
146148

149+
if (fake_authn_credential_cb != nullptr && fake_authn_credential_cb != Py_None)
150+
{
151+
// Increment reference count since we're storing the Python object
152+
Py_INCREF(fake_authn_credential_cb);
153+
154+
// Create a C++ lambda that wraps the Python callback
155+
AuthnCredentialCb cpp_callback =
156+
[fake_authn_credential_cb](bsl::ostream& error)
157+
-> bsl::optional<bmqt::AuthnCredential> {
158+
pybmq::GilAcquireGuard guard;
159+
160+
// Call get_credential_data() method on the Python object
161+
bslma::ManagedPtr<PyObject> result =
162+
RefUtils::toManagedPtr(PyObject_CallMethod(
163+
fake_authn_credential_cb,
164+
"get_credential_data",
165+
nullptr));
166+
167+
if (!result) {
168+
// Python exception occurred
169+
PyErr_Print();
170+
error << "Error calling get_credential_data()";
171+
return bsl::optional<bmqt::AuthnCredential>();
172+
}
173+
174+
if (result.get() == Py_None) {
175+
return bsl::optional<bmqt::AuthnCredential>();
176+
}
177+
178+
// Extract tuple (mechanism, data)
179+
if (!PyTuple_Check(result.get()) || PyTuple_Size(result.get()) != 2) {
180+
error << "get_credential_data() must return (str, bytes) or None";
181+
return bsl::optional<bmqt::AuthnCredential>();
182+
}
183+
184+
PyObject* mechanism_obj = PyTuple_GetItem(result.get(), 0);
185+
PyObject* data_obj = PyTuple_GetItem(result.get(), 1);
186+
187+
if (!PyUnicode_Check(mechanism_obj) || !PyBytes_Check(data_obj)) {
188+
error << "get_credential_data() must return (str, bytes) or None";
189+
return bsl::optional<bmqt::AuthnCredential>();
190+
}
191+
192+
// Convert Python str to C++ string
193+
const char* mechanism_cstr = PyUnicode_AsUTF8(mechanism_obj);
194+
bsl::string mechanism(mechanism_cstr);
195+
196+
// Convert Python bytes to vector<char>
197+
char* data_ptr;
198+
Py_ssize_t data_len;
199+
PyBytes_AsStringAndSize(data_obj, &data_ptr, &data_len);
200+
bsl::vector<char> data(data_ptr, data_ptr + data_len);
201+
202+
// Construct and return AuthnCredential
203+
bmqt::AuthnCredential credential;
204+
credential.setMechanism(mechanism).setData(data);
205+
206+
// Move credential into optional (AuthnCredential is move-only)
207+
bsl::optional<bmqt::AuthnCredential> opt_credential;
208+
opt_credential.emplace(bslmf::MovableRefUtil::move(credential));
209+
return opt_credential;
210+
};
211+
212+
// TODO: Uncomment when setAuthnCredentialCb is available in SessionOptions
213+
// options.setAuthnCredentialCb(cpp_callback);
214+
(void)cpp_callback; // Suppress unused variable warning
215+
}
216+
147217
if (stats_dump_interval != bsls::TimeInterval()) {
148218
options.setStatsDumpInterval(stats_dump_interval);
149219
}
@@ -527,8 +597,8 @@ Session::post(
527597
oss << "Failed to post message to " << queue_uri << " queue: " << post_rc;
528598
throw GenericError(oss.str());
529599
}
530-
// We have a successful post and the SDK now owns the `on_ack` callback object
531-
// so release our reference without a DECREF.
600+
// We have a successful post and the SDK now owns the `on_ack` callback
601+
// object so release our reference without a DECREF.
532602
managed_on_ack.release();
533603
} catch (const GenericError& exc) {
534604
PyErr_SetString(d_error, exc.what());

src/cpp/pybmq_session.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <bmqa_abstractsession.h>
2323
#include <bmqa_manualhosthealthmonitor.h>
24+
#include <bmqt_authncredential.h>
2425
#include <bmqt_compressionalgorithmtype.h>
2526

2627
#include <bsl_memory.h>
@@ -47,10 +48,15 @@ class Session
4748
Session(const Session&);
4849
Session& operator=(const Session&);
4950

51+
// TODO: this is in sessionoptions.h
52+
typedef bsl::function<bsl::optional<bmqt::AuthnCredential>(bsl::ostream& error)>
53+
AuthnCredentialCb;
54+
5055
public:
5156
Session(PyObject* py_session_event_callback,
5257
PyObject* py_message_event_callback,
5358
PyObject* py_ack_event_callback,
59+
PyObject* fake_authn_credential_cb,
5460
const char* broker_uri,
5561
const char* script_name,
5662
bmqt::CompressionAlgorithmType::Enum message_compression_type,

src/declarations/bmq/bmqt.pxd

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515

1616
from libcpp cimport bool
17+
from bsl cimport string
18+
from bsl cimport vector
1719

1820

1921
cdef extern from "bmqt_sessioneventtype.h" namespace "BloombergLP::bmqt::SessionEventType" nogil:
@@ -73,3 +75,11 @@ cdef extern from "bmqt_queueoptions.h" namespace "BloombergLP::bmqt::QueueOption
7375
int k_DEFAULT_MAX_UNCONFIRMED_BYTES
7476
int k_DEFAULT_CONSUMER_PRIORITY
7577
bool k_DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH
78+
79+
cdef extern from "bmqt_authncredential.h" namespace "BloombergLP::bmqt" nogil:
80+
cdef cppclass AuthnCredential:
81+
AuthnCredential() except +
82+
AuthnCredential& setMechanism(const string&) except +
83+
AuthnCredential& setData(const vector[char]&) except +
84+
const string& mechanism() const
85+
const vector[char]& data() const

src/declarations/pybmq.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ cdef extern from "pybmq_session.h" namespace "BloombergLP::pybmq" nogil:
3939
Session(object on_session_event,
4040
object on_message_event,
4141
object on_ack_event,
42+
object fake_authn_credential_cb,
4243
const char* broker_uri,
4344
const char* script_name,
4445
CompressionAlgorithmType message_compression_algorithm,

0 commit comments

Comments
 (0)