9
9
10
10
#include <binsparse/detail/shm_tools.h>
11
11
12
+ #if __STDC_VERSION__ >= 201112L
13
+ #include <stdatomic.h>
14
+ #endif
15
+
12
16
// Write an array to a dataset / file
13
17
// Returns 0 on success, nonzero on error.
14
18
int bsp_write_array (hid_t f , const char * label , bsp_array_t array ,
@@ -65,6 +69,7 @@ int bsp_write_array(hid_t f, const char* label, bsp_array_t array,
65
69
return 0 ;
66
70
}
67
71
72
+ #if __STDC_VERSION__ >= 201112L
68
73
bsp_array_t bsp_read_array_parallel (hid_t f , const char * label ,
69
74
int num_threads ) {
70
75
hid_t dset = H5Dopen2 (f , label , H5P_DEFAULT );
@@ -91,12 +96,19 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
91
96
92
97
bsp_type_t type = bsp_get_bsp_type (hdf5_type );
93
98
99
+ // Array will be written into a POSIX shared memory.
94
100
bsp_shm_t array_shm = bsp_shm_new (dims [0 ] * bsp_type_size (type ));
95
101
bsp_array_t array ;
96
102
array .type = type ;
97
103
array .size = dims [0 ];
98
- array .shm = array_shm ;
99
- array .shmat_memory = true;
104
+ array .allocator = bsp_shm_allocator ;
105
+
106
+ bsp_shm_t active_children_shm = bsp_shm_new (sizeof (_Atomic int ));
107
+
108
+ _Atomic int * active_children = bsp_shm_attach (active_children_shm );
109
+ bsp_shm_delete (active_children_shm );
110
+
111
+ * active_children = num_threads - 1 ;
100
112
101
113
pid_t * pids = (pid_t * ) malloc (sizeof (pid_t ) * num_threads );
102
114
@@ -115,7 +127,7 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
115
127
116
128
array .data = bsp_shm_attach (array_shm );
117
129
if (thread_num == 0 ) {
118
- bsp_shm_delete (array . shm );
130
+ bsp_shm_delete (array_shm );
119
131
}
120
132
121
133
hsize_t chunk_size = (array .size + num_threads - 1 ) / num_threads ;
@@ -131,22 +143,29 @@ bsp_array_t bsp_read_array_parallel(hid_t f, const char* label,
131
143
hid_t memspace_id = H5Screate_simple (1 , & count , NULL );
132
144
133
145
H5Dread (dset , bsp_get_hdf5_native_type (type ), memspace_id , fspace ,
134
- H5P_DEFAULT , array .data + start * bsp_type_size (type ));
146
+ H5P_DEFAULT , (( char * ) array .data ) + start * bsp_type_size (type ));
135
147
H5Sclose (memspace_id );
136
148
}
137
149
138
150
H5Dclose (dset );
139
151
H5Sclose (fspace );
140
152
141
153
if (thread_num > 0 ) {
154
+ atomic_fetch_add_explicit (active_children , -1 , memory_order_relaxed );
155
+ bsp_shm_detach (active_children );
142
156
bsp_shm_detach (array .data );
143
157
exit (0 );
144
158
}
145
159
146
160
free (pids );
147
161
162
+ while (atomic_load (active_children ) > 0 ) {
163
+ }
164
+ bsp_shm_detach (active_children );
165
+
148
166
return array ;
149
167
}
168
+ #endif
150
169
151
170
bsp_array_t bsp_read_array (hid_t f , const char * label ) {
152
171
hid_t dset = H5Dopen2 (f , label , H5P_DEFAULT );
0 commit comments