@@ -135,40 +135,32 @@ pub struct Message {
135
135
136
136
impl Message {
137
137
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
138
- pub fn valid ( & self , schema : & Schema , column : Option < & str > ) -> bool {
139
- if let Some ( col) = column {
140
- return get_field ( & schema. fields , col) . is_some ( ) ;
141
- }
142
- true
138
+ pub fn valid ( & self , schema : & Schema , column : & str ) -> bool {
139
+ return get_field ( & schema. fields , column) . is_some ( ) ;
143
140
}
144
141
145
- pub fn extract_column_name ( & self ) -> Option < & str > {
146
- let re = Regex :: new ( r"\{(.*?)\}" ) . unwrap ( ) ;
147
- let tokens: Vec < & str > = re
142
+ pub fn extract_column_names ( & self ) -> Vec < & str > {
143
+ // the message can have either no column name ({column_name} not present) or any number of {column_name} present
144
+ Regex :: new ( r"\{(.*?)\}" )
145
+ . unwrap ( )
148
146
. captures_iter ( self . message . as_str ( ) )
149
147
. map ( |cap| cap. get ( 1 ) . unwrap ( ) . as_str ( ) )
150
- . collect ( ) ;
151
- // the message can have either no column name ({column_name} not present) or one column name
152
- // return Some only if there is exactly one column name present
153
- if tokens. len ( ) == 1 {
154
- return Some ( tokens[ 0 ] ) ;
155
- }
156
- None
148
+ . collect ( )
157
149
}
158
150
159
- // returns the message with the column name replaced with the value of the column
151
+ /// Returns the message with the column names replaced with the values in the column.
160
152
fn get ( & self , event : RecordBatch ) -> String {
161
- if let Some ( column) = self . extract_column_name ( ) {
153
+ let mut replace_message = self . message . clone ( ) ;
154
+ for column in self . extract_column_names ( ) {
162
155
if let Some ( value) = event. column_by_name ( column) {
163
156
let arr = cast ( value, & DataType :: Utf8 ) . unwrap ( ) ;
164
157
let value = as_string_array ( & arr) . value ( 0 ) ;
165
158
166
- return self
167
- . message
168
- . replace ( & format ! ( "{{{column}}}" ) , value. to_string ( ) . as_str ( ) ) ;
159
+ replace_message =
160
+ replace_message. replace ( & format ! ( "{{{column}}}" ) , value. to_string ( ) . as_str ( ) ) ;
169
161
}
170
162
}
171
- self . message . clone ( )
163
+ replace_message
172
164
}
173
165
}
174
166
0 commit comments