Skip to content

Commit 0859d7f

Browse files
committed
Proxy requests for kafka inside docker to support parallel kafka tests
1 parent 3c84a88 commit 0859d7f

File tree

2 files changed

+93
-21
lines changed

2 files changed

+93
-21
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ async-trait = "0.1.81"
3131
bytesize = "1.2.0"
3232
compact_str = "0.7.1"
3333
hashlink = "0.8.3"
34-
rdkafka = "0.34.0"
35-
tokio = {version = "1.30.0", features = ["full"]}
34+
rdkafka = "0.37.0"
35+
tokio = {version = "1.45", features = ["full"]}
3636
uriparse = "0.6.4"
3737
tonic = { workspace = true }
3838
prost = { workspace = true }

src/log/kafka.rs

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -327,40 +327,112 @@ impl LogFactory for KafkaLogFactory {
327327

328328
#[cfg(test)]
329329
mod tests {
330+
use std::net::TcpListener;
331+
332+
use ignore_result::Ignore;
330333
use speculoos::*;
331334
use testcontainers::clients::Cli as DockerCli;
332-
use testcontainers::core::{Container, Port, RunnableImage, WaitFor};
335+
use testcontainers::core::{Container, RunnableImage, WaitFor};
333336
use testcontainers::images::generic::GenericImage;
334337

335338
use super::*;
336339

337-
fn kafka_image() -> RunnableImage<GenericImage> {
340+
fn kafka_image(network: String) -> RunnableImage<GenericImage> {
338341
let image = GenericImage::new("apache/kafka", "3.9.1")
339342
.with_wait_for(WaitFor::StdOutMessage { message: "Kafka Server started".into() });
343+
RunnableImage::from(image).with_network(network)
344+
}
340345

341-
// I can't find a way to connect to random mapping port for Kafka.
342-
//
343-
// Kafka registers listening port to cluster, and advertise it to client. So client will receive the advertised
344-
// port for connection. And it is bound in configuration. See also:
345-
//
346-
// * https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
347-
// * https://stackoverflow.com/questions/59343783/run-kafka-using-docker-compose-and-expose-a-different-port-instead-of-default-on
348-
RunnableImage::from(image).with_mapped_port(Port { local: 9092, internal: 9092 })
346+
fn kafka_proxy_image(listener_port: u16, advertised_port: u16) -> RunnableImage<GenericImage> {
347+
let image = GenericImage::new("grepplabs/kafka-proxy", "0.3.12")
348+
.with_wait_for(WaitFor::StdErrMessage { message: "Ready for new connections".into() })
349+
.with_exposed_port(listener_port);
350+
let args = vec![
351+
"server".into(),
352+
"--bootstrap-server-mapping".into(),
353+
format!("localhost:9092,0.0.0.0:{listener_port},127.0.0.1:{advertised_port}"),
354+
];
355+
RunnableImage::from((image, args))
356+
}
357+
358+
struct KafkaProxyContainer {
359+
advertised_port: u16,
360+
container: Container<'static, GenericImage>,
361+
_port_forwarder: asyncs::task::TaskHandle<()>,
349362
}
350363

351-
fn kafka_container() -> Container<'static, GenericImage> {
352-
let docker = DockerCli::default();
353-
let kafka = docker.run(kafka_image());
354-
tracing::debug!("kafka container started, listen on host port {}", kafka.get_host_port_ipv4(9092));
355-
unsafe { std::mem::transmute(kafka) }
364+
impl KafkaProxyContainer {
365+
const CONTAINER_PORT: u16 = 2222;
366+
367+
pub fn new() -> Self {
368+
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
369+
let advertised_port = listener.local_addr().unwrap().port();
370+
371+
let docker = DockerCli::default();
372+
let container = docker.run(kafka_proxy_image(Self::CONTAINER_PORT, advertised_port));
373+
let listener_port = container.get_host_port_ipv4(Self::CONTAINER_PORT);
374+
let _port_forwarder = asyncs::spawn(async move {
375+
listener.set_nonblocking(true).unwrap();
376+
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
377+
loop {
378+
let (mut source, _) = listener.accept().await.unwrap();
379+
asyncs::spawn(async move {
380+
let mut target =
381+
tokio::net::TcpStream::connect(format!("127.0.0.1:{listener_port}")).await.unwrap();
382+
tokio::io::copy_bidirectional(&mut source, &mut target).await.ignore();
383+
});
384+
}
385+
})
386+
.attach();
387+
let container = unsafe { std::mem::transmute(container) };
388+
Self { advertised_port, container, _port_forwarder }
389+
}
390+
391+
pub fn get_listen_port(&self) -> u16 {
392+
self.container.get_host_port_ipv4(Self::CONTAINER_PORT)
393+
}
394+
395+
pub fn get_advertised_port(&self) -> u16 {
396+
self.advertised_port
397+
}
398+
399+
pub fn network(&self) -> String {
400+
format!("container:{}", self.container.id())
401+
}
402+
}
403+
404+
pub struct KafkaContainer {
405+
_kafka: Container<'static, GenericImage>,
406+
proxy: KafkaProxyContainer,
407+
}
408+
409+
impl KafkaContainer {
410+
pub fn new() -> Self {
411+
let proxy = KafkaProxyContainer::new();
412+
let docker = DockerCli::default();
413+
let kafka = docker.run(kafka_image(proxy.network()));
414+
tracing::debug!(
415+
"kafka container started, listen on {}, advertised at port {}",
416+
proxy.get_listen_port(),
417+
proxy.get_advertised_port()
418+
);
419+
let kafka = unsafe { std::mem::transmute(kafka) };
420+
Self { _kafka: kafka, proxy }
421+
}
422+
423+
pub fn get_host_port(&self) -> u16 {
424+
self.proxy.get_listen_port()
425+
}
356426
}
357427

358-
#[test_log::test(tokio::test)]
359-
#[serial_test::serial("kafka_9092")]
428+
// There are request timeouts in case of current_thread scheduler.
429+
// I guess there are blocking operations in async context. But after
430+
// bumping both tokio and rdkafka, the phenomenon still exists.
431+
#[test_log::test(tokio::test(flavor = "multi_thread"))]
360432
#[tracing_test::traced_test]
361433
async fn test_kafka_basic() {
362-
let kafka = kafka_container();
363-
let server = format!("kafka://127.0.0.1:{}/logs", kafka.get_host_port_ipv4(9092));
434+
let kafka = KafkaContainer::new();
435+
let server = format!("kafka://127.0.0.1:{}/logs", kafka.get_host_port());
364436

365437
let uri: OwnedServiceUri = server.try_into().unwrap();
366438
let factory = KafkaLogFactory::default();

0 commit comments

Comments
 (0)