@@ -11,7 +11,7 @@ use crate::message_queue::{MessageQueues, QueueMessage};
use crate::mysql::{Decoder, MySQLConnection};
use std::sync::mpsc::{channel, Sender, Receiver};
use nom::AsBytes;
-use crate::binlog::{DeleteRowEvent, EventRaw, TableMap, TableMapEvent, UpdateRowEvent, WriteRowEvent};
+use crate::binlog::{ColMeta, DeleteRowEvent, EventRaw, TableMap, TableMapEvent, UpdateRowEvent, WriteRowEvent};

fn current_ms_ts() -> u128 {
    let now = SystemTime::now();
@@ -105,60 +105,188 @@ pub struct DmlMessage {
}

impl DmlMessage {
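+    // format_val renders a serde_json Value the way Canal expects column values:
+    // null stays bare, string Values come back from to_string() already JSON-quoted,
+    // and every other scalar is wrapped in quotes so that all data values are strings.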
+    fn format_val(val: &Value) -> String {
+        if val.is_null() {
+            "null".to_string()
+        } else if val.is_string() {
+            format!("{}", val.to_string())
+        } else {
+            format!("\"{}\"", val.to_string())
+        }
+    }
+
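+    // format_json hand-builds the Canal-style envelope (data, database, es, id, isDdl,
+    // mysqlType, old, pkNames, sql, sqlType, table, ts, type) so that field order and the
+    // string-typed data values match the Canal wire format. Illustrative output only
+    // (values made up for a two-column table):
+    // {"data":[{"id":"1","name":"foo"}],"database":"db","es":1690000000000,"id":1,
+    //  "isDdl":false,"mysqlType":{"id":"int","name":"varchar(32)"},"old":[{"name":"bar"}],
+    //  "pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"t",
+    //  "ts":1690000000123,"type":"UPDATE"}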
+    fn format_json(&mut self, fields: &mut Vec<FieldMeta>) -> String {
+        let mut buffer: Vec<String> = Vec::new();
+        buffer.push("{".to_string());
+        buffer.push("\"data\":[".to_string());
+        let data_count = self.data.len();
+        for (i, record) in self.data.iter_mut().enumerate() {
+            buffer.push("{".to_string());
+            let mut key_buffer: Vec<String> = Vec::new();
+            for meta in fields.iter() {
+                let val = record.get(&meta.name).unwrap();
+                key_buffer.push(format!("\"{}\":{}", &meta.name, Self::format_val(val)));
+            }
+            buffer.push(key_buffer.join(","));
+            if i != data_count - 1usize {
+                buffer.push("},".to_string());
+            } else {
+                buffer.push("}".to_string());
+            }
+        }
+        buffer.push("],".to_string());
+        buffer.push(format!("\"database\":\"{}\",", &self.database));
+        buffer.push(format!("\"es\":{},", self.es));
+        buffer.push(format!("\"id\":{},", self.id));
+        buffer.push("\"isDdl\":false,".to_string());
+        buffer.push("\"mysqlType\":{".to_string());
+        for (idx, meta) in fields.iter().enumerate() {
+            let tp = self.mysqlType.get(&meta.name).unwrap();
+            if idx != fields.len() - 1usize {
+                buffer.push(format!("\"{}\":\"{}\",", meta.name, tp));
+            } else {
+                buffer.push(format!("\"{}\":\"{}\"", meta.name, tp));
+            }
+        }
+        buffer.push("},".to_string());
+        if self.old.is_some() {
+            buffer.push("\"old\":[".to_string());
+            let old_records = self.old.clone().unwrap();
+            let old_count = old_records.len();
+            for (i, record) in old_records.iter().enumerate() {
+                buffer.push("{".to_string());
+                let mut key_buffer: Vec<String> = Vec::new();
+                for meta in fields.iter() {
+                    if let Some(val_ok) = record.get(&meta.name) {
+                        key_buffer.push(format!("\"{}\":{}", meta.name, Self::format_val(val_ok)));
+                    }
+                }
+                buffer.push(key_buffer.join(","));
+                if i != old_count - 1usize {
+                    buffer.push("},".to_string());
+                } else {
+                    buffer.push("}".to_string());
+                }
+            }
+            buffer.push("],".to_string());
+        }
+        if self.pkNames.is_none() {
+            buffer.push("\"pkNames\":null,".to_string());
+        } else {
+            buffer.push("\"pkNames\":[".to_string());
+            let pk_names = self.pkNames.clone().unwrap();
+            for (idx, name) in pk_names.iter().enumerate() {
+                if idx != pk_names.len() - 1usize {
+                    buffer.push(format!("\"{name}\","));
+                } else {
+                    buffer.push(format!("\"{name}\""));
+                }
+            }
+            buffer.push("],".to_string());
+        }
+        buffer.push("\"sql\":\"\",".to_string());
+        buffer.push("\"sqlType\":{".to_string());
+        for (idx, meta) in fields.iter().enumerate() {
+            let sql_type = self.sqlType.get(&meta.name).unwrap();
+            if idx != fields.len() - 1usize {
+                buffer.push(format!("\"{}\":{},", meta.name, sql_type));
+            } else {
+                buffer.push(format!("\"{}\":{}", meta.name, sql_type));
+            }
+        }
+        buffer.push("},".to_string());
+        buffer.push(format!("\"table\":\"{}\",", self.table));
+        buffer.push(format!("\"ts\":{},", self.ts));
+        buffer.push(format!("\"type\":\"{}\"", &self.r#type));
+        buffer.push("}".to_string());
+        buffer.join("")
+    }
+
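+    // TEXT columns (sqlType 2005, JDBC CLOB) arrive from the row decoder as a JSON array
+    // of byte values; rebuild them as a UTF-8 string (lossy on invalid sequences).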
+    fn text_field_data(val: &Value) -> String {
+        match val.as_array() {
+            Some(s) => String::from_utf8_lossy(s.iter().map(|n| n.as_u64().unwrap() as u8).collect::<Vec<u8>>().as_slice()).to_string(),
+            None => String::from(""),
+        }
+    }
+
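+    // BLOB columns (sqlType 2004, JDBC BLOB) are decoded here as UTF-16 code units; note
+    // that from_utf16(...).unwrap() will panic on payloads that are not valid UTF-16.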
+    fn blob_field_data(val: &Value) -> String {
+        match val.as_array() {
+            Some(s) => String::from_utf16(s.iter().map(|n| n.as_u64().unwrap() as u16).collect::<Vec<u16>>().as_slice()).unwrap(),
+            None => String::from(""),
+        }
+    }
+
    fn from_dml(mut dml: DmlData, fields: &mut Vec<FieldMeta>) -> Self {
        let mut ins = Self::new(dml.id, dml.database, dml.table, dml.dml_type, dml.es);
        let mut pks: Vec<String> = Vec::new();
-
-        for vals in dml.data.iter_mut() {
-            let mut record: HashMap<String, Value> = HashMap::new();
-            for (idx, val) in vals.iter().enumerate() {
-                if let Some(meta) = fields.get_mut(idx) {
-                    ins.mysqlType.insert(meta.name.clone(), meta.field_type.clone());
-                    let sql_tp = meta.get_sql_type();
-                    ins.sqlType.insert(meta.name.clone(), sql_tp);
-                    if sql_tp == 2005 {
-                        let val_s = match val.as_array() {
-                            Some(s) => String::from_utf8_lossy(s.iter().map(|n| n.as_u64().unwrap() as u8).collect::<Vec<u8>>().as_slice()).to_string(),
-                            None => String::from(""),
-                        };
-                        record.insert(meta.name.clone(), Value::from(val_s));
-                    } else {
-                        if sql_tp == 2004 {
-                            let val_s = match val.as_array() {
-                                Some(s) => String::from_utf16(s.iter().map(|n| n.as_u64().unwrap() as u16).collect::<Vec<u16>>().as_slice()).unwrap(),
-                                None => String::from(""),
-                            };
-                            record.insert(meta.name.clone(), Value::from(val_s));
-                        } else {
-                            record.insert(meta.name.clone(), val.clone());
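+        // Walk new rows and old rows in lock-step, one record at a time: column metadata
+        // (mysqlType/sqlType/pkNames) is captured on the first record only, and record_old
+        // collects just the columns whose value actually changed.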
+        let record_count = dml.data.len();
+        let mut old_record_vec: Vec<HashMap<String, Value>> = Vec::new();
+        for record_id in 0..record_count {
+            let old_vals = dml.old_data.get(record_id);
+            let hash_old_val = old_vals.is_none();
+            let new_vals = dml.data.get(record_id).unwrap();
+
+            let mut record_data: HashMap<String, Value> = HashMap::new();
+            let mut record_old: HashMap<String, Value> = HashMap::new();
+
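+            // One pass over the column metadata per record; `idx` lines up each field
+            // definition with the positional values in new_vals / old_vals.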
+            for (idx, file_meta) in fields.iter_mut().enumerate() {
+                let sql_tp = file_meta.get_sql_type();
+                if record_id == 0 {
+                    ins.mysqlType.insert(file_meta.name.clone(), file_meta.field_type.clone());
+                    ins.sqlType.insert(file_meta.name.clone(), sql_tp);
+                    if file_meta.is_pk {
+                        if !pks.contains(&file_meta.name) {
+                            pks.insert(0, file_meta.name.clone());
                        }
                    }
-
-                if meta.is_pk {
-                    if !pks.contains(&meta.name) {
-                        pks.insert(0, meta.name.clone());
+                }
+                let data_val = new_vals.get(idx).unwrap();
+                let is_same = match old_vals {
+                    Some(old_values) => {
+                        let ov = old_values.get(idx).unwrap();
+                        ov.eq(data_val)
+                    },
+                    None => true,
+                };
+
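+                // Dispatch on the JDBC type code: 2005 = CLOB (TEXT), 2004 = BLOB,
+                // everything else passes the serde_json value through unchanged.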
+                if sql_tp == 2005 {
+                    let val_s = Self::text_field_data(data_val);
+                    record_data.insert(file_meta.name.clone(), Value::from(val_s));
+                    if !is_same {
+                        let old_val = old_vals.unwrap().get(idx).unwrap();
+                        let old_val_s = Self::text_field_data(old_val);
+                        record_old.insert(file_meta.name.clone(), Value::from(old_val_s));
+                    }
+                } else {
+                    if sql_tp == 2004 {
+                        let val_s = Self::blob_field_data(data_val);
+                        record_data.insert(file_meta.name.clone(), Value::from(val_s));
+                        if !is_same {
+                            let old_val = old_vals.unwrap().get(idx).unwrap();
+                            let old_val_s = Self::blob_field_data(old_val);
+                            record_old.insert(file_meta.name.clone(), Value::from(old_val_s));
+                        }
+                    } else {
+                        record_data.insert(file_meta.name.clone(), data_val.clone());
+                        if !is_same {
+                            let old_val = old_vals.unwrap().get(idx).unwrap();
+                            record_old.insert(file_meta.name.clone(), old_val.clone());
                        }
                    }
                }
            }
-            ins.data.push(record);
+            ins.data.push(record_data);
+            old_record_vec.push(record_old);
        }
-        let mut data_old: Vec<HashMap<String, Value>> = Vec::new();
-        for vals in dml.old_data.iter_mut() {
-            let mut record: HashMap<String, Value> = HashMap::new();
-            for (idx, val) in vals.iter().enumerate() {
-                if let Some(meta) = fields.get(idx) {
-                    record.insert(meta.name.clone(), val.clone().take());
-                }
-            }
-            data_old.push(record)
-        }
-        if data_old.len() > 0usize {
-            ins.old = Some(data_old);
-        }
-        if pks.len() > 0usize {
+        if pks.len() > 0 {
            ins.pkNames = Some(pks);
        }
+        ins.old = Some(old_record_vec);
        ins
    }

@@ -201,7 +329,7 @@ impl FieldMeta{
            return 4
        }
        if self.field_type.starts_with("bigint") {
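+            // java.sql.Types.BIGINT is -5 (5 would be Types.SMALLINT)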
-            return 5;
+            return -5;
        }
        if self.field_type.starts_with("float") {
            return 7;
@@ -221,7 +349,7 @@ impl FieldMeta{
        if self.field_type.starts_with("year") {
            return 12;
        }
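+        // starts_with also matches precision variants such as datetime(3) and timestamp(6)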
-        if self.field_type.eq("datetime") || self.field_type.eq("timestamp") {
+        if self.field_type.starts_with("datetime") || self.field_type.starts_with("timestamp") {
            return 93
        }
        if self.field_type.starts_with("char") {
@@ -236,6 +364,7 @@ impl FieldMeta{
        if self.field_type.ends_with("text") {
            return 2005;
        }
+        println!("invalid:{:?}", self);
        -999
    }
}
@@ -251,13 +380,13 @@ impl TableMetaMapping {
        Self { mapping: Arc::new(Mutex::new(HashMap::new())) }
    }

-    fn update_mapping(&mut self, conn: &mut MySQLConnection, tid: u32, db: String, table: String) -> Result<Vec<FieldMeta>, ()> {
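+    // The decoded TABLE_MAP column metadata (ColMeta) now travels with the lookup,
+    // presumably so desc_table can line schema columns up with the binlog column order.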
+    fn update_mapping(&mut self, conn: &mut MySQLConnection, tid: u32, db: String, table: String, table_map: &Vec<ColMeta>) -> Result<Vec<FieldMeta>, ()> {
        loop {
            if let Ok(mut mp) = self.mapping.lock() {
                if !mp.contains_key(&tid) {
                    let mut cols = Vec::new();
                    //println!("Check Mapping {tid} {db} {table} in {:?}", mp.contains_key(&tid));
-                    if conn.desc_table(db.clone(), table.clone(), &mut cols) {
+                    if conn.desc_table(db.clone(), table.clone(), &mut cols, table_map) {
                        mp.insert(tid, cols.clone());
                        return Ok(cols.clone());
                    } else {
@@ -345,15 +474,16 @@ fn worker_body(thread_id: usize, rx: Receiver<RowEvents>, mapping: &mut TableMet
        if let Ok(data) = rx.recv() {
            let mut ports: Vec<(String, String)> = Vec::new();
            let (_, tablemap) = TableMapEvent::decode(data.table_map.payload.as_slice()).expect("table map error");
-            table_map.decode_columns(tablemap.header.table_id, tablemap.column_types, tablemap.column_metas.as_bytes());
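+            // Clone first: decode_columns takes the column data from the clone, which keeps
+            // the original tablemap available for new_data() and the metas lookup below.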
+            let tm = tablemap.clone();
+            table_map.decode_columns(tm.header.table_id, tm.column_types, tm.column_metas.as_bytes());
            let mut current_data = DmlData::new_data(tablemap.header.table_id as u32, tablemap.schema_name.clone(), tablemap.table_name.clone());
            for instance in instances.iter_mut() {
                if let Some((mq_name, topic)) = instance.check_if_need_a_mq(current_data.database.clone(), current_data.table.clone()) {
                    ports.push((mq_name, topic));
                }
            }
            if ports.len() < 1 {
-                //println!("No matching instance for {}.{}", &data.database, &data.table);
+                //println!("No matching instance for {}.{}", &current_data.database, &current_data.table);
                continue;
            }
            if let Some(ev) = data.row_event {
@@ -380,18 +510,27 @@ fn worker_body(thread_id: usize, rx: Receiver<RowEvents>, mapping: &mut TableMet
                    current_data.append_data(data.seq_idx, "DELETE".to_string(), Vec::new(), old_values, ev.header.log_pos);
                }
                if vec![32u8, 31u8, 30u8].contains(&ev.header.event_type) {
-                    if let Ok(mut meta) = mapping.update_mapping(&mut conn, current_data.table_id, current_data.database.clone(), current_data.table.clone()) {
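+                    // 30/31/32 are WRITE/UPDATE/DELETE_ROWS_EVENT (v2); hand the decoded
+                    // TABLE_MAP column metadata to update_mapping along with the table id.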
+                    let tm = tablemap.clone();
+                    if let Ok(mut meta) = mapping.update_mapping(
+                        &mut conn,
+                        current_data.table_id,
+                        current_data.database.clone(),
+                        current_data.table.clone(),
+                        &table_map.metas[&tm.header.table_id],
+                    ) {
                        if meta.len() == 0usize {
                            println!("Table {}.{} does not exist", current_data.database, current_data.table);
                            continue
                        }
-                        let message = DmlMessage::from_dml(current_data, &mut meta);
-                        if let Ok(json_message) = serde_json::to_string(&message) {
-                            //println!("Canal JSON:\n{}", &json_message);
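+                        // Serialize with format_json() instead of serde_json::to_string so the
+                        // field order and string-typed values follow the Canal format.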
+                        let mut message = DmlMessage::from_dml(current_data, &mut meta);
+                        let json_str = message.format_json(&mut meta);
+                        if ports.len() > 0 {
                            for (mq_name, topic) in ports {
-                                let msg_qu = QueueMessage { topic, payload: json_message.clone(), pos };
+                                let msg_qu = QueueMessage { topic, payload: json_str.clone(), pos };
                                queue.push(&mq_name, msg_qu);
                            }
+                        } else {
+                            println!("No message-queue port available to send to");
                        }
                    }
