Skip to content

Commit 1acb99e

Browse files
authored
Merge pull request #2 from Goopil/fix-php-8-2
fix: how we poll the messages to accomodate php 8.2
2 parents 173d1bb + 0e9d906 commit 1acb99e

File tree

3 files changed

+17
-58
lines changed

3 files changed

+17
-58
lines changed

php/tests/tests/Unit/ConnectionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ function mq_get_connections(): array {
1717
CURLOPT_USERPWD => $auth,
1818
CURLOPT_HTTPAUTH => CURLAUTH_BASIC,
1919
CURLOPT_TIMEOUT => 3,
20+
CURLOPT_SSL_VERIFYPEER => false,
21+
CURLOPT_SSL_VERIFYHOST => false,
2022
]);
2123
$res = curl_exec($ch);
2224
if ($res === false) {
25+
$error = curl_error($ch);
26+
fwrite(STDERR, "Curl error: " . $error . "\n");
27+
curl_close($ch);
2328
return [];
2429
}
2530
$code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
2631
curl_close($ch);
2732
if ($code !== 200) {
33+
fwrite(STDERR, "HTTP code: " . $code . "\n");
2834
return [];
2935
}
3036
$json = json_decode($res, true);

src/core/channels/channel.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,8 @@ impl AmqpChannel {
678678
Err(flume::RecvTimeoutError::Disconnected) => break,
679679
}
680680

681-
std::thread::sleep(Duration::from_millis(5));
681+
// Reduce sleep time for better responsiveness
682+
std::thread::sleep(Duration::from_millis(1));
682683
}
683684

684685
// One last non-blocking sweep to catch messages that arrived between the timeout firing

src/php/channel.rs

Lines changed: 9 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -442,62 +442,6 @@ impl PhpChannel {
442442
.flush_ff_publishes()
443443
.map_err(|e| PhpException::default(php_safe(format!("wait flush failed: {e}"))))?;
444444

445-
// Fast path: if messages are already queued locally, drain them without waiting
446-
let want = if max > 0 { max as usize } else { 0 };
447-
if want > 0 {
448-
let batch = self.inner.simple_consume_poll_batch(0, want).map_err(|e| {
449-
PhpException::default(php_safe(format!("wait pre-drain failed: {e}")))
450-
})?;
451-
452-
if !batch.is_empty() {
453-
let mut count: i64 = 0;
454-
if let Some(ref mut st) = *self.auto_qos.write() {
455-
st.record_and_maybe_adjust(batch.len(), |next| {
456-
self.inner.qos(next, false).map_err(|e| {
457-
PhpException::default(php_safe(format!("Auto-QoS adjust failed: {e}")))
458-
})
459-
})?;
460-
}
461-
462-
for delivery in batch {
463-
let tag = delivery.delivery_tag;
464-
let policy = self.inner.consume_policy();
465-
let amqp_delivery = AmqpDelivery::new(delivery.clone(), policy.no_ack);
466-
let cb_guard = self.callback.read();
467-
let cb = cb_guard.as_ref().ok_or_else(|| {
468-
PhpException::default(
469-
"No callback registered. Call simpleConsume(queue, callback) first."
470-
.into(),
471-
)
472-
})?;
473-
match Self::call_php_callback(cb, amqp_delivery) {
474-
Ok(()) => {}
475-
Err(e) => {
476-
if !policy.no_ack {
477-
if policy.reject_on_exception {
478-
let _ = self
479-
.inner
480-
.simple_consume_nack_by_tag(tag, policy.requeue_on_reject);
481-
}
482-
return Err(e);
483-
}
484-
let error_message = format!("{:?}", e).to_lowercase();
485-
if error_message.contains("cannot ack in auto-ack mode")
486-
|| error_message.contains("cannot nack in auto-ack mode")
487-
|| error_message.contains("cannot reject in auto-ack mode")
488-
{
489-
// swallow this specific misuse in auto-ack mode
490-
} else {
491-
return Err(e);
492-
}
493-
}
494-
}
495-
count += 1;
496-
}
497-
return Ok(count);
498-
}
499-
}
500-
501445
let cb_guard = self.callback.read();
502446
let cb = cb_guard.as_ref().ok_or_else(|| {
503447
PhpException::default(
@@ -508,6 +452,7 @@ impl PhpChannel {
508452
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
509453
let mut count: i64 = 0;
510454

455+
// Continue looping until we've received all expected messages or timeout
511456
while count < max {
512457
let now = Instant::now();
513458
if now >= deadline {
@@ -516,12 +461,19 @@ impl PhpChannel {
516461

517462
let remaining_ms = (deadline - now).as_millis() as u64;
518463
let want = (max - count) as usize;
464+
465+
// Use a minimum timeout to ensure we don't miss messages
466+
let effective_timeout = remaining_ms.max(10);
467+
519468
let batch = self
520469
.inner
521-
.simple_consume_poll_batch(remaining_ms, want)
470+
.simple_consume_poll_batch(effective_timeout, want)
522471
.map_err(|e| PhpException::default(php_safe(format!("wait failed: {e}"))))?;
523472

524473
if batch.is_empty() {
474+
// Even if we got no messages, continue looping until timeout
475+
// This gives more time for messages to arrive
476+
std::thread::sleep(Duration::from_millis(5));
525477
continue;
526478
}
527479

0 commit comments

Comments
 (0)