Skip to content

Commit 0e8e966

Browse files
committed
[SYSTEMDS-3902] Faster data transfer for Python Frames to Java Runtime by using pipes instead of py4j
This patch extends the previously added Unix-pipe data transfer to also support SystemDS frames. Additionally, the matrix transfer was further improved by fusing the computation of the number of non-zero values into data reading and by reducing unnecessary array allocations. Closes #2363.
1 parent f885840 commit 0e8e966

File tree

14 files changed

+2318
-451
lines changed

14 files changed

+2318
-451
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ src/test/scripts/functions/pipelines/intermediates/classification/*
146146

147147
venv
148148
venv/*
149-
149+
.venv
150150
# resource optimization
151151
scripts/resource/output
152152
*.pem

src/main/java/org/apache/sysds/api/PythonDMLScript.java

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
3+
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
55
* regarding copyright ownership. The ASF licenses this file
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
99
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1111
*
1212
* Unless required by applicable law or agreed to in writing,
1313
* software distributed under the License is distributed on an
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15-
* KIND, either express or implied. See the License for the
15+
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
@@ -24,9 +24,11 @@
2424
import org.apache.log4j.Level;
2525
import org.apache.log4j.Logger;
2626
import org.apache.sysds.api.jmlc.Connection;
27+
import org.apache.sysds.common.Types.ValueType;
2728

28-
import org.apache.sysds.common.Types;
2929
import org.apache.sysds.runtime.DMLRuntimeException;
30+
import org.apache.sysds.runtime.frame.data.FrameBlock;
31+
import org.apache.sysds.runtime.frame.data.columns.Array;
3032
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3133
import org.apache.sysds.runtime.util.CommonThreadPool;
3234
import org.apache.sysds.runtime.util.UnixPipeUtils;
@@ -79,7 +81,7 @@ public static void main(String[] args) throws Exception {
7981
* therefore use logging framework. and terminate program.
8082
*/
8183
LOG.info("failed startup", p4e);
82-
System.exit(-1);
84+
exitHandler.exit(-1);
8385
}
8486
catch(Exception e) {
8587
throw new DMLException("Failed startup and maintaining Python gateway", e);
@@ -116,59 +118,59 @@ public void openPipes(String path, int num) throws IOException {
116118
}
117119
}
118120

119-
public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, Types.ValueType type) throws IOException {
121+
public MatrixBlock startReadingMbFromPipe(int id, int rlen, int clen, ValueType type) throws IOException {
120122
long limit = (long) rlen * clen;
121123
LOG.debug("trying to read matrix from "+id+" with "+rlen+" rows and "+clen+" columns. Total size: "+limit);
122124
if(limit > Integer.MAX_VALUE)
123125
throw new DMLRuntimeException("Dense NumPy array of size " + limit +
124126
" cannot be converted to MatrixBlock");
125-
MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
127+
MatrixBlock mb;
126128
if(fromPython != null){
127129
BufferedInputStream pipe = fromPython.get(id);
128130
double[] denseBlock = new double[(int) limit];
129-
UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
130-
mb.init(denseBlock, rlen, clen);
131+
long nnz = UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, (int) limit, type, denseBlock, 0);
132+
mb = new MatrixBlock(rlen, clen, denseBlock);
133+
mb.setNonZeros(nnz);
131134
} else {
132135
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
133136
}
134-
mb.recomputeNonZeros();
135-
mb.examSparsity();
136137
LOG.debug("Reading from Python finished");
138+
mb.examSparsity();
137139
return mb;
138140
}
139141

