Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a238463
Basic containers.rs test module. Basic KafkaContex.
j7nw4r Oct 2, 2025
2351d8f
Get bootstrap servers.
j7nw4r Oct 2, 2025
ea554b7
Basic test for KafkaContext based on TestContainers.
j7nw4r Oct 2, 2025
b4a0ac5
Some test functionality organization.
j7nw4r Oct 2, 2025
bef9136
Basic produce.
j7nw4r Oct 2, 2025
1cf7042
Various updates. KafkaContext is now shared.
j7nw4r Oct 10, 2025
92711d4
admin. test topic creation.
j7nw4r Oct 10, 2025
f16e83b
basic producer test works.
j7nw4r Oct 11, 2025
b6d9bc9
clear up
j7nw4r Oct 11, 2025
180ec7e
test_topic_create_and_delete
j7nw4r Oct 11, 2025
c06b57e
fix: test_incorect_replication_factors_are_ignored
j7nw4r Oct 12, 2025
87f159b
fix: test_incorrect_replication_factors_are_ignored_when_creating_par…
j7nw4r Oct 12, 2025
722deb2
fix: test_mixed_success_results
j7nw4r Oct 12, 2025
e9d5812
fix: test commenting
j7nw4r Oct 12, 2025
cab0b48
fix: test_delete_records
j7nw4r Oct 12, 2025
50528fb
fix: test_configs
j7nw4r Oct 12, 2025
bc12604
fix: test_delete_records
j7nw4r Oct 12, 2025
89ba998
fix: test_consumer_groups_deletion and test_delete_unknown_group
j7nw4r Oct 13, 2025
74c1ec0
fix: test_consumer_group_action_mixed_results
j7nw4r Oct 13, 2025
8c9ab9b
fix: remove test_admin.rs
j7nw4r Oct 13, 2025
5684312
fix: test_produce_consume_base_assign
j7nw4r Oct 17, 2025
6bc6c75
fix: test_produce_consume_base_unassign
j7nw4r Oct 17, 2025
e010414
test_produce_consume_base_incremental_assign_and_unassign
j7nw4r Oct 17, 2025
1ab92a2
stream consumer tests.
j7nw4r Oct 17, 2025
7f501ee
future_producer tests
j7nw4r Oct 17, 2025
3ee3551
More test updates.
j7nw4r Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,891 changes: 1,793 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ rand = "0.9.1"
regex = "1.11.1"
smol = "2.0.2"
tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "time"] }
testcontainers-modules = { version = "0.13.0", features = ["kafka"] }
anyhow = { version = "1.0.100" }

# These features are re-exports of the features that the rdkafka-sys crate
# provides. See the rdkafka-sys documentation for details.
Expand Down
15 changes: 15 additions & 0 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Low-level consumers.

use std::ffi::{CStr, CString};
use std::fmt;
use std::mem::ManuallyDrop;
use std::os::raw::c_void;
use std::ptr;
Expand Down Expand Up @@ -41,6 +42,20 @@ where
nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}

impl<C> fmt::Debug for BaseConsumer<C>
where
C: ConsumerContext,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BaseConsumer")
.field("native_ptr", &self.client.native_ptr())
.field("queue", &self.queue)
.field("group_id", &self.group_id)
.field("has_nonempty_callback", &self.nonempty_callback.is_some())
.finish()
}
}

impl FromClientConfig for BaseConsumer {
fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
BaseConsumer::from_config_and_context(config, DefaultConsumerContext)
Expand Down
14 changes: 14 additions & 0 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! should wait and try again.

use std::ffi::{CStr, CString};
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
Expand Down Expand Up @@ -340,6 +341,19 @@ where
_partitioner: PhantomData<Part>,
}

impl<C, Part> fmt::Debug for BaseProducer<C, Part>
where
Part: Partitioner,
C: ProducerContext<Part>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BaseProducer")
.field("native_ptr", &self.native_ptr())
.field("queue", &self.queue)
.finish()
}
}

impl<C, Part> BaseProducer<C, Part>
where
Part: Partitioner,
Expand Down
Loading
Loading