Skip to content

Commit 8e9aaa5

Browse files
authored
[SYSTEMDS-3911] OOC Transpose operation (#2316)
This patch introduces the TransposeOOCInstruction, a fundamental component of the out-of-core (OOC) backend. It performs a full transpose on a matrix stream, enabling the composition of complex OOC pipelines required for algorithms like lmDS.

Implementation details:

* Asynchronous producer: The processInstruction method launches a background thread to perform the transpose operation but returns control to the main thread immediately. This allows the compiler to continue building the execution plan without blocking. The actual computation is triggered when a downstream consumer "pulls" data from the output stream.
* Streaming logic: The background thread consumes a stream of IndexedMatrixValue blocks from its input. For each block, it:
  - Performs an in-memory transpose using the standard reorgOperations with a ReorgOperator.
  - Transposes the MatrixIndexes of the block as well (e.g., a block at (row=i, col=j) becomes a block at (row=j, col=i)).
  - Enqueues the new, transposed IndexedMatrixValue into the output stream.
* Integration: The new instruction is fully integrated into the system:
  - A Reorg type has been added to the OOCType enum.
  - The new Reorg type is registered with the OOCInstructionParser, which now recognizes the r' opcode for OOC execution and routes it to the TransposeOOCInstruction.
1 parent 2cb1a60 commit 8e9aaa5

File tree

5 files changed

+254
-1
lines changed

5 files changed

+254
-1
lines changed

src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction;
3030
import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction;
3131
import org.apache.sysds.runtime.instructions.ooc.MatrixVectorBinaryOOCInstruction;
32+
import org.apache.sysds.runtime.instructions.ooc.TransposeOOCInstruction;
3233

3334
public class OOCInstructionParser extends InstructionParser {
3435
protected static final Log LOG = LogFactory.getLog(OOCInstructionParser.class.getName());
@@ -60,6 +61,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str
6061
case AggregateBinary:
6162
case MAPMM:
6263
return MatrixVectorBinaryOOCInstruction.parseInstruction(str);
64+
case Reorg:
65+
return TransposeOOCInstruction.parseInstruction(str);
6366

6467
default:
6568
throw new DMLRuntimeException("Invalid OOC Instruction Type: " + ooctype);

src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction {
3030
protected static final Log LOG = LogFactory.getLog(OOCInstruction.class.getName());
3131

3232
public enum OOCType {
33-
Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary
33+
Reblock, AggregateUnary, Binary, Unary, MAPMM, Reorg, AggregateBinary
3434
}
3535

3636
protected final OOCInstruction.OOCType _ooctype;
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.sysds.runtime.instructions.ooc;
21+
22+
import org.apache.sysds.runtime.DMLRuntimeException;
23+
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
24+
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
25+
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
26+
import org.apache.sysds.runtime.functionobjects.SwapIndex;
27+
import org.apache.sysds.runtime.instructions.InstructionUtils;
28+
import org.apache.sysds.runtime.instructions.cp.CPOperand;
29+
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
30+
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
31+
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
32+
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
33+
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
34+
import org.apache.sysds.runtime.util.CommonThreadPool;
35+
36+
import java.util.concurrent.ExecutorService;
37+
38+
public class TransposeOOCInstruction extends ComputationOOCInstruction {
39+
40+
protected TransposeOOCInstruction(OOCType type, ReorgOperator op, CPOperand in1, CPOperand out, String opcode, String istr) {
41+
super(type, op, in1, out, opcode, istr);
42+
43+
}
44+
45+
public static TransposeOOCInstruction parseInstruction(String str) {
46+
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
47+
InstructionUtils.checkNumFields(parts, 2);
48+
String opcode = parts[0];
49+
CPOperand in1 = new CPOperand(parts[1]);
50+
CPOperand out = new CPOperand(parts[2]);
51+
52+
ReorgOperator reorg = new ReorgOperator(SwapIndex.getSwapIndexFnObject());
53+
return new TransposeOOCInstruction(OOCType.Reorg, reorg, in1, out, opcode, str);
54+
}
55+
56+
public void processInstruction( ExecutionContext ec ) {
57+
58+
// Create thread and process the transpose operation
59+
MatrixObject min = ec.getMatrixObject(input1);
60+
LocalTaskQueue<IndexedMatrixValue> qIn = min.getStreamHandle();
61+
LocalTaskQueue<IndexedMatrixValue> qOut = new LocalTaskQueue<>();
62+
ec.getMatrixObject(output).setStreamHandle(qOut);
63+
64+
65+
ExecutorService pool = CommonThreadPool.get();
66+
try {
67+
pool.submit(() -> {
68+
IndexedMatrixValue tmp = null;
69+
try {
70+
while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
71+
MatrixBlock inBlock = (MatrixBlock)tmp.getValue();
72+
long oldRowIdx = tmp.getIndexes().getRowIndex();
73+
long oldColIdx = tmp.getIndexes().getColumnIndex();
74+
75+
MatrixBlock outBlock = inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1);
76+
qOut.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock));
77+
}
78+
qOut.closeInput();
79+
}
80+
catch(Exception ex) {
81+
throw new DMLRuntimeException(ex);
82+
}
83+
});
84+
} catch (Exception ex) {
85+
throw new DMLRuntimeException(ex);
86+
} finally {
87+
pool.shutdown();
88+
}
89+
}
90+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.sysds.test.functions.ooc;
21+
22+
import org.apache.sysds.common.Opcodes;
23+
import org.apache.sysds.common.Types;
24+
import org.apache.sysds.runtime.instructions.Instruction;
25+
import org.apache.sysds.runtime.io.MatrixWriter;
26+
import org.apache.sysds.runtime.io.MatrixWriterFactory;
27+
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
28+
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
29+
import org.apache.sysds.runtime.util.DataConverter;
30+
import org.apache.sysds.runtime.util.HDFSTool;
31+
import org.apache.sysds.test.AutomatedTestBase;
32+
import org.apache.sysds.test.TestConfiguration;
33+
import org.apache.sysds.test.TestUtils;
34+
import org.junit.Assert;
35+
import org.junit.Test;
36+
37+
import java.io.IOException;
38+
39+
public class TransposeTest extends AutomatedTestBase {
40+
private final static String TEST_NAME1 = "Transpose";
41+
private final static String TEST_DIR = "functions/ooc/";
42+
private final static String TEST_CLASS_DIR = TEST_DIR + TransposeTest.class.getSimpleName() + "/";
43+
private final static double eps = 1e-10;
44+
private static final String INPUT_NAME = "X";
45+
private static final String OUTPUT_NAME = "res";
46+
47+
private final static int rows = 1000;
48+
private final static int cols_wide = 1000;
49+
private final static int cols_skinny = 500;
50+
51+
private final static double sparsity1 = 0.7;
52+
private final static double sparsity2 = 0.1;
53+
54+
@Override
55+
public void setUp() {
56+
TestUtils.clearAssertionInformation();
57+
TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1);
58+
addTestConfiguration(TEST_NAME1, config);
59+
}
60+
61+
@Test
62+
public void testTranspose1() {
63+
runTransposeTest(cols_wide, false);
64+
}
65+
66+
// @Test
67+
// public void testTranspose2() {
68+
// runTransposeTest(cols_skinny, false);
69+
// }
70+
71+
private void runTransposeTest(int cols, boolean sparse )
72+
{
73+
Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE);
74+
75+
try
76+
{
77+
getAndLoadTestConfiguration(TEST_NAME1);
78+
String HOME = SCRIPT_DIR + TEST_DIR;
79+
fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
80+
programArgs = new String[]{"-explain", "-stats", "-ooc",
81+
"-args", input(INPUT_NAME), output(OUTPUT_NAME)};
82+
83+
// 1. Generate the data as MatrixBlock object
84+
double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10);
85+
86+
// 2. Convert the double arrays to MatrixBlock object
87+
MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data);
88+
89+
// 3. Create a binary matrix writer
90+
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
91+
92+
// 4. Write matrix A to a binary SequenceFile
93+
writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros());
94+
HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64,
95+
new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY);
96+
97+
boolean exceptionExpected = false;
98+
runTest(true, exceptionExpected, null, -1);
99+
100+
double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000);
101+
double result = 0.0;
102+
for(int i = 0; i < rows; i++) { // verify the results with Java
103+
double expected = 0.0;
104+
for(int j = 0; j < cols; j++) {
105+
expected = A_mb.get(i, j);
106+
result = C1[j][i];
107+
Assert.assertEquals(expected, result, eps);
108+
}
109+
110+
}
111+
112+
String prefix = Instruction.OOC_INST_PREFIX;
113+
Assert.assertTrue("OOC wasn't used for RBLK",
114+
heavyHittersContainsString(prefix + Opcodes.RBLK));
115+
Assert.assertTrue("OOC wasn't used for TRANSPOSE",
116+
heavyHittersContainsString(prefix + Opcodes.TRANSPOSE));
117+
}
118+
catch (IOException e) {
119+
throw new RuntimeException(e);
120+
}
121+
finally {
122+
resetExecMode(platformOld);
123+
}
124+
}
125+
126+
private static double[][] readMatrix(String fname, Types.FileFormat fmt, long rows, long cols, int brows, int bcols )
127+
throws IOException
128+
{
129+
MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
130+
double[][] C = DataConverter.convertToDoubleMatrix(mb);
131+
return C;
132+
}
133+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#-------------------------------------------------------------
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
#-------------------------------------------------------------
21+
22+
# Read the input matrix from the path passed as the first script argument
# (the accompanying test runs this script with -ooc, so X is read as a stream)
23+
X = read($1);
24+
25+
# Transpose the input matrix
res = t(X);
26+
27+
# Write the result in binary format to the path passed as the second script argument
write(res, $2, format="binary");

0 commit comments

Comments
 (0)