Skip to content

Commit 3cc44c7

Browse files
committed
Improve diff readability by adjusting tabs
1 parent 3fc3c03 commit 3cc44c7

File tree

5 files changed

+126
-126
lines changed

5 files changed

+126
-126
lines changed

src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java

Lines changed: 55 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -92,65 +92,65 @@ public void processInstruction( ExecutionContext ec ) {
9292
ec.getMatrixObject(output).setStreamHandle(qOut);
9393

9494
submitOOCTask(() -> {
95-
IndexedMatrixValue tmp = null;
96-
try {
97-
while((tmp = q.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
98-
long idx = aggun.isRowAggregate() ?
99-
tmp.getIndexes().getRowIndex() : tmp.getIndexes().getColumnIndex();
100-
MatrixBlock ret = aggTracker.get(idx);
101-
if(ret != null) {
102-
MatrixBlock corr = corrs.get(idx);
103-
104-
// aggregation
105-
MatrixBlock ltmp = (MatrixBlock) ((MatrixBlock) tmp.getValue())
106-
.aggregateUnaryOperations(aggun, new MatrixBlock(), blen, tmp.getIndexes());
107-
OperationsOnMatrixValues.incrementalAggregation(ret,
108-
_aop.existsCorrection() ? corr : null, ltmp, _aop, true);
109-
110-
if (!aggTracker.putAndIncrementCount(idx, ret)){
111-
corrs.replace(idx, corr);
112-
continue;
95+
IndexedMatrixValue tmp = null;
96+
try {
97+
while((tmp = q.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
98+
long idx = aggun.isRowAggregate() ?
99+
tmp.getIndexes().getRowIndex() : tmp.getIndexes().getColumnIndex();
100+
MatrixBlock ret = aggTracker.get(idx);
101+
if(ret != null) {
102+
MatrixBlock corr = corrs.get(idx);
103+
104+
// aggregation
105+
MatrixBlock ltmp = (MatrixBlock) ((MatrixBlock) tmp.getValue())
106+
.aggregateUnaryOperations(aggun, new MatrixBlock(), blen, tmp.getIndexes());
107+
OperationsOnMatrixValues.incrementalAggregation(ret,
108+
_aop.existsCorrection() ? corr : null, ltmp, _aop, true);
109+
110+
if (!aggTracker.putAndIncrementCount(idx, ret)){
111+
corrs.replace(idx, corr);
112+
continue;
113+
}
113114
}
114-
}
115-
else {
116-
// first block for this idx - init aggregate and correction
117-
// TODO avoid corr block for inplace incremental aggregation
118-
int rows = tmp.getValue().getNumRows();
119-
int cols = tmp.getValue().getNumColumns();
120-
int extra = _aop.correction.getNumRemovedRowsColumns();
121-
ret = aggun.isRowAggregate()? new MatrixBlock(rows, 1 + extra, false) : new MatrixBlock(1 + extra, cols, false);
122-
MatrixBlock corr = aggun.isRowAggregate()? new MatrixBlock(rows, 1 + extra, false) : new MatrixBlock(1 + extra, cols, false);
123-
124-
// aggregation
125-
MatrixBlock ltmp = (MatrixBlock) ((MatrixBlock) tmp.getValue()).aggregateUnaryOperations(
126-
aggun, new MatrixBlock(), blen, tmp.getIndexes());
127-
OperationsOnMatrixValues.incrementalAggregation(ret,
128-
_aop.existsCorrection() ? corr : null, ltmp, _aop, true);
129-
130-
if(emitThreshold > 1){
131-
aggTracker.putAndIncrementCount(idx, ret);
132-
corrs.put(idx, corr);
133-
continue;
115+
else {
116+
// first block for this idx - init aggregate and correction
117+
// TODO avoid corr block for inplace incremental aggregation
118+
int rows = tmp.getValue().getNumRows();
119+
int cols = tmp.getValue().getNumColumns();
120+
int extra = _aop.correction.getNumRemovedRowsColumns();
121+
ret = aggun.isRowAggregate()? new MatrixBlock(rows, 1 + extra, false) : new MatrixBlock(1 + extra, cols, false);
122+
MatrixBlock corr = aggun.isRowAggregate()? new MatrixBlock(rows, 1 + extra, false) : new MatrixBlock(1 + extra, cols, false);
123+
124+
// aggregation
125+
MatrixBlock ltmp = (MatrixBlock) ((MatrixBlock) tmp.getValue()).aggregateUnaryOperations(
126+
aggun, new MatrixBlock(), blen, tmp.getIndexes());
127+
OperationsOnMatrixValues.incrementalAggregation(ret,
128+
_aop.existsCorrection() ? corr : null, ltmp, _aop, true);
129+
130+
if(emitThreshold > 1){
131+
aggTracker.putAndIncrementCount(idx, ret);
132+
corrs.put(idx, corr);
133+
continue;
134+
}
134135
}
135-
}
136136

137-
// all input blocks for this idx processed - emit aggregated block
138-
ret.dropLastRowsOrColumns(_aop.correction);
139-
MatrixIndexes midx = aggun.isRowAggregate() ?
140-
new MatrixIndexes(tmp.getIndexes().getRowIndex(), 1) :
141-
new MatrixIndexes(1, tmp.getIndexes().getColumnIndex());
142-
IndexedMatrixValue tmpOut = new IndexedMatrixValue(midx, ret);
143-
144-
qOut.enqueueTask(tmpOut);
145-
// drop intermediate states
146-
aggTracker.remove(idx);
147-
corrs.remove(idx);
137+
// all input blocks for this idx processed - emit aggregated block
138+
ret.dropLastRowsOrColumns(_aop.correction);
139+
MatrixIndexes midx = aggun.isRowAggregate() ?
140+
new MatrixIndexes(tmp.getIndexes().getRowIndex(), 1) :
141+
new MatrixIndexes(1, tmp.getIndexes().getColumnIndex());
142+
IndexedMatrixValue tmpOut = new IndexedMatrixValue(midx, ret);
143+
144+
qOut.enqueueTask(tmpOut);
145+
// drop intermediate states
146+
aggTracker.remove(idx);
147+
corrs.remove(idx);
148+
}
149+
qOut.closeInput();
150+
}
151+
catch(Exception ex) {
152+
throw new DMLRuntimeException(ex);
148153
}
149-
qOut.closeInput();
150-
}
151-
catch(Exception ex) {
152-
throw new DMLRuntimeException(ex);
153-
}
154154
}, q, qOut);
155155
}
156156
// full aggregation

src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,19 @@ public void processInstruction( ExecutionContext ec ) {
7171
ec.getMatrixObject(output).setStreamHandle(qOut);
7272

7373
submitOOCTask(() -> {
74-
IndexedMatrixValue tmp = null;
75-
try {
76-
while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
77-
IndexedMatrixValue tmpOut = new IndexedMatrixValue();
78-
tmpOut.set(tmp.getIndexes(),
79-
tmp.getValue().scalarOperations(sc_op, new MatrixBlock()));
80-
qOut.enqueueTask(tmpOut);
74+
IndexedMatrixValue tmp = null;
75+
try {
76+
while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
77+
IndexedMatrixValue tmpOut = new IndexedMatrixValue();
78+
tmpOut.set(tmp.getIndexes(),
79+
tmp.getValue().scalarOperations(sc_op, new MatrixBlock()));
80+
qOut.enqueueTask(tmpOut);
81+
}
82+
qOut.closeInput();
83+
}
84+
catch(Exception ex) {
85+
throw new DMLRuntimeException(ex);
8186
}
82-
qOut.closeInput();
83-
}
84-
catch(Exception ex) {
85-
throw new DMLRuntimeException(ex);
86-
}
8787
}, qIn, qOut);
8888
}
8989
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -91,46 +91,46 @@ public void processInstruction( ExecutionContext ec ) {
9191
ec.getMatrixObject(output).setStreamHandle(qOut);
9292

9393
submitOOCTask(() -> {
94-
IndexedMatrixValue tmp = null;
95-
try {
96-
while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
97-
MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue();
98-
long rowIndex = tmp.getIndexes().getRowIndex();
99-
long colIndex = tmp.getIndexes().getColumnIndex();
100-
MatrixBlock vectorSlice = partitionedVector.get(colIndex);
101-
102-
// Now, call the operation with the correct, specific operator.
103-
MatrixBlock partialResult = matrixBlock.aggregateBinaryOperations(
104-
matrixBlock, vectorSlice, new MatrixBlock(), (AggregateBinaryOperator) _optr);
105-
106-
// for single column block, no aggregation neeeded
107-
if(emitThreshold == 1) {
108-
qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult));
109-
}
110-
else {
111-
// aggregation
112-
MatrixBlock currAgg = aggTracker.get(rowIndex);
113-
if (currAgg == null) {
114-
aggTracker.putAndIncrementCount(rowIndex, partialResult);
94+
IndexedMatrixValue tmp = null;
95+
try {
96+
while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
97+
MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue();
98+
long rowIndex = tmp.getIndexes().getRowIndex();
99+
long colIndex = tmp.getIndexes().getColumnIndex();
100+
MatrixBlock vectorSlice = partitionedVector.get(colIndex);
101+
102+
// Now, call the operation with the correct, specific operator.
103+
MatrixBlock partialResult = matrixBlock.aggregateBinaryOperations(
104+
matrixBlock, vectorSlice, new MatrixBlock(), (AggregateBinaryOperator) _optr);
105+
106+
// for single column block, no aggregation neeeded
107+
if(emitThreshold == 1) {
108+
qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult));
115109
}
116110
else {
117-
currAgg = currAgg.binaryOperations(plus, partialResult);
118-
if (aggTracker.putAndIncrementCount(rowIndex, currAgg)){
119-
// early block output: emit aggregated block
120-
MatrixIndexes idx = new MatrixIndexes(rowIndex, 1L);
121-
qOut.enqueueTask(new IndexedMatrixValue(idx, currAgg));
122-
aggTracker.remove(rowIndex);
111+
// aggregation
112+
MatrixBlock currAgg = aggTracker.get(rowIndex);
113+
if (currAgg == null) {
114+
aggTracker.putAndIncrementCount(rowIndex, partialResult);
115+
}
116+
else {
117+
currAgg = currAgg.binaryOperations(plus, partialResult);
118+
if (aggTracker.putAndIncrementCount(rowIndex, currAgg)){
119+
// early block output: emit aggregated block
120+
MatrixIndexes idx = new MatrixIndexes(rowIndex, 1L);
121+
qOut.enqueueTask(new IndexedMatrixValue(idx, currAgg));
122+
aggTracker.remove(rowIndex);
123+
}
123124
}
124125
}
125126
}
126127
}
127-
}
128-
catch(Exception ex) {
129-
throw new DMLRuntimeException(ex);
130-
}
131-
finally {
132-
qOut.closeInput();
133-
}
128+
catch(Exception ex) {
129+
throw new DMLRuntimeException(ex);
130+
}
131+
finally {
132+
qOut.closeInput();
133+
}
134134
}, qIn, qOut);
135135
}
136136
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,21 @@ public void processInstruction( ExecutionContext ec ) {
6161
ec.getMatrixObject(output).setStreamHandle(qOut);
6262

6363
submitOOCTask(() -> {
64-
IndexedMatrixValue tmp = null;
65-
try {
66-
while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
67-
MatrixBlock inBlock = (MatrixBlock)tmp.getValue();
68-
long oldRowIdx = tmp.getIndexes().getRowIndex();
69-
long oldColIdx = tmp.getIndexes().getColumnIndex();
64+
IndexedMatrixValue tmp = null;
65+
try {
66+
while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
67+
MatrixBlock inBlock = (MatrixBlock)tmp.getValue();
68+
long oldRowIdx = tmp.getIndexes().getRowIndex();
69+
long oldColIdx = tmp.getIndexes().getColumnIndex();
7070

71-
MatrixBlock outBlock = inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1);
72-
qOut.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock));
71+
MatrixBlock outBlock = inBlock.reorgOperations((ReorgOperator) _optr, new MatrixBlock(), -1, -1, -1);
72+
qOut.enqueueTask(new IndexedMatrixValue(new MatrixIndexes(oldColIdx, oldRowIdx), outBlock));
73+
}
74+
qOut.closeInput();
75+
}
76+
catch(Exception ex) {
77+
throw new DMLRuntimeException(ex);
7378
}
74-
qOut.closeInput();
75-
}
76-
catch(Exception ex) {
77-
throw new DMLRuntimeException(ex);
78-
}
7979
}, qIn, qOut);
8080
}
8181
}

src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,19 @@ public void processInstruction( ExecutionContext ec ) {
6262

6363

6464
submitOOCTask(() -> {
65-
IndexedMatrixValue tmp = null;
66-
try {
67-
while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
68-
IndexedMatrixValue tmpOut = new IndexedMatrixValue();
69-
tmpOut.set(tmp.getIndexes(),
70-
tmp.getValue().unaryOperations(uop, new MatrixBlock()));
71-
qOut.enqueueTask(tmpOut);
65+
IndexedMatrixValue tmp = null;
66+
try {
67+
while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
68+
IndexedMatrixValue tmpOut = new IndexedMatrixValue();
69+
tmpOut.set(tmp.getIndexes(),
70+
tmp.getValue().unaryOperations(uop, new MatrixBlock()));
71+
qOut.enqueueTask(tmpOut);
72+
}
73+
qOut.closeInput();
74+
}
75+
catch(Exception ex) {
76+
throw new DMLRuntimeException(ex);
7277
}
73-
qOut.closeInput();
74-
}
75-
catch(Exception ex) {
76-
throw new DMLRuntimeException(ex);
77-
}
7878
}, qIn, qOut);
7979
}
8080
}

0 commit comments

Comments
 (0)