1
+ /*
2
+ * Copyright (c) 2018 DataDirect Networks. All rights reserved.
3
+ * $COPYRIGHT$
4
+ *
5
+ * Additional copyrights may follow
6
+ *
7
+ * $HEADER$
8
+ */
9
+
10
+ #include "ompi_config.h"
11
+ #include "mpi.h"
12
+
13
+ #include "ompi/mca/fbtl/fbtl.h"
14
+ #include "ompi/mca/fbtl/ime/fbtl_ime.h"
15
+
16
+ /*
17
+ * *******************************************************************
18
+ * ************************ actions structure ************************
19
+ * *******************************************************************
20
+ */
21
+ static mca_fbtl_base_module_1_0_0_t ime = {
22
+ mca_fbtl_ime_module_init , /* initalise after being selected */
23
+ mca_fbtl_ime_module_finalize , /* close a module on a communicator */
24
+ mca_fbtl_ime_preadv , /* blocking read */
25
+ mca_fbtl_ime_ipreadv , /* non-blocking read*/
26
+ mca_fbtl_ime_pwritev , /* blocking write */
27
+ mca_fbtl_ime_ipwritev , /* non-blocking write */
28
+ mca_fbtl_ime_progress , /* module specific progress */
29
+ mca_fbtl_ime_request_free /* free module specific data items on the request */
30
+ };
31
+ /*
32
+ * *******************************************************************
33
+ * ************************* structure ends **************************
34
+ * *******************************************************************
35
+ */
36
+
37
+ int mca_fbtl_ime_component_init_query (bool enable_progress_threads ,
38
+ bool enable_mpi_threads )
39
+ {
40
+ /* Nothing to do */
41
+ return OMPI_SUCCESS ;
42
+ }
43
+
44
+ struct mca_fbtl_base_module_1_0_0_t *
45
+ mca_fbtl_ime_component_file_query (ompio_file_t * fh , int * priority )
46
+ {
47
+ * priority = mca_fbtl_ime_priority ;
48
+
49
+ /* Do the same as the FS component:
50
+ Only return a non-null component if IME
51
+ can handle the IO operations. */
52
+ if (IME == fh -> f_fstype ) {
53
+ if (* priority < FBTL_IME_INCREASED_PRIORITY ) {
54
+ * priority = FBTL_IME_INCREASED_PRIORITY ;
55
+ }
56
+ return & ime ;
57
+ }
58
+
59
+ return NULL ;
60
+ }
61
+
62
+ int mca_fbtl_ime_component_file_unquery (ompio_file_t * file )
63
+ {
64
+ /* This function might be needed for some purposes later. for now it
65
+ * does not have anything to do since there are no steps which need
66
+ * to be undone if this module is not selected */
67
+
68
+ return OMPI_SUCCESS ;
69
+ }
70
+
71
+ int mca_fbtl_ime_module_init (ompio_file_t * file )
72
+ {
73
+ return OMPI_SUCCESS ;
74
+ }
75
+
76
+
77
+ int mca_fbtl_ime_module_finalize (ompio_file_t * file )
78
+ {
79
+ return OMPI_SUCCESS ;
80
+ }
81
+
82
+ bool mca_fbtl_ime_progress ( mca_ompio_request_t * req )
83
+ {
84
+ int i = 0 , lcount = 0 , ret_code = 0 ;
85
+ mca_fbtl_ime_request_data_t * data = (mca_fbtl_ime_request_data_t * )req -> req_data ;
86
+
87
+ /* Go through all the requests in the current batch to check
88
+ * if they have finished. */
89
+ for (i = data -> aio_first_active_req ; i < data -> aio_last_active_req ; i ++ ) {
90
+ if ( data -> aio_req_status [i ] == FBTL_IME_REQ_CLOSED ) {
91
+ lcount ++ ;
92
+ }
93
+ else if ( data -> aio_req_status [i ] >= 0 ) {
94
+ /* request has finished */
95
+ data -> aio_open_reqs -- ;
96
+ lcount ++ ;
97
+ data -> aio_total_len += data -> aio_req_status [i ];
98
+ data -> aio_req_status [i ] = FBTL_IME_REQ_CLOSED ;
99
+ }
100
+ else if ( data -> aio_req_status [i ] == FBTL_IME_REQ_ERROR ) {
101
+ /* an error occured. */
102
+ data -> aio_open_reqs -- ;
103
+ lcount ++ ;
104
+ data -> aio_req_fail_count ++ ;
105
+ data -> aio_req_status [i ] = FBTL_IME_REQ_CLOSED ;
106
+ }
107
+ else {
108
+ /* not yet done */
109
+ }
110
+ }
111
+
112
+ /* In case the current batch of requests terminated, exit if an error
113
+ * happened for any request.
114
+ */
115
+ if ( data -> aio_req_fail_count > 0 &&
116
+ lcount == data -> aio_last_active_req - data -> aio_first_active_req ) {
117
+ goto error_exit ;
118
+ }
119
+
120
+ /* In case some requests are pending, and no error happened in any of the
121
+ * previous requests, then the next batch of operations should be prepared.
122
+ */
123
+ if ( (lcount == data -> aio_req_chunks ) && (0 != data -> aio_open_reqs ) ) {
124
+
125
+ /* prepare the next batch of operations */
126
+ data -> aio_first_active_req = data -> aio_last_active_req ;
127
+ if ( (data -> aio_req_count - data -> aio_last_active_req ) > data -> aio_req_chunks ) {
128
+ data -> aio_last_active_req += data -> aio_req_chunks ;
129
+ }
130
+ else {
131
+ data -> aio_last_active_req = data -> aio_req_count ;
132
+ }
133
+
134
+ /* Send the requests. */
135
+ for ( i = data -> aio_first_active_req ; i < data -> aio_last_active_req ; i ++ ) {
136
+ if ( FBTL_IME_READ == data -> aio_req_type &&
137
+ ime_native_aio_read (& data -> aio_reqs [i ]) < 0 ) {
138
+ opal_output (1 , "mca_fbtl_ime_progress: error in aio_read()" );
139
+ data -> aio_req_status [i ] = FBTL_IME_REQ_ERROR ;
140
+ data -> aio_last_active_req = i + 1 ;
141
+ break ;
142
+ }
143
+ else if ( FBTL_IME_WRITE == data -> aio_req_type &&
144
+ ime_native_aio_write (& data -> aio_reqs [i ]) < 0 ) {
145
+ opal_output (1 , "mca_fbtl_ime_progress: error in aio_write()" );
146
+ data -> aio_req_status [i ] = FBTL_IME_REQ_ERROR ;
147
+ data -> aio_last_active_req = i + 1 ;
148
+ break ;
149
+ }
150
+ }
151
+ }
152
+
153
+ if ( 0 == data -> aio_open_reqs ) {
154
+ /* all pending operations are finished for this request */
155
+ req -> req_ompi .req_status .MPI_ERROR = OMPI_SUCCESS ;
156
+ req -> req_ompi .req_status ._ucount = data -> aio_total_len ;
157
+ return true;
158
+ }
159
+ return false;
160
+
161
+ error_exit :
162
+ req -> req_ompi .req_status .MPI_ERROR = OMPI_ERROR ;
163
+ req -> req_ompi .req_status ._ucount = data -> aio_total_len ;
164
+ return true;
165
+ }
166
+
167
+ void mca_fbtl_ime_request_free ( mca_ompio_request_t * req )
168
+ {
169
+ /* Free the fbtl specific data structures */
170
+ mca_fbtl_ime_request_data_t * data = (mca_fbtl_ime_request_data_t * )req -> req_data ;
171
+ if (NULL != data ) {
172
+ free (data -> allocated_data );
173
+ free (data );
174
+ req -> req_data = NULL ;
175
+ }
176
+ }
177
+
178
+ void mca_fbtl_ime_complete_cb (struct ime_aiocb * aiocb , int err , ssize_t bytes )
179
+ {
180
+ ssize_t * req_status = (ssize_t * ) aiocb -> user_context ;
181
+ * req_status = err == 0 ? bytes : FBTL_IME_REQ_ERROR ;
182
+ }
0 commit comments