@@ -118,6 +118,7 @@ struct writequeue_entry {
118
118
int len ;
119
119
int end ;
120
120
int users ;
121
+ bool dirty ;
121
122
struct connection * con ;
122
123
struct list_head msgs ;
123
124
struct kref ref ;
@@ -700,6 +701,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
700
701
memset ((char * )saddr + * addr_len , 0 , sizeof (struct sockaddr_storage ) - * addr_len );
701
702
}
702
703
704
+ static void dlm_page_release (struct kref * kref )
705
+ {
706
+ struct writequeue_entry * e = container_of (kref , struct writequeue_entry ,
707
+ ref );
708
+
709
+ __free_page (e -> page );
710
+ kfree (e );
711
+ }
712
+
713
+ static void dlm_msg_release (struct kref * kref )
714
+ {
715
+ struct dlm_msg * msg = container_of (kref , struct dlm_msg , ref );
716
+
717
+ kref_put (& msg -> entry -> ref , dlm_page_release );
718
+ kfree (msg );
719
+ }
720
+
721
+ static void free_entry (struct writequeue_entry * e )
722
+ {
723
+ struct dlm_msg * msg , * tmp ;
724
+
725
+ list_for_each_entry_safe (msg , tmp , & e -> msgs , list ) {
726
+ if (msg -> orig_msg ) {
727
+ msg -> orig_msg -> retransmit = false;
728
+ kref_put (& msg -> orig_msg -> ref , dlm_msg_release );
729
+ }
730
+
731
+ list_del (& msg -> list );
732
+ kref_put (& msg -> ref , dlm_msg_release );
733
+ }
734
+
735
+ list_del (& e -> list );
736
+ atomic_dec (& e -> con -> writequeue_cnt );
737
+ kref_put (& e -> ref , dlm_page_release );
738
+ }
739
+
703
740
static void dlm_close_sock (struct socket * * sock )
704
741
{
705
742
if (* sock ) {
@@ -714,6 +751,7 @@ static void close_connection(struct connection *con, bool and_other,
714
751
bool tx , bool rx )
715
752
{
716
753
bool closing = test_and_set_bit (CF_CLOSING , & con -> flags );
754
+ struct writequeue_entry * e ;
717
755
718
756
if (tx && !closing && cancel_work_sync (& con -> swork )) {
719
757
log_print ("canceled swork for node %d" , con -> nodeid );
@@ -732,6 +770,26 @@ static void close_connection(struct connection *con, bool and_other,
732
770
close_connection (con -> othercon , false, tx , rx );
733
771
}
734
772
773
+ /* if we send a writequeue entry only a half way, we drop the
774
+ * whole entry because reconnection and that we not start of the
775
+ * middle of a msg which will confuse the other end.
776
+ *
777
+ * we can always drop messages because retransmits, but what we
778
+ * cannot allow is to transmit half messages which may be processed
779
+ * at the other side.
780
+ *
781
+ * our policy is to start on a clean state when disconnects, we don't
782
+ * know what's send/received on transport layer in this case.
783
+ */
784
+ spin_lock (& con -> writequeue_lock );
785
+ if (!list_empty (& con -> writequeue )) {
786
+ e = list_first_entry (& con -> writequeue , struct writequeue_entry ,
787
+ list );
788
+ if (e -> dirty )
789
+ free_entry (e );
790
+ }
791
+ spin_unlock (& con -> writequeue_lock );
792
+
735
793
con -> rx_leftover = 0 ;
736
794
con -> retries = 0 ;
737
795
clear_bit (CF_CONNECTED , & con -> flags );
@@ -1026,41 +1084,6 @@ static int accept_from_sock(struct listen_connection *con)
1026
1084
return result ;
1027
1085
}
1028
1086
1029
- static void dlm_page_release (struct kref * kref )
1030
- {
1031
- struct writequeue_entry * e = container_of (kref , struct writequeue_entry ,
1032
- ref );
1033
-
1034
- __free_page (e -> page );
1035
- kfree (e );
1036
- }
1037
-
1038
- static void dlm_msg_release (struct kref * kref )
1039
- {
1040
- struct dlm_msg * msg = container_of (kref , struct dlm_msg , ref );
1041
-
1042
- kref_put (& msg -> entry -> ref , dlm_page_release );
1043
- kfree (msg );
1044
- }
1045
-
1046
- static void free_entry (struct writequeue_entry * e )
1047
- {
1048
- struct dlm_msg * msg , * tmp ;
1049
-
1050
- list_for_each_entry_safe (msg , tmp , & e -> msgs , list ) {
1051
- if (msg -> orig_msg ) {
1052
- msg -> orig_msg -> retransmit = false;
1053
- kref_put (& msg -> orig_msg -> ref , dlm_msg_release );
1054
- }
1055
- list_del (& msg -> list );
1056
- kref_put (& msg -> ref , dlm_msg_release );
1057
- }
1058
-
1059
- list_del (& e -> list );
1060
- atomic_dec (& e -> con -> writequeue_cnt );
1061
- kref_put (& e -> ref , dlm_page_release );
1062
- }
1063
-
1064
1087
/*
1065
1088
* writequeue_entry_complete - try to delete and free write queue entry
1066
1089
* @e: write queue entry to try to delete
@@ -1072,6 +1095,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1072
1095
{
1073
1096
e -> offset += completed ;
1074
1097
e -> len -= completed ;
1098
+ /* signal that page was half way transmitted */
1099
+ e -> dirty = true;
1075
1100
1076
1101
if (e -> len == 0 && e -> users == 0 )
1077
1102
free_entry (e );
0 commit comments