1515
1616#include < pybmq_session.h>
1717
18+ #include < pybmq_gilacquireguard.h>
1819#include < pybmq_gilreleaseguard.h>
1920#include < pybmq_messageutils.h>
2021#include < pybmq_mocksession.h>
@@ -77,13 +78,14 @@ Session::Session(
7778 PyObject* py_session_event_callback,
7879 PyObject* py_message_event_callback,
7980 PyObject* py_ack_event_callback,
81+ PyObject* fake_authn_credential_cb,
8082 const char * broker_uri,
8183 const char * script_name,
8284 bmqt::CompressionAlgorithmType::Enum message_compression_type,
8385 bsl::optional<int > num_processing_threads,
8486 bsl::optional<int > blob_buffer_size,
8587 bsl::optional<int > channel_high_watermark,
86- bsl::optional<bsl::pair<int , int > > event_queue_watermarks,
88+ bsl::optional<bsl::pair<int , int >> event_queue_watermarks,
8789 const bsls::TimeInterval& stats_dump_interval,
8890 const bsls::TimeInterval& connect_timeout,
8991 const bsls::TimeInterval& disconnect_timeout,
@@ -119,6 +121,74 @@ Session::Session(
119121 }
120122
121123 d_message_compression_type = message_compression_type;
124+
125+ AuthnCredentialCb cpp_callback;
126+ bool has_auth_callback = false ;
127+
128+ if (fake_authn_credential_cb != nullptr && fake_authn_credential_cb != Py_None) {
129+ // Increment reference count since we're storing the Python object
130+ Py_INCREF (fake_authn_credential_cb);
131+ has_auth_callback = true ;
132+
133+ // Create a C++ lambda that wraps the Python callback
134+ cpp_callback =
135+ [fake_authn_credential_cb](
136+ bsl::ostream& error) -> bsl::optional<bmqt::AuthnCredential> {
137+ pybmq::GilAcquireGuard guard;
138+
139+ // Call get_credential_data() method on the Python object
140+ bslma::ManagedPtr<PyObject> result =
141+ RefUtils::toManagedPtr (PyObject_CallMethod (
142+ fake_authn_credential_cb,
143+ " get_credential_data" ,
144+ nullptr ));
145+
146+ if (!result) {
147+ // Python exception occurred
148+ PyErr_Print ();
149+ error << " Error calling get_credential_data()" ;
150+ return bsl::optional<bmqt::AuthnCredential>();
151+ }
152+
153+ if (result.get () == Py_None) {
154+ return bsl::optional<bmqt::AuthnCredential>();
155+ }
156+
157+ // Extract tuple (mechanism, data)
158+ if (!PyTuple_Check (result.get ()) || PyTuple_Size (result.get ()) != 2 ) {
159+ error << " get_credential_data() must return (str, bytes) or None" ;
160+ return bsl::optional<bmqt::AuthnCredential>();
161+ }
162+
163+ PyObject* mechanism_obj = PyTuple_GetItem (result.get (), 0 );
164+ PyObject* data_obj = PyTuple_GetItem (result.get (), 1 );
165+
166+ if (!PyUnicode_Check (mechanism_obj) || !PyBytes_Check (data_obj)) {
167+ error << " get_credential_data() must return (str, bytes) or None" ;
168+ return bsl::optional<bmqt::AuthnCredential>();
169+ }
170+
171+ // Convert Python str to C++ string
172+ const char * mechanism_cstr = PyUnicode_AsUTF8 (mechanism_obj);
173+ bsl::string mechanism (mechanism_cstr);
174+
175+ // Convert Python bytes to vector<char>
176+ char * data_ptr;
177+ Py_ssize_t data_len;
178+ PyBytes_AsStringAndSize (data_obj, &data_ptr, &data_len);
179+ bsl::vector<char > data (data_ptr, data_ptr + data_len);
180+
181+ // Construct and return AuthnCredential
182+ bmqt::AuthnCredential credential;
183+ credential.setMechanism (mechanism).setData (data);
184+
185+ // Move credential into optional (AuthnCredential is move-only)
186+ bsl::optional<bmqt::AuthnCredential> opt_credential;
187+ opt_credential.emplace (bslmf::MovableRefUtil::move (credential));
188+ return opt_credential;
189+ };
190+ }
191+
122192 {
123193 pybmq::GilReleaseGuard guard;
124194 bmqt::SessionOptions options;
@@ -144,6 +214,11 @@ Session::Session(
144214 event_queue_watermarks.value ().second );
145215 }
146216
217+ if (has_auth_callback) {
218+ // TODO: This will only compile with setAuthnCredentialCb in SessionOptions
219+ options.setAuthnCredentialCb (cpp_callback);
220+ }
221+
147222 if (stats_dump_interval != bsls::TimeInterval ()) {
148223 options.setStatsDumpInterval (stats_dump_interval);
149224 }
@@ -527,8 +602,8 @@ Session::post(
527602 oss << " Failed to post message to " << queue_uri << " queue: " << post_rc;
528603 throw GenericError (oss.str ());
529604 }
530- // We have a successful post and the SDK now owns the `on_ack` callback object
531- // so release our reference without a DECREF.
605+ // We have a successful post and the SDK now owns the `on_ack` callback
606+ // object so release our reference without a DECREF.
532607 managed_on_ack.release ();
533608 } catch (const GenericError& exc) {
534609 PyErr_SetString (d_error, exc.what ());
0 commit comments