Skip to content

Commit bea9c96

Browse files
NakromaBaunsgaard
authored andcommitted
[SYSTEMDS-3548] Optimize python dataframe transfer
This commit optimizes how the pandas_to_frame_block function accesses Java types. It also fixes a small regression, where exceptions from the parallelization threads weren't propagating exceptions properly. - Fix perftests not working with large, split-up datasets IO datagen splits large datasets into multiple files (for example 100k_1k). This commit makes load_pandas.py and load_numpy.py able to read those. - Add pandas to FrameBlock row-wise parallel processing in the case of cols > rows. It also adds some other small, unused utility methods. - Add javadocs - Adjust Py4jConverterUtilsTest to reflect the code changes in the main class. - adds missing tests for added code in SYSTEMDS-3548. This includes the FrameBlock and Py4jConverterUtils functions, as well as python pandas to systemds io e2e tests. - Fix pandas io test (rows have to be >4) Closes #2189
1 parent 22642a1 commit bea9c96

File tree

8 files changed

+321
-129
lines changed

8 files changed

+321
-129
lines changed

scripts/perftest/python/io/load_numpy.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@
2828
[
2929
"from systemds.script_building.script import DMLScript",
3030
"import numpy as np",
31-
"array = np.loadtxt(src, delimiter=',')",
31+
"import os",
32+
"if os.path.isdir(src):",
33+
" files = [os.path.join(src, f) for f in os.listdir(src)]",
34+
" array = np.concatenate([np.loadtxt(f, delimiter=',') for f in files])",
35+
"else:",
36+
" array = np.loadtxt(src, delimiter=',')",
3237
"if dtype is not None:",
3338
" array = array.astype(dtype)",
3439
]

scripts/perftest/python/io/load_pandas.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@
2727
[
2828
"from systemds.script_building.script import DMLScript",
2929
"import pandas as pd",
30-
"df = pd.read_csv(src, header=None)",
30+
"import os",
31+
"if os.path.isdir(src):",
32+
" files = [os.path.join(src, f) for f in os.listdir(src)]",
33+
" df = pd.concat([pd.read_csv(f, header=None) for f in files])",
34+
"else:",
35+
" df = pd.read_csv(src, header=None)",
3136
"if dtype is not None:",
3237
" df = df.astype(dtype)",
3338
]

src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,17 @@ public void reset() {
555555
reset(0, true);
556556
}
557557

