Skip to content

Commit 4ad4095

Browse files
mjacob1002UbuntuZwFinkrbuchMathew Jacob
authored
CkIO Reading Capabilities (#3788)
* Added the read_stripe field to the Options struct in ckio.h * Added the read_stripe parameter in Options to the be puped * Added startReadSession function to the ckio.h file. Will be called analagously to the startSession function that is used for writes. * Added the message type ReadReadyMsg, as well as the read function that the user will interact with. read() was declared in the ckio.h file, while I defined the ReadReadyMsg class in the .h file as well. * Added the prepareReadSession function to the director, and laid out some scaffolding for the ReadSession chare array. These changes are in ckio.ciI * Wrote the prepareReadSessionHelper function, which sets up all of the reader chares. * Wrote the ReadSession constructor in ckio.C * implemented the ReadSession constructor that initializes the offset and bytes, as well as the ReadSession::readData function which gets the data from memory into chare array to serve to the user * Got the reads to work in parallel. Must fix the memory leaks and implement a method of ending a read session. * Removed the read_id field of the director as it wasn't being used. * Included a (delete this) call at the of ReadAssembler::shareData so that the chare will clean itself up upon invoking the user provided callback after receiving all the necessary data for the read. * New test suite written in io_read under tests. Giving a linker error with �[200~https://github.com/elamdf/ * add missing def include * Added the new test suite and the test file readtest.txt * Added a clearBuffer() method to the chare array ReadSession in order to clean up that buffer memory. This is a stand-in for an actual freeing of the chare array, as broadcasting ckDestroy seems to break the code. * Commented out the printing code that tracks the where the CkIO library is in the line of execution. * io_read: Use relative paths in Makefile * Added a function that gets the data at offset and num bytes sequentially. Then in testRead will cal that seq read function and verifies that parallel read and sequential read get the same data. * Made the gathering of the information point to point and not a broadcast in order to reduce amount of messages sent. * Added the ckDestroy broadcast once closeReadSession is invoked. * Added some comments to give better insight as to what the test for parallel reads is doing. * Tests: Readd deleted io test * Fixed a comment relating to creating the read assembler Changed comment reading "create read assemble" to "create read assembler" on line 156 Co-authored-by: Ronak Buch <[email protected]> * Switched buffer=data.data() to _data_buffer.data() because _data_buffer is what the full read request, while data simply contains the data from a specific IO chare. * Simply memcpy into the vector::data directly instead of allocating a whole new buffer and then copying into the vector * Removed the clearBuffer broadcast because ckDestroy is being broadcasted instead, which will already free all of the memory * Removed the commented ckDestroy broadcast, as it is already being called. * Removed the empty headers directory and the 'touch headers' from Makefile. Also restored the LUSTRE command in the Makefile * Removed ReadAssembler constructor comments: * Removed debugging comments from ReadAssembler::shareData ; * Removed debugging comments from ReadSession::sendData * Removed comments from ReadSession constructor * Removed comment that wasn't helpful from impl::Director::prepareReadSessionHelper * removed debugging statements from impl::Director::::read(), which is called by Ck::IO::read * Removed the comments from ReadSession::readData that were superfluous: * Cleaned up white space in the ckio.ci file * Removed the -verbose from the lb_test Makefile * Removed the periodic_lb_broadcast_test binary ping * Removed the period_selection binary from the meta_lb_test * Removed the extra lines from comment in the ckio.h file * Changed the shareData method from taking vector<char> to char[] and size_t num_bytes for performance boost. * Removed the ckouts from debugging * Changed the std::cerr to CkAbort if opening the file fails in ReadSession::readData * Removed the unistd.h include that wasn't wrapped in the windows checker * Removed the comment touching headeres in the Makfile on line 21 * Instead of push_back with vector in sendData, set the size of vector directly and then insert * Changed the loop to copy data to a memcpy call. Resulted in huge speedup from 3.1sec to 1.7secs when reading 4GB file using benchmark program. * supported tagging reads in class ReadCompleteMsg, Ck::IO::read, ReadAssembler * Changed Ck::IO::read functions to be handled by the manager on each PE rather than the Director chare that was on PE 0. * Edits to the reading tests * Changed ifstream to fread * Added fseek when converting the ifstream to the FILE pointers. Also added a dd command in the tests/charm++/io_read/Makefile to automatically create the file large_test.txt when make is ran * Avoided the extra memcpy in the ReadAssembler::shareData function. Also inserted some logging statements in order to determine when the disk reads are finished. * Start of implementing the zerocopy API by assigning 0-copy post and assigning tags to the buffers. * Fixed bug with the start_idx in Manager::read. By doing (offset - session -> offset) instead of offset, it will find correct ReadSession chare because new value is wrt the actual readsession instead of just the file. * Introduced the CkPostBuffer by deferring a lot of the heavy lifting of read-serving to read assembler. Added tracking of buffer tags on the manager group with getTag and _curr_tag. Adjusted the sendData method to take the buffer tag so that it can correctly call shareData and do the CkMatch. Still have to adjust the shareData method to tag the buffer tag. * EXPERIMENTAL: trying to find why ckarray:1266 is segfaulting when trying to access pointer locMgr. * Implemented a reduction on managers after adding the session and readassembler key-value pair to the map. Used SDAG to correctly invoke the callback after the read session has been setup correctly in startReadSession call. * when the read is done, remove the ReadInfo struct from the map on the ReadAssembler * Potential bug: was using CkIO library read-tags per ReadAssembler rather than from the manager. Potentially meant if there were multiple sessions open, the Zero Copy would have multiple tags of the same value because 2 separate ReadAssemblers would've assigned the same tag on the PE. By using the manager, even if they're part of different sessions, every read within the CkIO library will have different tags. * Changed the API of reads; instead of specifying the number of bytes each IO chare will own, instead specify the number of IO chares that you would like. CkIO will calculate the read stripe from there. Any extra bytes that don't divide evenly will be owned by the last IO chare.: * Changed the default behavior if Options::num_readers is 0 * Moving debugging statements to comments or the DEBUG macro * Laid out the BufferMap class that will be used to layout how the IO chares will be laid out * Added a startReadSession that allows a vector of PEs to be passed in for mapping of the IO Buffer chares. * Added asynchronous reads by spawning a thread to do the fread instead of doing it in the main thread. Use shared future to manage race conditions. * BUG FIX: If the IO buffers didn't evenly divide the session size and the last buffer stored more data, reads that required that extra bit of data wouldn't work properly because the posting of buffers for the Zero Copy wouldn't include that because they'd only do (read_stripe), when there's extra data on the end. Added a conditional testing if the current buffer being explored is the last one, and then simply set the length of ZC posting to be the read_offset + read_size - (read_stripe * i + session.offset). * Removed random string creation in the sendData function. Also made sendData a threaded entry method * Added a polling of the shared_future to see if the data is ready. If the I/O isn't done, suspend the Charm++ UTL and do other work while the I/O, letting the Charm RTS schedule the thread. Might want to add some sort of limit on number of I/O threads allowed per PE * Fixed edge case in the reads where you read into extra bytes in your read request * Renamed ReadSession to BufferChares * Removed random print statements + code that wasn't being used anymore * Comments for future developers, as well as some ideas for improvements/future work * Wrote a basic unit test with 3 Buffer Chares, 10 readers, and a 100 byte test file. Ensure that some reads cross buffer chare boundaries * Added the test recipe to the Makefile - hopefully that will fix all of the failed PRs * Added CkIO to the conditional that links -pthread iff the platform needs it * removed the binary that was accidentally check in; hopefully that makes smp tests pass * Added flag to the end of the tests for io_Read * Fix potential mistake in the for io_read tests * remove commands not involved in building tests from the iotest rule * Improve the formatting of CkIO Reading tests, as well as get rid of the rrors Co-authored-by: Evan Ramos <[email protected]> * Removed both deprecated and unused functions. * Removed commented out file read * Switched the setting of num_readers to numReaders after changing the variable name * Changed the default value for numReaders if not set * Switched to opening the file in binary mode so that windows doesn't mangle the newline character * Trying to combat slow disk times * got rid of MPI version * fixed the friend read functino to have the new signature * memory leak fixed with buffer chare destructor * debug print statements and things * debug print statements * adding debug things but crashing on round 2 * user events debugging, yield macro * adding io updates with yield and call after implementations * got rid of almost all the debu statements. Now we must see if we can clean up serveRead and also if we can add an example to the examples folder. * Added a working test in the tests/ folder and also cleaned up the ckio source code' * renamed read_stripe to read_stride to reflect it measure how much data buffer chares own * fixing bug in test * cleanup test code and add smp * spacing fix * adding sdag threading * switch from open64 (largefile) to plain open * removing unistd.h * changing prints to aborts * updating open permissions to support windows * adding windows read * adding O_BINARY to windows call * adding touch headers to makefile * merged the conflicts - finally mostly eng ready --------- Co-authored-by: Ubuntu <[email protected]> Co-authored-by: Zane Fink <[email protected]> Co-authored-by: Ronak Buch <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Evan Ramos <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: mtaylo12 <[email protected]> Co-authored-by: Maya Taylor <[email protected]> Co-authored-by: Maya Taylor <[email protected]> Co-authored-by: Mathew Jacob <[email protected]> Co-authored-by: Maya Taylor <[email protected]> Co-authored-by: mtaylo12 <[email protected]> Co-authored-by: Eric Bohm <[email protected]>
1 parent 4f3eed1 commit 4ad4095

File tree

9 files changed

+1396
-411
lines changed

9 files changed

+1396
-411
lines changed

src/libs/ck-libs/io/ckio.C

Lines changed: 894 additions & 335 deletions
Large diffs are not rendered by default.

src/libs/ck-libs/io/ckio.ci

Lines changed: 171 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,151 @@
1-
module CkIO {
2-
3-
namespace Ck { namespace IO {
4-
message FileReadyMsg;
5-
message SessionReadyMsg;
6-
message SessionCommitMsg;
7-
}
8-
}
1+
module CkIO
2+
{
3+
namespace Ck
4+
{
5+
namespace IO
6+
{
7+
message FileReadyMsg;
8+
message SessionReadyMsg;
9+
message SessionCommitMsg;
10+
message ReadCompleteMsg;
11+
// used by the read() CkCallback
12+
} // namespace IO
13+
} // namespace Ck
914

1015
initnode _registerCkIO_impl();
1116
};
1217

13-
module CkIO_impl {
18+
module CkIO_impl
19+
{
1420
include "ckio.h";
1521

16-
namespace Ck { namespace IO {
17-
namespace impl {
18-
readonly CProxy_Director director;
22+
namespace Ck
23+
{
24+
namespace IO
25+
{
26+
namespace impl
27+
{
28+
readonly CProxy_Director director;
1929

20-
mainchare [migratable] Director
21-
{
22-
entry Director(CkArgMsg *);
23-
24-
/// Serialize setting up each file through this chare, so that all PEs
25-
/// have the same sequence
26-
entry void openFile(std::string name, CkCallback opened, Options opts);
27-
entry [reductiontarget] void fileOpened(FileToken file);
28-
entry [reductiontarget] void sessionComplete(FileToken file);
29-
30-
entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
31-
CkCallback ready, CkCallback complete) {
32-
serial {
33-
prepareWriteSession_helper(file, bytes, offset, ready, complete);
34-
}
35-
when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
36-
delete m;
37-
ready.send(new SessionReadyMsg(Session(file, bytes, offset,
38-
files[file].session)));
39-
}
40-
};
41-
entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
42-
CkCallback ready, const char commitData[commitBytes],
43-
size_t commitBytes, size_t commitOffset,
44-
CkCallback complete) {
45-
serial {
46-
CkCallback committed(CkIndex_Director::sessionDone(NULL), thisProxy);
47-
committed.setRefnum(++sessionID);
48-
prepareWriteSession_helper(file, bytes, offset, ready, committed);
49-
}
50-
when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
51-
delete m;
52-
ready.send(new SessionReadyMsg(Session(file, bytes, offset,
53-
files[file].session)));
54-
}
55-
when sessionDone[files[file].sessionID](CkReductionMsg *m) serial {
56-
delete m;
57-
impl::FileInfo* info = CkpvAccess(manager)->get(file);
58-
CmiInt8 ret = CmiPwrite(info->fd, commitData, commitBytes, commitOffset);
59-
if (ret != commitBytes)
60-
fatalError("Commit write failed", info->name);
61-
complete.send(CkReductionMsg::buildNew(0, NULL, CkReduction::nop));
62-
}
63-
};
64-
entry void sessionReady(CkReductionMsg *);
65-
entry void sessionDone(CkReductionMsg *);
66-
entry void close(FileToken token, CkCallback closed);
67-
}
30+
mainchare[migratable] Director
31+
{
32+
entry Director(CkArgMsg*);
6833

69-
group [migratable] Manager
70-
{
71-
entry Manager();
72-
entry void run() {
73-
while (true) {
34+
/// Serialize setting up each file through this chare, so that all PEs
35+
/// have the same sequence
36+
37+
entry void openFile(std::string name, CkCallback opened, Options opts);
38+
entry[reductiontarget] void fileOpened(FileToken file);
39+
entry[reductiontarget] void sessionComplete(FileToken file);
40+
41+
// the method used by the director which is used to close a readsession
42+
entry void closeReadSession(Session, CkCallback);
43+
44+
entry void prepareReadSession(FileToken file, size_t bytes, size_t offset,
45+
CkCallback ready){
46+
// the director sets up a read session
47+
serial{prepareReadSessionHelper(file, bytes, offset, ready, std::vector<int>());
48+
}
49+
when sessionReady[files[file].sessionID](CkReductionMsg* m) serial
50+
{ // after the ReadSession chare array is set up, invoke the ready callback
51+
delete m;
52+
Session s(file, bytes, offset, files[file].read_session);
53+
CProxy_ReadAssembler ra = CProxy_ReadAssembler::ckNew(s);
54+
managers.addSessionReadAssemblerMapping(s, ra, ready);
55+
}
56+
57+
when addSessionReadAssemblerFinished(CkReductionMsg* msg) serial
58+
{
59+
ready.send(
60+
new SessionReadyMsg(Session(file, bytes, offset, files[file].read_session)));
61+
delete msg;
62+
}
63+
64+
}; // namespace impl
65+
66+
entry void prepareReadSession(FileToken file, size_t bytes, size_t offset,
67+
CkCallback ready, std::vector<int> pes_to_map){
68+
serial{prepareReadSessionHelper(file, bytes, offset, ready, pes_to_map);
69+
} // namespace IO
70+
71+
when sessionReady[files[file].sessionID](CkReductionMsg* m) serial
72+
{ // after the ReadSession chare array is set up, invoke the ready callback
73+
delete m;
74+
Session s(file, bytes, offset, files[file].read_session);
75+
CProxy_ReadAssembler ra = CProxy_ReadAssembler::ckNew(s);
76+
managers.addSessionReadAssemblerMapping(s, ra, ready);
77+
}
78+
79+
when addSessionReadAssemblerFinished(CkReductionMsg* msg) serial
80+
{
81+
ready.send(
82+
new SessionReadyMsg(Session(file, bytes, offset, files[file].read_session)));
83+
delete msg;
84+
}
85+
86+
}; // namespace Ck
87+
88+
entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
89+
CkCallback ready, CkCallback complete){
90+
serial{prepareWriteSession_helper(file, bytes, offset, ready, complete);
91+
}
92+
when sessionReady[files[file].sessionID](CkReductionMsg* m) serial
93+
{
94+
delete m;
95+
ready.send(new SessionReadyMsg(Session(file, bytes, offset, files[file].session)));
96+
}
97+
}
98+
;
99+
entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
100+
CkCallback ready, const char commitData[commitBytes],
101+
size_t commitBytes, size_t commitOffset,
102+
CkCallback complete){
103+
serial{CkCallback committed(CkIndex_Director::sessionDone(NULL), thisProxy);
104+
committed.setRefnum(++sessionID);
105+
prepareWriteSession_helper(file, bytes, offset, ready, committed);
106+
}
107+
when sessionReady[files[file].sessionID](CkReductionMsg* m) serial
108+
{
109+
delete m;
110+
ready.send(new SessionReadyMsg(Session(file, bytes, offset, files[file].session)));
111+
}
112+
when sessionDone[files[file].sessionID](CkReductionMsg* m) serial
113+
{
114+
delete m;
115+
impl::FileInfo* info = CkpvAccess(manager)->get(file);
116+
CmiInt8 ret = CmiPwrite(info->fd, commitData, commitBytes, commitOffset);
117+
if (ret != commitBytes)
118+
fatalError("Commit write failed", info->name);
119+
complete.send(CkReductionMsg::buildNew(0, NULL, CkReduction::nop));
120+
}
121+
}
122+
;
123+
124+
entry void sessionReady(CkReductionMsg*);
125+
entry void sessionDone(CkReductionMsg*);
126+
entry void close(FileToken token, CkCallback closed);
127+
entry void addSessionReadAssemblerFinished(CkReductionMsg* msg);
128+
}
129+
130+
// class tht will be used to assemble a specific read call
131+
group ReadAssembler
132+
{
133+
// stores the parameters of the read call it is tasked with building
134+
entry ReadAssembler(Session session);
135+
136+
// the method by which ReadSession objects can send their data for the read
137+
// when invoked, avoid a sender-side copy
138+
entry void shareData(int read_tag, int buffer_tag, size_t read_chare_offset,
139+
size_t num_bytes, nocopypost char data[num_bytes]);
140+
};
141+
142+
group[migratable] Manager
143+
{
144+
entry Manager();
145+
entry void run()
146+
{
147+
while (true)
148+
{
74149
case {
75150
when openFile[opnum](unsigned int opnum_,
76151
FileToken token, std::string name, Options opts)
@@ -84,6 +159,8 @@ module CkIO_impl {
84159
entry void openFile(unsigned int opnum,
85160
FileToken token, std::string name, Options opts);
86161
entry void close(unsigned int opnum, FileToken token, CkCallback closed);
162+
163+
entry void addSessionReadAssemblerMapping(Session session, CProxy_ReadAssembler ra, CkCallback ready);
87164
};
88165

89166
array [1D] WriteSession
@@ -92,12 +169,36 @@ module CkIO_impl {
92169
entry void forwardData(const char data[bytes], size_t bytes, size_t offset);
93170
entry void syncData();
94171
};
172+
95173

96-
group Map : CkArrayMap
97-
{
98-
entry Map();
99-
};
100-
}
174+
array [1D] BufferChares{
175+
entry BufferChares(FileToken file, size_t offset, size_t bytes, size_t num_readers);
176+
// way for the BufferChares object to send bytes over to the ReadAssembler object ra when serving a specific read
177+
178+
entry void sendData(int read_tag, int buffer_tag, size_t offset, size_t bytes, CProxy_ReadAssembler ra, int pe);
179+
180+
entry void sendDataHandler(int read_tag, int buffer_tag, size_t offset, size_t bytes, CProxy_ReadAssembler ra, int pe);
181+
entry [threaded] void monitorRead();
182+
183+
entry void bufferReady() {
184+
while (true) {
185+
when sendData(int read_tag, int buffer_tag, size_t offset, size_t bytes, CProxy_ReadAssembler ra, int pe) {
186+
serial{thisProxy[thisIndex].sendDataHandler(read_tag, buffer_tag, offset, bytes, ra, pe);}
187+
}
188+
}
101189
}
190+
191+
entry [reductiontarget] void printTime(double time_taken);
192+
};
193+
194+
group Map :
195+
CkArrayMap { entry Map(); };
196+
}
197+
group BufferNodeMap : CkArrayMap
198+
{
199+
entry BufferNodeMap();
200+
entry BufferNodeMap(std::vector<int> processors);
201+
};
102202
}
103203
}
204+
}

0 commit comments

Comments
 (0)