Skip to content

Commit f413611

Browse files
Merge branch 'master' into fix/producer-small-futures
2 parents e6044e6 + d1a2df8 commit f413611

File tree

4 files changed

+4
-47
lines changed

4 files changed

+4
-47
lines changed

src/consumer/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
110110

111111
let send_end_res = event_tx.send(mapper(None)).await;
112112
if let Err(err) = send_end_res {
113-
log::error!("Error sending end event to channel - {err}");
113+
log::debug!("Error sending close event to channel - {err}");
114114
}
115115

116116
log::warn!("rx terminated");

src/lib.rs

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -252,49 +252,6 @@ mod tests {
252252
}
253253
}
254254

255-
#[derive(Debug)]
256-
enum Error {
257-
Pulsar(PulsarError),
258-
Timeout(std::io::Error),
259-
Serde(serde_json::Error),
260-
Utf8(std::string::FromUtf8Error),
261-
}
262-
263-
impl From<std::io::Error> for Error {
264-
fn from(e: std::io::Error) -> Self {
265-
Error::Timeout(e)
266-
}
267-
}
268-
269-
impl From<PulsarError> for Error {
270-
fn from(e: PulsarError) -> Self {
271-
Error::Pulsar(e)
272-
}
273-
}
274-
275-
impl From<serde_json::Error> for Error {
276-
fn from(e: serde_json::Error) -> Self {
277-
Error::Serde(e)
278-
}
279-
}
280-
281-
impl From<std::string::FromUtf8Error> for Error {
282-
fn from(err: std::string::FromUtf8Error) -> Self {
283-
Error::Utf8(err)
284-
}
285-
}
286-
287-
impl std::fmt::Display for Error {
288-
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
289-
match self {
290-
Error::Pulsar(e) => write!(f, "{e}"),
291-
Error::Timeout(e) => write!(f, "{e}"),
292-
Error::Serde(e) => write!(f, "{e}"),
293-
Error::Utf8(e) => write!(f, "{e}"),
294-
}
295-
}
296-
}
297-
298255
pub struct SimpleLogger {
299256
pub tag: &'static str,
300257
}

src/message.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ struct CommandFrame<'a> {
447447
}
448448

449449
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
450-
fn command_frame(i: &[u8]) -> IResult<&[u8], CommandFrame> {
450+
fn command_frame<'a>(i: &'a [u8]) -> IResult<&'a [u8], CommandFrame<'a>> {
451451
let (i, total_size) = be_u32(i)?;
452452
let (i, command_size) = be_u32(i)?;
453453
let (i, command) = take(command_size)(i)?;
@@ -473,7 +473,7 @@ struct PayloadFrame<'a> {
473473
}
474474

475475
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
476-
fn payload_frame(i: &[u8]) -> IResult<&[u8], PayloadFrame> {
476+
fn payload_frame<'a>(i: &'a [u8]) -> IResult<&'a [u8], PayloadFrame<'a>> {
477477
let (i, magic_number) = be_u16(i)?;
478478
let (i, checksum) = be_u32(i)?;
479479
let (i, metadata_size) = be_u32(i)?;

src/producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl<Exe: Executor> Producer<Exe> {
341341
///
342342
/// the created message will ber sent by this producer in [MessageBuilder::send]
343343
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
344-
pub fn create_message(&mut self) -> MessageBuilder<(), Exe> {
344+
pub fn create_message<'a>(&'a mut self) -> MessageBuilder<'a, (), Exe> {
345345
MessageBuilder::new(self)
346346
}
347347

0 commit comments

Comments
 (0)