558+
/**
559+
* Sets row at position r to the input array of objects, corresponding to the schema.
560+
* @param r row index
561+
* @param row array of objects
562+
*/
563+
public void setRow(int r, Object[] row) {
564+
for (int i = 0; i < row.length; i++) {
565+
set(r, i, row[i]);
566+
}
567+
}
568+
558569
/**
559570
* Append a row to the end of the data frame, where all row fields are boxed objects according to the schema.
560571
*
@@ -753,6 +764,55 @@ else if(column != null && column.size() != _nRow)
753764
_msize = -1;
754765
}
755766

767+
/**
768+
* Appends a chunk of data to the end of a specified column.
769+
*
770+
* @param c column index
771+
* @param chunk chunk of data to append
772+
*/
773+
public void appendColumnChunk(int c, Array<?> chunk) {
774+
if (_coldata == null) {
775+
_coldata = new Array[getNumColumns()];
776+
}
777+
778+
if (_coldata[c] == null) {
779+
_coldata[c] = chunk;
780+
_nRow = chunk.size();
781+
} else {
782+
_coldata[c] = ArrayFactory.append(_coldata[c], chunk);
783+
_nRow += chunk.size();
784+
}
785+
786+
_msize = -1;
787+
}
788+
789+
/**
790+
* Sets a chunk of data to a specified column, starting at the specified offset.
791+
*
792+
* @param c column index
793+
* @param chunk chunk of data to set
794+
* @param offset offset position where it should set the chunk
795+
* @param colSize size of columns, in case columns aren't initialized yet
796+
*/
797+
public void setColumnChunk(int c, Array<?> chunk, int offset, int colSize) {
798+
if (_coldata == null) {
799+
_coldata = new Array[getNumColumns()];
800+
_nRow = colSize;
801+
}
802+
803+
if (_coldata[c] == null) {
804+
_coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow);
805+
}
806+
807+
if (_coldata[c].getValueType() != chunk.getValueType()) {
808+
throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " +
809+
_coldata[c].getValueType() + " but got " + chunk.getValueType());
810+
}
811+
812+
ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow);
813+
}
814+
815+
756816
@Override
757817
public void write(DataOutput out) throws IOException {
758818
final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault();

src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -128,75 +128,81 @@ public static Array<?> convert(byte[] data, int numElements, Types.ValueType val
128128
buffer.order(ByteOrder.LITTLE_ENDIAN);
129129

130130
Array<?> array = ArrayFactory.allocate(valueType, numElements);
131+
readBufferIntoArray(buffer, array, valueType, numElements);
131132

132-
// Process the data based on the value type
133-
switch(valueType) {
134-
case UINT8:
135-
for(int i = 0; i < numElements; i++) {
133+
return array;
134+
}
135+
136+
// Right now row conversion is only supported for if all columns have the same datatype, so this is a placeholder for now that essentially just casts to Object[]
137+
public static Object[] convertRow(byte[] data, int numElements, Types.ValueType valueType) {
138+
Array<?> converted = convert(data, numElements, valueType);
139+
140+
Object[] row = new Object[numElements];
141+
for(int i = 0; i < numElements; i++) {
142+
row[i] = converted.get(i);
143+
}
144+
145+
return row;
146+
}
147+
148+
public static Array<?>[] convertFused(byte[] data, int numElements, Types.ValueType[] valueTypes) {
149+
int numOperations = valueTypes.length;
150+
151+
ByteBuffer buffer = ByteBuffer.wrap(data);
152+
buffer.order(ByteOrder.LITTLE_ENDIAN);
153+
154+
Array<?>[] arrays = new Array<?>[numOperations];
155+
156+
for (int i = 0; i < numOperations; i++) {
157+
arrays[i] = ArrayFactory.allocate(valueTypes[i], numElements);
158+
readBufferIntoArray(buffer, arrays[i], valueTypes[i], numElements);
159+
}
160+
161+
return arrays;
162+
}
163+
164+
private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types.ValueType valueType, int numElements) {
165+
for (int i = 0; i < numElements; i++) {
166+
switch (valueType) {
167+
case UINT8:
136168
array.set(i, (int) (buffer.get() & 0xFF));
137-
}
138-
break;
139-
case INT32:
140-
for(int i = 0; i < numElements; i++) {
141-
array.set(i, buffer.getInt());
142-
}
143-
break;
144-
case INT64:
145-
for(int i = 0; i < numElements; i++) {
146-
array.set(i, buffer.getLong());
147-
}
148-
break;
149-
case FP32:
150-
for(int i = 0; i < numElements; i++) {
169+
break;
170+
case INT32:
171+
case HASH32:
172+
array.set(i, buffer.getInt());
173+
break;
174+
case INT64:
175+
case HASH64:
176+
array.set(i, buffer.getLong());
177+
break;
178+
case FP32:
151179
array.set(i, buffer.getFloat());
152-
}
153-
break;
154-
case FP64:
155-
for(int i = 0; i < numElements; i++) {
180+
break;
181+
case FP64:
156182
array.set(i, buffer.getDouble());
157-
}
158-
break;
159-
case BOOLEAN:
160-
for(int i = 0; i < numElements; i++) {
183+
break;
184+
case BOOLEAN:
161185
if (array instanceof BooleanArray) {
162186
((BooleanArray) array).set(i, buffer.get() != 0);
163187
} else if (array instanceof BitSetArray) {
164188
((BitSetArray) array).set(i, buffer.get() != 0);
165189
} else {
166190
throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
167191
}
168-
}
169-
break;
170-
case STRING:
171-
for(int i = 0; i < numElements; i++) {
172-
buffer.order(ByteOrder.BIG_ENDIAN);
173-
int strLen = buffer.getInt();
174-
buffer.order(ByteOrder.LITTLE_ENDIAN);
175-
byte[] strBytes = new byte[strLen];
192+
break;
193+
case STRING:
194+
int strLength = buffer.getInt();
195+
byte[] strBytes = new byte[strLength];
176196
buffer.get(strBytes);
177197
array.set(i, new String(strBytes, StandardCharsets.UTF_8));
178-
}
179-
break;
180-
case CHARACTER:
181-
for(int i = 0; i < numElements; i++) {
198+
break;
199+
case CHARACTER:
182200
array.set(i, buffer.getChar());
183-
}
184-
break;
185-
case HASH32:
186-
for(int i = 0; i < numElements; i++) {
187-
array.set(i, buffer.getInt());
188-
}
189-
break;
190-
case HASH64:
191-
for(int i = 0; i < numElements; i++) {
192-
array.set(i, buffer.getLong());
193-
}
194-
break;
195-
default:
196-
throw new DMLRuntimeException("Unsupported value type: " + valueType);
201+
break;
202+
default:
203+
throw new DMLRuntimeException("Unsupported value type: " + valueType);
204+
}
197205
}
198-
199-
return array;
200206
}
201207

202208
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {

0 commit comments

Comments
 (0)