32
32
#include "migration/migration.h"
33
33
#include "util.h"
34
34
35
+ #include "block/aio-wait.h"
36
+ #include "qemu/coroutine.h"
37
+
35
38
#define TYPE_COLO_COMPARE "colo-compare"
36
39
#define COLO_COMPARE (obj ) \
37
40
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,23 @@ static int event_unhandled_count;
77
80
* |packet | |packet + |packet | |packet +
78
81
* +--------+ +--------+ +--------+ +--------+
79
82
*/
83
+
84
+ typedef struct SendCo { /* Per-chardev asynchronous send state: a queue of pending SendEntry items plus the coroutine that drains it. */
85
+ Coroutine * co ; /* coroutine created by compare_chr_send(); reset to NULL when _compare_chr_send() finishes */
86
+ struct CompareState * s ; /* owning compare object; consulted for s->vnet_hdr when sending */
87
+ CharBackend * chr ; /* char backend every queued entry is written to */
88
+ GQueue send_list ; /* pending SendEntry items: pushed at the head, popped from the tail (FIFO) */
89
+ bool notify_remote_frame ; /* true for the notify channel; the vnet header length is only sent when this is false */
90
+ bool done ; /* true while no send coroutine is active; set false before entering one, true again when it ends */
91
+ int ret ; /* result of the last coroutine run: 0 on success, negative errno-style value on failure */
92
+ } SendCo ;
93
+
94
+ typedef struct SendEntry { /* One queued message waiting in SendCo.send_list. */
95
+ uint32_t size ; /* payload length in bytes; sent first as a 32-bit htonl() prefix */
96
+ uint32_t vnet_hdr_len ; /* vnet header length; sent (htonl) after the size on the non-notify channel when s->vnet_hdr is set */
97
+ uint8_t * buf ; /* payload; released with g_free() by _compare_chr_send(), so the entry owns this memory */
98
+ } SendEntry ;
99
+
80
100
typedef struct CompareState {
81
101
Object parent ;
82
102
@@ -91,6 +111,8 @@ typedef struct CompareState {
91
111
SocketReadState pri_rs ;
92
112
SocketReadState sec_rs ;
93
113
SocketReadState notify_rs ;
114
+ SendCo out_sendco ;
115
+ SendCo notify_sendco ;
94
116
bool vnet_hdr ;
95
117
uint32_t compare_timeout ;
96
118
uint32_t expired_scan_cycle ;
@@ -124,10 +146,11 @@ enum {
124
146
125
147
126
148
static int compare_chr_send (CompareState * s , /* forward declaration; queues buf for sending on the out or notify chardev */
127
- const uint8_t * buf ,
149
+ uint8_t * buf , /* no longer const: with zero_copy the pointer itself is queued and later g_free()d */
128
150
uint32_t size ,
129
151
uint32_t vnet_hdr_len ,
130
- bool notify_remote_frame );
152
+ bool notify_remote_frame ,
153
+ bool zero_copy ); /* true: take ownership of buf; false: queue a private copy */
131
154
132
155
static bool packet_matches_str (const char * str ,
133
156
const uint8_t * buf ,
@@ -145,7 +168,7 @@ static void notify_remote_frame(CompareState *s)
145
168
char msg [] = "DO_CHECKPOINT" ;
146
169
int ret = 0 ;
147
170
148
- ret = compare_chr_send (s , (uint8_t * )msg , strlen (msg ), 0 , true);
171
+ ret = compare_chr_send (s , (uint8_t * )msg , strlen (msg ), 0 , true, false );
149
172
if (ret < 0 ) {
150
173
error_report ("Notify Xen COLO-frame failed" );
151
174
}
@@ -272,12 +295,13 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
272
295
pkt -> data ,
273
296
pkt -> size ,
274
297
pkt -> vnet_hdr_len ,
275
- false);
298
+ false,
299
+ true);
276
300
if (ret < 0 ) {
277
301
error_report ("colo send primary packet failed" );
278
302
}
279
303
trace_colo_compare_main ("packet same and release packet" );
280
- packet_destroy (pkt , NULL );
304
+ packet_destroy_partial (pkt , NULL );
281
305
}
282
306
283
307
/*
@@ -699,65 +723,115 @@ static void colo_compare_connection(void *opaque, void *user_data)
699
723
}
700
724
}
701
725
702
- static int compare_chr_send (CompareState * s ,
703
- const uint8_t * buf ,
704
- uint32_t size ,
705
- uint32_t vnet_hdr_len ,
706
- bool notify_remote_frame )
726
+ static void coroutine_fn _compare_chr_send (void * opaque ) /* Coroutine body: drains sendco->send_list, writing each entry to sendco->chr; opaque is the SendCo. */
707
727
{
728
+ SendCo * sendco = opaque ;
729
+ CompareState * s = sendco -> s ;
708
730
int ret = 0 ;
709
- uint32_t len = htonl (size );
710
731
711
- if (!size ) {
712
- return 0 ;
713
- }
732
+ while (!g_queue_is_empty ( & sendco -> send_list ) ) { /* keep going until nothing is queued; entries added meanwhile are picked up too */
733
+ SendEntry * entry = g_queue_pop_tail ( & sendco -> send_list ) ; /* compare_chr_send() pushes at the head, so the tail is the oldest entry */
734
+ uint32_t len = htonl ( entry -> size ); /* length prefix in network byte order */
714
735
715
- if (notify_remote_frame ) {
716
- ret = qemu_chr_fe_write_all (& s -> chr_notify_dev ,
717
- (uint8_t * )& len ,
718
- sizeof (len ));
719
- } else {
720
- ret = qemu_chr_fe_write_all (& s -> chr_out , (uint8_t * )& len , sizeof (len ));
721
- }
736
+ ret = qemu_chr_fe_write_all (sendco -> chr , (uint8_t * )& len , sizeof (len )); /* NOTE(review): whether this call yields inside a coroutine is not visible here - confirm */
722
737
723
- if (ret != sizeof (len )) {
724
- goto err ;
725
- }
738
+ if (ret != sizeof (len )) { /* error or short write of the size prefix */
739
+ g_free (entry -> buf ); /* the popped entry is no longer on the queue, so release it here */
740
+ g_slice_free (SendEntry , entry );
741
+ goto err ;
742
+ }
726
743
727
- if (s -> vnet_hdr ) {
728
- /*
729
- * We send vnet header len make other module(like filter-redirector)
730
- * know how to parse net packet correctly.
731
- */
732
- len = htonl (vnet_hdr_len );
744
+ if (! sendco -> notify_remote_frame && s -> vnet_hdr ) {
745
+ /*
746
+ * We send vnet header len make other module(like filter-redirector)
747
+ * know how to parse net packet correctly.
748
+ */
749
+ len = htonl (entry -> vnet_hdr_len );
733
750
734
- if (!notify_remote_frame ) {
735
- ret = qemu_chr_fe_write_all (& s -> chr_out ,
751
+ ret = qemu_chr_fe_write_all (sendco -> chr ,
736
752
(uint8_t * )& len ,
737
753
sizeof (len ));
754
+
755
+ if (ret != sizeof (len )) { /* error or short write of the vnet header length */
756
+ g_free (entry -> buf );
757
+ g_slice_free (SendEntry , entry );
758
+ goto err ;
759
+ }
738
760
}
739
761
740
- if (ret != sizeof (len )) {
762
+ ret = qemu_chr_fe_write_all (sendco -> chr , /* finally the payload itself */
763
+ (uint8_t * )entry -> buf ,
764
+ entry -> size );
765
+
766
+ if (ret != entry -> size ) { /* error or short write of the payload */
767
+ g_free (entry -> buf );
768
+ g_slice_free (SendEntry , entry );
741
769
goto err ;
742
770
}
771
+
772
+ g_free (entry -> buf ); /* entry fully sent: release payload and entry */
773
+ g_slice_free (SendEntry , entry );
743
774
}
744
775
776
+ sendco -> ret = 0 ; /* every queued entry was written completely */
777
+ goto out ;
778
+
779
+ err :
780
+ while (!g_queue_is_empty (& sendco -> send_list )) { /* on failure, discard everything that is still queued */
781
+ SendEntry * entry = g_queue_pop_tail (& sendco -> send_list );
782
+ g_free (entry -> buf );
783
+ g_slice_free (SendEntry , entry );
784
+ }
785
+ sendco -> ret = ret < 0 ? ret : - EIO ; /* keep a negative error code; a short write is reported as -EIO */
786
+ out :
787
+ sendco -> co = NULL ; /* mark the channel idle so compare_chr_send() starts a fresh coroutine next time */
788
+ sendco -> done = true;
789
+ aio_wait_kick (); /* presumably wakes AIO_WAIT_WHILE() pollers waiting on done (see colo_compare_finalize) - confirm */
790
+ }
791
+
792
+ static int compare_chr_send (CompareState * s , /* Queue buf for sending and start the send coroutine if the channel is idle. Returns 0, or the coroutine's error if it already finished. */
793
+ uint8_t * buf , /* payload; ownership passes to the queue when zero_copy is true */
794
+ uint32_t size , /* payload length in bytes; 0 is a no-op */
795
+ uint32_t vnet_hdr_len , /* stored in the entry; sent only on the non-notify channel when s->vnet_hdr is set */
796
+ bool notify_remote_frame , /* true: use s->notify_sendco, false: use s->out_sendco */
797
+ bool zero_copy ) /* true: queue buf itself (later g_free()d by _compare_chr_send); false: queue a g_malloc()ed copy */
798
+ {
799
+ SendCo * sendco ;
800
+ SendEntry * entry ;
801
+
745
802
if (notify_remote_frame ) {
746
- ret = qemu_chr_fe_write_all (& s -> chr_notify_dev ,
747
- (uint8_t * )buf ,
748
- size );
803
+ sendco = & s -> notify_sendco ; /* NOTE(review): colo_compare_complete only initialises notify_sendco when s->notify_dev is set - confirm callers respect that */
749
804
} else {
750
- ret = qemu_chr_fe_write_all ( & s -> chr_out , ( uint8_t * ) buf , size ) ;
805
+ sendco = & s -> out_sendco ;
751
806
}
752
807
753
- if (ret != size ) {
754
- goto err ;
808
+ if (! size ) { /* NOTE(review): with zero_copy and size == 0 buf is neither queued nor freed here - confirm callers never pass that */
809
+ return 0 ;
755
810
}
756
811
757
- return 0 ;
812
+ entry = g_slice_new (SendEntry );
813
+ entry -> size = size ;
814
+ entry -> vnet_hdr_len = vnet_hdr_len ;
815
+ if (zero_copy ) {
816
+ entry -> buf = buf ; /* take over the caller's buffer without copying */
817
+ } else {
818
+ entry -> buf = g_malloc (size ); /* private copy, so the caller's buffer may be reused or may be non-heap memory */
819
+ memcpy (entry -> buf , buf , size );
820
+ }
821
+ g_queue_push_head (& sendco -> send_list , entry ); /* head insert + tail pop in _compare_chr_send() gives FIFO order */
822
+
823
+ if (sendco -> done ) { /* no coroutine currently draining this queue: start one */
824
+ sendco -> co = qemu_coroutine_create (_compare_chr_send , sendco );
825
+ sendco -> done = false; /* must be cleared before entering; the coroutine sets it back to true when it ends */
826
+ qemu_coroutine_enter (sendco -> co );
827
+ if (sendco -> done ) { /* the coroutine ran to completion inside qemu_coroutine_enter() */
828
+ /* report early errors */
829
+ return sendco -> ret ;
830
+ }
831
+ }
758
832
759
- err :
760
- return ret < 0 ? ret : - EIO ;
833
+ /* assume success: the data is queued for a coroutine still in flight, whose later result is not reported to this caller */
834
+ return 0 ;
761
835
}
762
836
763
837
static int compare_chr_can_read (void * opaque )
@@ -1063,6 +1137,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1063
1137
pri_rs -> buf ,
1064
1138
pri_rs -> packet_len ,
1065
1139
pri_rs -> vnet_hdr_len ,
1140
+ false,
1066
1141
false);
1067
1142
} else {
1068
1143
/* compare packet in the specified connection */
@@ -1093,7 +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1093
1168
if (packet_matches_str ("COLO_USERSPACE_PROXY_INIT" ,
1094
1169
notify_rs -> buf ,
1095
1170
notify_rs -> packet_len )) {
1096
- ret = compare_chr_send (s , (uint8_t * )msg , strlen (msg ), 0 , true);
1171
+ ret = compare_chr_send (s , (uint8_t * )msg , strlen (msg ), 0 , true, false );
1097
1172
if (ret < 0 ) {
1098
1173
error_report ("Notify Xen COLO-frame INIT failed" );
1099
1174
}
@@ -1199,6 +1274,20 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
1199
1274
1200
1275
QTAILQ_INSERT_TAIL (& net_compares , s , next );
1201
1276
1277
+ s -> out_sendco .s = s ;
1278
+ s -> out_sendco .chr = & s -> chr_out ;
1279
+ s -> out_sendco .notify_remote_frame = false;
1280
+ s -> out_sendco .done = true;
1281
+ g_queue_init (& s -> out_sendco .send_list );
1282
+
1283
+ if (s -> notify_dev ) {
1284
+ s -> notify_sendco .s = s ;
1285
+ s -> notify_sendco .chr = & s -> chr_notify_dev ;
1286
+ s -> notify_sendco .notify_remote_frame = true;
1287
+ s -> notify_sendco .done = true;
1288
+ g_queue_init (& s -> notify_sendco .send_list );
1289
+ }
1290
+
1202
1291
g_queue_init (& s -> conn_list );
1203
1292
1204
1293
qemu_mutex_init (& event_mtx );
@@ -1225,8 +1314,9 @@ static void colo_flush_packets(void *opaque, void *user_data)
1225
1314
pkt -> data ,
1226
1315
pkt -> size ,
1227
1316
pkt -> vnet_hdr_len ,
1228
- false);
1229
- packet_destroy (pkt , NULL );
1317
+ false,
1318
+ true);
1319
+ packet_destroy_partial (pkt , NULL );
1230
1320
}
1231
1321
while (!g_queue_is_empty (& conn -> secondary_list )) {
1232
1322
pkt = g_queue_pop_head (& conn -> secondary_list );
@@ -1297,10 +1387,23 @@ static void colo_compare_finalize(Object *obj)
1297
1387
}
1298
1388
}
1299
1389
1390
+ AioContext * ctx = iothread_get_aio_context (s -> iothread );
1391
+ aio_context_acquire (ctx );
1392
+ AIO_WAIT_WHILE (ctx , !s -> out_sendco .done );
1393
+ if (s -> notify_dev ) {
1394
+ AIO_WAIT_WHILE (ctx , !s -> notify_sendco .done );
1395
+ }
1396
+ aio_context_release (ctx );
1397
+
1300
1398
/* Release all unhandled packets after compare thread exited */
1301
1399
g_queue_foreach (& s -> conn_list , colo_flush_packets , s );
1400
+ AIO_WAIT_WHILE (NULL , !s -> out_sendco .done );
1302
1401
1303
1402
g_queue_clear (& s -> conn_list );
1403
+ g_queue_clear (& s -> out_sendco .send_list );
1404
+ if (s -> notify_dev ) {
1405
+ g_queue_clear (& s -> notify_sendco .send_list );
1406
+ }
1304
1407
1305
1408
if (s -> connection_track_table ) {
1306
1409
g_hash_table_destroy (s -> connection_track_table );
0 commit comments