Skip to content

Commit 624ae74

Browse files
committed
Added new mode of data transfer replaced py4j with unix pipes
1 parent 3779d50 commit 624ae74

File tree

14 files changed

+2365
-505
lines changed

14 files changed

+2365
-505
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ src/test/scripts/functions/pipelines/intermediates/classification/*
146146

147147
venv
148148
venv/*
149-
149+
.venv
150150
# resource optimization
151151
scripts/resource/output
152152
*.pem

src/main/java/org/apache/sysds/api/PythonDMLScript.java

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import org.apache.log4j.Level;
2525
import org.apache.log4j.Logger;
2626
import org.apache.sysds.api.jmlc.Connection;
27+
import org.apache.sysds.common.Types.ValueType;
2728

28-
import org.apache.sysds.common.Types;
2929
import org.apache.sysds.runtime.DMLRuntimeException;
30+
import org.apache.sysds.runtime.frame.data.FrameBlock;
31+
import org.apache.sysds.runtime.frame.data.columns.Array;
3032
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3133
import org.apache.sysds.runtime.util.CommonThreadPool;
3234
import org.apache.sysds.runtime.util.UnixPipeUtils;
@@ -79,7 +81,7 @@ public static void main(String[] args) throws Exception {
7981
* therefore use logging framework. and terminate program.
8082
*/
8183
LOG.info("failed startup", p4e);
82-
System.exit(-1);
84+
exitHandler.exit(-1);
8385
}
8486
catch(Exception e) {
8587
throw new DMLException("Failed startup and maintaining Python gateway", e);
@@ -116,59 +118,59 @@ public void openPipes(String path, int num) throws IOException {
116118
}
117119
}
118120

119-
public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, Types.ValueType type) throws IOException {
121+
public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, ValueType type) throws IOException {
120122
long limit = (long) rlen * clen;
121123
LOG.debug("trying to read matrix from "+id+" with "+rlen+" rows and "+clen+" columns. Total size: "+limit);
122124
if(limit > Integer.MAX_VALUE)
123125
throw new DMLRuntimeException("Dense NumPy array of size " + limit +
124126
" cannot be converted to MatrixBlock");
125-
MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
127+
MatrixBlock mb;
126128
if(fromPython != null){
127129
BufferedInputStream pipe = fromPython.get(id);
128130
double[] denseBlock = new double[(int) limit];
129-
UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
130-
mb.init(denseBlock, rlen, clen);
131+
long nnz = UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
132+
mb = new MatrixBlock(rlen, clen, denseBlock);
133+
mb.setNonZeros(nnz);
131134
} else {
132135
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
133136
}
134-
mb.recomputeNonZeros();
135-
mb.examSparsity();
136137
LOG.debug("Reading from Python finished");
138+
mb.examSparsity();
137139
return mb;
138140
}
139141

140-
public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, Types.ValueType type) throws ExecutionException, InterruptedException {
142+
public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, ValueType type) throws ExecutionException, InterruptedException {
141143
long limit = (long) rlen * clen;
142144
if(limit > Integer.MAX_VALUE)
143145
throw new DMLRuntimeException("Dense NumPy array of size " + limit +
144146
" cannot be converted to MatrixBlock");
145-
MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
147+
MatrixBlock mb = new MatrixBlock(rlen, clen, false, rlen*clen);
146148
if(fromPython != null){
147149
ExecutorService pool = CommonThreadPool.get();
148150
double[] denseBlock = new double[(int) limit];
149151
int offsetOut = 0;
150-
List<Future<Void>> futures = new ArrayList<>();
152+
List<Future<Long>> futures = new ArrayList<>();
151153
for (int i = 0; i < blockSizes.length; i++) {
152154
BufferedInputStream pipe = fromPython.get(i);
153155
int id = i, blockSize = blockSizes[i], _offsetOut = offsetOut;
154-
Callable<Void> task = () -> {
155-
UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
156-
return null;
156+
Callable<Long> task = () -> {
157+
return UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
157158
};
158159

159160
futures.add(pool.submit(task));
160161
offsetOut += blockSize;
161162
}
162-
// Wait for all tasks and propagate exceptions
163-
for (Future<Void> f : futures) {
164-
f.get();
163+
// Wait for all tasks and propagate exceptions, sum up nonzeros
164+
long nnz = 0;
165+
for (Future<Long> f : futures) {
166+
nnz += f.get();
165167
}
166168

167-
mb.init(denseBlock, rlen, clen);
169+
mb = new MatrixBlock(rlen, clen, denseBlock);
170+
mb.setNonZeros(nnz);
168171
} else {
169172
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
170173
}
171-
mb.recomputeNonZeros();
172174
mb.examSparsity();
173175
return mb;
174176
}
@@ -181,14 +183,51 @@ public void startWritingMbToPipe(int id, MatrixBlock mb) throws IOException {
181183
LOG.debug("Trying to write matrix ["+baseDir + "-"+ id+"] with "+rlen+" rows and "+clen+" columns. Total size: "+numElem*8);
182184

183185
BufferedOutputStream out = toPython.get(id);
184-
long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, Types.ValueType.FP64, mb);
186+
long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, ValueType.FP64, mb);
185187

186188
LOG.debug("Writing of " + bytes +" Bytes to Python ["+baseDir + "-"+ id+"] finished");
187189
} else {
188190
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
189191
}
190192
}
191193

