Skip to content

Commit a1cba2f

Browse files
authored
feat: add Pulsar module (#233)
1 parent 5bb2154 commit a1cba2f

File tree

4 files changed

+251
-0
lines changed

4 files changed

+251
-0
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ jobs:
107107
- uses: taiki-e/install-action@v2
108108
with:
109109
tool: cargo-hack
110+
- name: Install Protoc # for pulsar tests
111+
uses: arduino/setup-protoc@v3
110112
- name: Install the latest Oracle instant client
111113
run: |
112114
curl -Lo basic.zip https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ victoria_metrics = []
5454
zookeeper = []
5555
cockroach_db = []
5656
kwok = []
57+
pulsar = []
5758

5859
[dependencies]
5960
# TODO: update parse-display after MSRV>=1.80.0 bump of `testcontainer-rs` and `testcontainers-modules`
@@ -113,6 +114,7 @@ clickhouse = "0.11.6"
113114
vaultrs = "0.7.2"
114115
openssl-sys = { version = "0.9.103", features = ["vendored"] }
115116
native-tls = { version = "0.2.12", features = ["vendored"] }
117+
pulsar = "6.3"
116118

117119
[[example]]
118120
name = "postgres"

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ pub mod parity_parity;
120120
#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
121121
/// **Postgres** (relational database) testcontainer
122122
pub mod postgres;
123+
#[cfg(feature = "pulsar")]
124+
#[cfg_attr(docsrs, doc(cfg(feature = "pulsar")))]
125+
/// **Apache Pulsar** (Cloud-Native, Distributed Messaging and Streaming) testcontainer
126+
pub mod pulsar;
123127
#[cfg(feature = "rabbitmq")]
124128
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))]
125129
/// **rabbitmq** (message broker) testcontainer

