@@ -246,6 +246,42 @@ impl<T> Message<T> {
246246 }
247247 }
248248
249+ /// Consumes the message, returns an empty message with committables and the payload without
250+ /// copying
251+ /// # Examples:
252+ /// ```
253+ /// use std::collections::BTreeMap;
254+ /// use sentry_arroyo::types::Message;
255+ /// use sentry_arroyo::types::{Partition, Topic};
256+ ///
257+ /// // Create the message with the committable structure we want to preserve
258+ /// let topic = Topic::new("test");
259+ /// let part = Partition { topic, index: 10 };
260+ /// let committable: BTreeMap<Partition, u64> = vec![
261+ /// (part, 42069)
262+ /// ].into_iter().collect();
263+ /// let message = Message::new_any_message("my_payload".to_string(), committable);
264+ ///
265+ /// fn transform_msg(msg: Message<String>) -> Message<usize> {
266+ /// // transform_msg takes ownership of the Message object
267+ /// let (empty_message_with_commitable, payload) = msg.take();
268+ /// empty_message_with_commitable.replace(payload.len())
269+ /// }
270+ ///
271+ /// let transformed_msg = transform_msg(message);
272+ /// ```
273+ pub fn take ( self ) -> ( Message < ( ) > , T ) {
274+ match self . inner_message {
275+ InnerMessage :: BrokerMessage ( bm) => (
276+ Message :: new_broker_message ( ( ) , bm. partition , bm. offset , bm. timestamp ) ,
277+ bm. payload ,
278+ ) ,
279+ InnerMessage :: AnyMessage ( am) => {
280+ ( Message :: new_any_message ( ( ) , am. committable ) , am. payload )
281+ }
282+ }
283+ }
284+
249285 /// Returns an iterator over this message's committable offsets.
250286 pub fn committable ( & self ) -> Committable {
251287 match & self . inner_message {
@@ -362,8 +398,9 @@ impl Iterator for Committable<'_> {
362398
363399#[ cfg( test) ]
364400mod tests {
365- use super :: { BrokerMessage , Partition , Topic } ;
401+ use super :: { BrokerMessage , InnerMessage , Message , Partition , Topic } ;
366402 use chrono:: Utc ;
403+ use std:: collections:: BTreeMap ;
367404
368405 #[ test]
369406 fn message ( ) {
@@ -379,6 +416,42 @@ mod tests {
379416 assert_eq ! ( message. timestamp, now) ;
380417 }
381418
419+ #[ test]
420+ fn broker_message_take ( ) {
421+ let now = Utc :: now ( ) ;
422+ let topic = Topic :: new ( "test" ) ;
423+ let part = Partition { topic, index : 10 } ;
424+ let committable: BTreeMap < Partition , u64 > = vec ! [ ( part, 42069 ) ] . into_iter ( ) . collect ( ) ;
425+
426+ let b_message = Message :: new_broker_message ( "payload" . to_string ( ) , part, 10 , now) ;
427+ let a_message = Message :: new_any_message ( "payload" . to_string ( ) , committable. clone ( ) ) ;
428+
429+ // need something to take ownership of the message
430+ let transform_func = |bm : Message < String > | -> Message < usize > {
431+ let ( empty_message, payload) = bm. take ( ) ;
432+ return empty_message. replace ( payload. len ( ) ) ;
433+ } ;
434+ let validate_msg = |msg : Message < usize > | -> ( ) {
435+ match msg. inner_message {
436+ InnerMessage :: BrokerMessage ( bm) => {
437+ assert_eq ! ( bm. offset, 10 ) ;
438+ assert_eq ! ( bm. partition, part) ;
439+ }
440+ InnerMessage :: AnyMessage ( am) => {
441+ assert_eq ! ( am. committable, committable) ;
442+ }
443+ }
444+ } ;
445+
446+ let transformed_broker_msg = transform_func ( b_message) ;
447+ let transformed_any_msg = transform_func ( a_message) ;
448+ assert_eq ! ( transformed_any_msg. payload( ) . clone( ) , "payload" . len( ) ) ;
449+ assert_eq ! ( transformed_broker_msg. payload( ) . clone( ) , "payload" . len( ) ) ;
450+
451+ validate_msg ( transformed_broker_msg) ;
452+ validate_msg ( transformed_any_msg) ;
453+ }
454+
382455 #[ test]
383456 fn fmt_display ( ) {
384457 let now = Utc :: now ( ) ;
0 commit comments