194+
public void startReadingColFromPipe(int id, FrameBlock fb, int rows, int totalBytes, int col, ValueType type, boolean any) throws IOException {
195+
if (fromPython == null) {
196+
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
197+
}
198+
199+
BufferedInputStream pipe = fromPython.get(id);
200+
LOG.debug("Start reading FrameBlock column from pipe #" + id + " with type " + type);
201+
202+
// Delegate to UnixPipeUtils
203+
Array<?> arr = UnixPipeUtils.readFrameColumnFromPipe(pipe, id, rows, totalBytes, BATCH_SIZE, type);
204+
// Set column into FrameBlock
205+
fb.setColumn(col, arr);
206+
ValueType[] schema = fb.getSchema();
207+
// inplace update the schema for cases: int8 -> int32
208+
schema[col] = arr.getValueType();
209+
210+
LOG.debug("Finished reading FrameBlock column from pipe #" + id);
211+
}
212+
213+
public void startWritingColToPipe(int id, FrameBlock fb, int col) throws IOException {
214+
if (toPython == null) {
215+
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
216+
}
217+
218+
BufferedOutputStream pipe = toPython.get(id);
219+
ValueType type = fb.getSchema()[col];
220+
int rows = fb.getNumRows();
221+
Array<?> array = fb.getColumn(col);
222+
223+
LOG.debug("Start writing FrameBlock column #" + col + " to pipe #" + id + " with type " + type + " and " + rows + " rows");
224+
225+
// Delegate to UnixPipeUtils
226+
long bytes = UnixPipeUtils.writeFrameColumnToPipe(pipe, id, BATCH_SIZE, array, type);
227+
228+
LOG.debug("Finished writing FrameBlock column #" + col + " to pipe #" + id + ". Total bytes: " + bytes);
229+
}
230+
192231
public void closePipes() throws IOException {
193232
LOG.debug("Closing all pipes in Java");
194233
for (BufferedInputStream pipe : fromPython.values())
@@ -198,6 +237,20 @@ public void closePipes() throws IOException {
198237
LOG.debug("Closed all pipes in Java");
199238
}
200239

240+
@FunctionalInterface
241+
public interface ExitHandler {
242+
void exit(int status);
243+
}
244+
245+
private static volatile ExitHandler exitHandler = System::exit;
246+
247+
public static void setExitHandler(ExitHandler handler) {
248+
exitHandler = handler == null ? System::exit : handler;
249+
}
250+
251+
public static void resetExitHandler() {
252+
exitHandler = System::exit;
253+
}
201254
protected static class DMLGateWayListener extends DefaultGatewayServerListener {
202255
private static final Log LOG = LogFactory.getLog(DMLGateWayListener.class.getName());
203256

src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void reset(int size) {
174174

175175
@Override
176176
public byte[] getAsByteArray() {
177-
ByteBuffer floatBuffer = ByteBuffer.allocate(8 * _size);
177+
ByteBuffer floatBuffer = ByteBuffer.allocate(4 * _size);
178178
floatBuffer.order(ByteOrder.nativeOrder());
179179
for(int i = 0; i < _size; i++)
180180
floatBuffer.putFloat(_data[i]);

src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* with the License. You may obtain a copy of the License at
99
*
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
*
1212
* Unless required by applicable law or agreed to in writing,
1313
* software distributed under the License is distributed on an
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,7 +91,7 @@ public void processInstruction(ExecutionContext ec) {
9191
FrameBlock fin = ec.getFrameInput(input1.getName());
9292
String spec = ec.getScalarInput(input2).getStringValue();
9393
String[] colnames = fin.getColumnNames();
94-
94+
9595
// execute block transform encode
9696
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
9797
// TODO: Assign #threads in compiler and pass via the instruction string

0 commit comments

Comments
 (0)