src/pulsar/mod.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use std::{borrow::Cow, collections::BTreeMap};
2+
3+
use testcontainers::{
4+
core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor},
5+
Image, TestcontainersError,
6+
};
7+
8+
const NAME: &str = "apachepulsar/pulsar";
9+
const TAG: &str = "2.10.6";
10+
11+
const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650);
12+
const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080);
13+
14+
/// Module to work with [`Apache Pulsar`] inside of tests.
15+
/// **Requires protoc to be installed, otherwise will not build.**
16+
///
17+
/// This module is based on the official [`Apache Pulsar docker image`].
18+
///
19+
/// # Example
20+
/// ```
21+
/// use testcontainers_modules::{pulsar, testcontainers::runners::SyncRunner};
22+
///
23+
/// let pulsar = pulsar::Pulsar::default().start().unwrap();
24+
/// let http_port = pulsar.get_host_port_ipv4(6650).unwrap();
25+
///
26+
/// // do something with the running pulsar instance..
27+
/// ```
28+
///
29+
/// [`Apache Pulsar`]: https://github.com/apache/pulsar
30+
/// [`Apache Pulsar docker image`]: https://hub.docker.com/r/apachepulsar/pulsar/
31+
#[derive(Debug, Clone)]
32+
pub struct Pulsar {
33+
data_mount: Mount,
34+
env: BTreeMap<String, String>,
35+
admin_commands: Vec<Vec<String>>,
36+
}
37+
38+
impl Default for Pulsar {
39+
/**
40+
* Creates new standalone pulsar container, with `/pulsar/data` as a temporary volume
41+
*/
42+
fn default() -> Self {
43+
Self {
44+
data_mount: Mount::tmpfs_mount("/pulsar/data"),
45+
env: BTreeMap::new(),
46+
admin_commands: vec![],
47+
}
48+
}
49+
}
50+
51+
impl Pulsar {
52+
/// Add configuration parameter to Pulsar `conf/standalone.conf` through setting environment variable.
53+
///
54+
/// Container will rewrite `conf/standalone.conf` file using these variables during startup
55+
/// with help of `bin/apply-config-from-env.py` script
56+
pub fn with_config_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
57+
self.env
58+
.insert(format!("PULSAR_PREFIX_{}", name.into()), value.into());
59+
self
60+
}
61+
62+
/// Runs admin command after container start
63+
pub fn with_admin_command(
64+
mut self,
65+
command: impl IntoIterator<Item = impl Into<String>>,
66+
) -> Self {
67+
let mut vec: Vec<String> = command.into_iter().map(Into::into).collect();
68+
vec.insert(0, "bin/pulsar-admin".to_string());
69+
self.admin_commands.push(vec);
70+
self
71+
}
72+
73+
/// Creates tenant after container start
74+
pub fn with_tenant(self, tenant: impl Into<String>) -> Self {
75+
let tenant = tenant.into();
76+
self.with_admin_command(["tenants", "create", &tenant])
77+
}
78+
79+
/// Creates namespace after container start
80+
pub fn with_namespace(self, namespace: impl Into<String>) -> Self {
81+
let namespace = namespace.into();
82+
self.with_admin_command(["namespaces", "create", &namespace])
83+
}
84+
85+
/// Creates topic after container start
86+
pub fn with_topic(self, topic: impl Into<String>) -> Self {
87+
let topic = topic.into();
88+
self.with_admin_command(["topics", "create", &topic])
89+
}
90+
}
91+
92+
impl Image for Pulsar {
93+
fn name(&self) -> &str {
94+
NAME
95+
}
96+
97+
fn tag(&self) -> &str {
98+
TAG
99+
}
100+
101+
fn ready_conditions(&self) -> Vec<WaitFor> {
102+
vec![
103+
WaitFor::message_on_stdout("HTTP Service started at"),
104+
WaitFor::message_on_stdout("messaging service is ready"),
105+
]
106+
}
107+
108+
fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
109+
[&self.data_mount]
110+
}
111+
112+
fn env_vars(
113+
&self,
114+
) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
115+
&self.env
116+
}
117+
118+
fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
119+
[
120+
"sh",
121+
"-c",
122+
"bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone",
123+
]
124+
}
125+
126+
fn exec_after_start(
127+
&self,
128+
_cs: ContainerState,
129+
) -> Result<Vec<ExecCommand>, TestcontainersError> {
130+
Ok(self
131+
.admin_commands
132+
.iter()
133+
.map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0)))
134+
.collect())
135+
}
136+
137+
fn expose_ports(&self) -> &[ContainerPort] {
138+
&[PULSAR_PORT, ADMIN_PORT]
139+
}
140+
}
141+
142+
#[cfg(test)]
143+
mod tests {
144+
use futures::StreamExt;
145+
use pulsar::{
146+
producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage,
147+
TokioExecutor,
148+
};
149+
use serde::{Deserialize, Serialize};
150+
151+
use super::*;
152+
use crate::testcontainers::runners::AsyncRunner;
153+
154+
#[derive(Serialize, Deserialize)]
155+
struct TestData {
156+
data: String,
157+
}
158+
159+
impl DeserializeMessage for TestData {
160+
type Output = Result<TestData, serde_json::Error>;
161+
162+
fn deserialize_message(payload: &Payload) -> Self::Output {
163+
serde_json::from_slice(&payload.data)
164+
}
165+
}
166+
167+
impl SerializeMessage for TestData {
168+
fn serialize_message(input: Self) -> Result<Message, Error> {
169+
Ok(Message {
170+
payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?,
171+
..Default::default()
172+
})
173+
}
174+
}
175+
176+
#[tokio::test]
177+
async fn pulsar_subscribe_and_publish() -> Result<(), Box<dyn std::error::Error + 'static>> {
178+
let topic = "persistent://test/test-ns/test-topic";
179+
180+
let pulsar = Pulsar::default()
181+
.with_tenant("test")
182+
.with_namespace("test/test-ns")
183+
.with_topic(topic)
184+
.start()
185+
.await
186+
.unwrap();
187+
188+
let endpoint = format!(
189+
"pulsar://0.0.0.0:{}",
190+
pulsar.get_host_port_ipv4(6650).await?
191+
);
192+
let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
193+
.build()
194+
.await?;
195+
196+
let mut consumer: Consumer<TestData, _> =
197+
client.consumer().with_topic(topic).build().await?;
198+
199+
let mut producer = client.producer().with_topic(topic).build().await?;
200+
201+
producer
202+
.send_non_blocking(TestData {
203+
data: "test".to_string(),
204+
})
205+
.await?
206+
.await?;
207+
208+
let data = consumer.next().await.unwrap()?.deserialize()?;
209+
assert_eq!("test", data.data);
210+
211+
Ok(())
212+
}
213+
214+
#[tokio::test]
215+
async fn pulsar_config() -> Result<(), Box<dyn std::error::Error + 'static>> {
216+
let topic = "persistent://test/test-ns/test-topic";
217+
218+
let pulsar = Pulsar::default()
219+
.with_tenant("test")
220+
.with_namespace("test/test-ns")
221+
.with_config_env("allowAutoTopicCreation", "false")
222+
.start()
223+
.await
224+
.unwrap();
225+
226+
let endpoint = format!(
227+
"pulsar://0.0.0.0:{}",
228+
pulsar.get_host_port_ipv4(6650).await?
229+
);
230+
let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
231+
.build()
232+
.await?;
233+
234+
let producer = client.producer().with_topic(topic).build().await;
235+
236+
match producer {
237+
Ok(_) => panic!("Producer should return error"),
238+
Err(e) => assert_eq!("Connection error: Server error (Some(TopicNotFound)): Topic not found persistent://test/test-ns/test-topic", e.to_string()),
239+
}
240+
241+
Ok(())
242+
}
243+
}

0 commit comments

Comments
 (0)