140-
public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, Types.ValueType type) throws ExecutionException, InterruptedException {
142+
public MatrixBlock startReadingMbFromPipes(int[] blockSizes, int rlen, int clen, ValueType type) throws ExecutionException, InterruptedException {
141143
long limit = (long) rlen * clen;
142144
if(limit > Integer.MAX_VALUE)
143145
throw new DMLRuntimeException("Dense NumPy array of size " + limit +
144146
" cannot be converted to MatrixBlock");
145-
MatrixBlock mb = new MatrixBlock(rlen, clen, false, -1);
147+
MatrixBlock mb = new MatrixBlock(rlen, clen, false, rlen*clen);
146148
if(fromPython != null){
147149
ExecutorService pool = CommonThreadPool.get();
148150
double[] denseBlock = new double[(int) limit];
149151
int offsetOut = 0;
150-
List<Future<Void>> futures = new ArrayList<>();
152+
List<Future<Long>> futures = new ArrayList<>();
151153
for (int i = 0; i < blockSizes.length; i++) {
152154
BufferedInputStream pipe = fromPython.get(i);
153155
int id = i, blockSize = blockSizes[i], _offsetOut = offsetOut;
154-
Callable<Void> task = () -> {
155-
UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
156-
return null;
156+
Callable<Long> task = () -> {
157+
return UnixPipeUtils.readNumpyArrayInBatches(pipe, id, BATCH_SIZE, blockSize, type, denseBlock, _offsetOut);
157158
};
158159

159160
futures.add(pool.submit(task));
160161
offsetOut += blockSize;
161162
}
162-
// Wait for all tasks and propagate exceptions
163-
for (Future<Void> f : futures) {
164-
f.get();
163+
// Wait for all tasks and propagate exceptions, sum up nonzeros
164+
long nnz = 0;
165+
for (Future<Long> f : futures) {
166+
nnz += f.get();
165167
}
166168

167-
mb.init(denseBlock, rlen, clen);
169+
mb = new MatrixBlock(rlen, clen, denseBlock);
170+
mb.setNonZeros(nnz);
168171
} else {
169172
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
170173
}
171-
mb.recomputeNonZeros();
172174
mb.examSparsity();
173175
return mb;
174176
}
@@ -181,14 +183,51 @@ public void startWritingMbToPipe(int id, MatrixBlock mb) throws IOException {
181183
LOG.debug("Trying to write matrix ["+baseDir + "-"+ id+"] with "+rlen+" rows and "+clen+" columns. Total size: "+numElem*8);
182184

183185
BufferedOutputStream out = toPython.get(id);
184-
long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, Types.ValueType.FP64, mb);
186+
long bytes = UnixPipeUtils.writeNumpyArrayInBatches(out, id, BATCH_SIZE, numElem, ValueType.FP64, mb);
185187

186188
LOG.debug("Writing of " + bytes +" Bytes to Python ["+baseDir + "-"+ id+"] finished");
187189
} else {
188190
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
189191
}
190192
}
191193

194+
public void startReadingColFromPipe(int id, FrameBlock fb, int rows, int totalBytes, int col, ValueType type, boolean any) throws IOException {
195+
if (fromPython == null) {
196+
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
197+
}
198+
199+
BufferedInputStream pipe = fromPython.get(id);
200+
LOG.debug("Start reading FrameBlock column from pipe #" + id + " with type " + type);
201+
202+
// Delegate to UnixPipeUtils
203+
Array<?> arr = UnixPipeUtils.readFrameColumnFromPipe(pipe, id, rows, totalBytes, BATCH_SIZE, type);
204+
// Set column into FrameBlock
205+
fb.setColumn(col, arr);
206+
ValueType[] schema = fb.getSchema();
207+
// inplace update the schema for cases: int8 -> int32
208+
schema[col] = arr.getValueType();
209+
210+
LOG.debug("Finished reading FrameBlock column from pipe #" + id);
211+
}
212+
213+
public void startWritingColToPipe(int id, FrameBlock fb, int col) throws IOException {
214+
if (toPython == null) {
215+
throw new DMLRuntimeException("FIFO Pipes are not initialized.");
216+
}
217+
218+
BufferedOutputStream pipe = toPython.get(id);
219+
ValueType type = fb.getSchema()[col];
220+
int rows = fb.getNumRows();
221+
Array<?> array = fb.getColumn(col);
222+
223+
LOG.debug("Start writing FrameBlock column #" + col + " to pipe #" + id + " with type " + type + " and " + rows + " rows");
224+
225+
// Delegate to UnixPipeUtils
226+
long bytes = UnixPipeUtils.writeFrameColumnToPipe(pipe, id, BATCH_SIZE, array, type);
227+
228+
LOG.debug("Finished writing FrameBlock column #" + col + " to pipe #" + id + ". Total bytes: " + bytes);
229+
}
230+
192231
public void closePipes() throws IOException {
193232
LOG.debug("Closing all pipes in Java");
194233
for (BufferedInputStream pipe : fromPython.values())
@@ -198,6 +237,20 @@ public void closePipes() throws IOException {
198237
LOG.debug("Closed all pipes in Java");
199238
}
200239

240+
@FunctionalInterface
241+
public interface ExitHandler {
242+
void exit(int status);
243+
}
244+
245+
private static volatile ExitHandler exitHandler = System::exit;
246+
247+
public static void setExitHandler(ExitHandler handler) {
248+
exitHandler = handler == null ? System::exit : handler;
249+
}
250+
251+
public static void resetExitHandler() {
252+
exitHandler = System::exit;
253+
}
201254
protected static class DMLGateWayListener extends DefaultGatewayServerListener {
202255
private static final Log LOG = LogFactory.getLog(DMLGateWayListener.class.getName());
203256

src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void reset(int size) {
174174

175175
@Override
176176
public byte[] getAsByteArray() {
177-
ByteBuffer floatBuffer = ByteBuffer.allocate(8 * _size);
177+
ByteBuffer floatBuffer = ByteBuffer.allocate(4 * _size);
178178
floatBuffer.order(ByteOrder.nativeOrder());
179179
for(int i = 0; i < _size; i++)
180180
floatBuffer.putFloat(_data[i]);

src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* with the License. You may obtain a copy of the License at
99
*
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
*
1212
* Unless required by applicable law or agreed to in writing,
1313
* software distributed under the License is distributed on an
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,7 +91,7 @@ public void processInstruction(ExecutionContext ec) {
9191
FrameBlock fin = ec.getFrameInput(input1.getName());
9292
String spec = ec.getScalarInput(input2).getStringValue();
9393
String[] colnames = fin.getColumnNames();
94-
94+
9595
// execute block transform encode
9696
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
9797
// TODO: Assign #threads in compiler and pass via the instruction string

0 commit comments

Comments
 (0)