@@ -29,7 +29,6 @@ mutable struct WindowIO <: IO
2929 header:: BufferHeader
3030 remote_header:: BufferHeader
3131 header_win:: Win
32- header_cwin:: CWin
3332 is_open:: Bool
3433 # Current read position
3534 ptr:: WinCountT
@@ -56,7 +55,6 @@ mutable struct WindowIO <: IO
5655 header,
5756 remote_header,
5857 header_win,
59- CWin (header_win),
6058 true ,
6159 0 ,
6260 Condition (),
@@ -92,23 +90,23 @@ function has_data_available(w::WindowIO)
9290 return false
9391 end
9492
95- if w. header. count > w. ptr && w. header. needed_length == w. header. length # fast check without window sync
96- return true
97- end
98-
99- # Check if we need to grow the buffer
100- MPI. Win_lock (MPI. LOCK_EXCLUSIVE, w. myrank, 0 , w. header_win)
101- MPI. Win_sync (w. header_cwin) # CWin version doesn't allocate
102- if w. header. needed_length > w. header. length
93+ MPI. Win_lock (MPI. LOCK_SHARED, w. myrank, 0 , w. header_win)
94+ have_data = w. header. count > w. ptr
95+ need_grow = w. header. needed_length > w. header. length
96+ MPI. Win_unlock (w. myrank, w. header_win)
97+
98+ # Grow buffer if needed
99+ if need_grow
103100 MPI. Win_detach (w. win, w. buffer)
104101 resize! (w. buffer, w. header. needed_length)
105102 MPI. Win_attach (w. win, w. buffer)
103+ MPI. Win_lock (MPI. LOCK_EXCLUSIVE, w. myrank, 0 , w. header_win)
106104 w. header. address = MPI. Get_address (w. buffer)
107105 w. header. length = w. header. needed_length
106+ MPI. Win_unlock (w. myrank, w. header_win)
108107 end
109- MPI. Win_unlock (w. myrank, w. header_win)
110108
111- return w . header . count > w . ptr
109+ return have_data
112110end
113111
114112function Base. wait (w:: WindowIO )
126124
127125# wait until the specified number of bytes is available or the stream is closed
128126function wait_nb_available (w, nb)
129- nb_found = wait_nb_available (w)
127+ nb_found = 0
130128 while nb_found < nb && w. is_open
131- MPI. Win_lock (MPI. LOCK_SHARED, w. myrank, 0 , w. header_win)
132- MPI. Win_sync (w. header_cwin) # sync every loop, to make sure we get updates
133- MPI. Win_unlock (w. myrank, w. header_win)
134129 nb_found = wait_nb_available (w)
135130 end
136131 return nb_found
0 commit comments