15
15
16
16
#include <binsparse/detail/shm_tools.h>
17
17
18
+ #if __STDC_VERSION__ >= 201112L
19
+ #include <stdatomic.h>
20
+ #endif
21
+
18
22
// Write an array to a dataset / file
19
23
// Returns 0 on success, nonzero on error.
20
24
int bsp_write_array (hid_t f , const char * label , bsp_array_t array ,
@@ -71,6 +75,7 @@ int bsp_write_array(hid_t f, const char* label, bsp_array_t array,
71
75
return 0 ;
72
76
}
73
77
78
+ #if __STDC_VERSION__ >= 201112L
74
79
bsp_array_t bsp_read_array_parallel (hid_t f , const char * label ,
75
80
int num_threads ) {
76
81
hid_t dset = H5Dopen2 (f , label , H5P_DEFAULT );
@@ -97,12 +102,19 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
97
102
98
103
bsp_type_t type = bsp_get_bsp_type (hdf5_type );
99
104
105
+ // Array will be written into a POSIX shared memory.
100
106
bsp_shm_t array_shm = bsp_shm_new (dims [0 ] * bsp_type_size (type ));
101
107
bsp_array_t array ;
102
108
array .type = type ;
103
109
array .size = dims [0 ];
104
- array .shm = array_shm ;
105
- array .shmat_memory = true;
110
+ array .allocator = bsp_shm_allocator ;
111
+
112
+ bsp_shm_t active_children_shm = bsp_shm_new (sizeof (_Atomic int ));
113
+
114
+ _Atomic int * active_children = bsp_shm_attach (active_children_shm );
115
+ bsp_shm_delete (active_children_shm );
116
+
117
+ * active_children = num_threads - 1 ;
106
118
107
119
pid_t * pids = (pid_t * ) malloc (sizeof (pid_t ) * num_threads );
108
120
@@ -121,7 +133,7 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
121
133
122
134
array .data = bsp_shm_attach (array_shm );
123
135
if (thread_num == 0 ) {
124
- bsp_shm_delete (array . shm );
136
+ bsp_shm_delete (array_shm );
125
137
}
126
138
127
139
hsize_t chunk_size = (array .size + num_threads - 1 ) / num_threads ;
@@ -137,22 +149,29 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
137
149
hid_t memspace_id = H5Screate_simple (1 , & count , NULL );
138
150
139
151
H5Dread (dset , bsp_get_hdf5_native_type (type ), memspace_id , fspace ,
140
- H5P_DEFAULT , array .data + start * bsp_type_size (type ));
152
+ H5P_DEFAULT , (( char * ) array .data ) + start * bsp_type_size (type ));
141
153
H5Sclose (memspace_id );
142
154
}
143
155
144
156
H5Dclose (dset );
145
157
H5Sclose (fspace );
146
158
147
159
if (thread_num > 0 ) {
160
+ atomic_fetch_add_explicit (active_children , -1 , memory_order_relaxed );
161
+ bsp_shm_detach (active_children );
148
162
bsp_shm_detach (array .data );
149
163
exit (0 );
150
164
}
151
165
152
166
free (pids );
153
167
168
+ while (atomic_load (active_children ) > 0 ) {
169
+ }
170
+ bsp_shm_detach (active_children );
171
+
154
172
return array ;
155
173
}
174
+ #endif
156
175
157
176
bsp_array_t bsp_read_array (hid_t f , const char * label ) {
158
177
hid_t dset = H5Dopen2 (f , label , H5P_DEFAULT );
0 commit comments