Skip to content

Commit 3ea306d

Browse files
author
Michael Spector
committed
fixed denormalization of rows
1 parent 4ff9e4a commit 3ea306d

File tree

4 files changed

+120
-66
lines changed

4 files changed

+120
-66
lines changed

src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeDialog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ private void ok() {
391391
private void detectFields() {
392392
try {
393393
ProtobufDecoder protobufDecoder = new ProtobufDecoder(
394-
wClasspath.getText().trim().split(File.pathSeparator), wRootClass.getText());
394+
wClasspath.getText().trim().split(File.pathSeparator), wRootClass.getText(), null);
395395
Map<String, Class<?>> fields = protobufDecoder.guessFields();
396396
RowMeta rowMeta = new RowMeta();
397397
for (Entry<String, Class<?>> e : fields.entrySet()) {

src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
3434
ProtobufDecodeMeta meta = (ProtobufDecodeMeta) smi;
3535
ProtobufDecodeData data = (ProtobufDecodeData) sdi;
3636
try {
37-
data.decoder = new ProtobufDecoder(meta.getClasspath(), meta.getRootClass());
37+
data.decoder = new ProtobufDecoder(meta.getClasspath(), meta.getRootClass(), meta.getFields());
3838
} catch (ProtobufDecoderException e) {
3939
logError(Messages.getString("ProtobufDecodeStep.Init.Error", getStepname()), e);
4040
return false;
@@ -86,7 +86,7 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
8686
try {
8787
byte[] message = data.inputFieldMeta.getBinary(r[data.inputFieldNr]);
8888
try {
89-
List<Object[]> decodedData = data.decoder.decode(message, meta.getFields());
89+
List<Object[]> decodedData = data.decoder.decode(message);
9090
for (Object[] d : decodedData) {
9191
r = RowDataUtil.addRowData(r, inputRowMeta.size(), d);
9292
putRow(data.outputRowMeta, r);

src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecoder.java

Lines changed: 114 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
import java.net.URLClassLoader;
99
import java.util.ArrayList;
1010
import java.util.HashMap;
11+
import java.util.HashSet;
12+
import java.util.LinkedHashMap;
1113
import java.util.LinkedList;
1214
import java.util.List;
1315
import java.util.Map;
16+
import java.util.Set;
1417

1518
import com.google.protobuf.Descriptors.FieldDescriptor;
1619
import com.google.protobuf.Message;
@@ -23,10 +26,14 @@
2326
*/
2427
public class ProtobufDecoder {
2528

29+
private final Object EMPTY = new Object();
2630
private Class<?> rootClass;
2731
private Method rootParseFromMethod;
32+
private LinkedHashMap<String, Integer> paths;
33+
private FieldDefinition[] fields;
2834

29-
public ProtobufDecoder(String[] classpath, String rootClass) throws ProtobufDecoderException {
35+
public ProtobufDecoder(String[] classpath, String rootClass, FieldDefinition[] fields)
36+
throws ProtobufDecoderException {
3037
URLClassLoader classLoader;
3138
try {
3239
URL[] url = new URL[classpath.length];
@@ -51,100 +58,140 @@ public ProtobufDecoder(String[] classpath, String rootClass) throws ProtobufDeco
5158
} catch (NoSuchMethodException e) {
5259
throw new ProtobufDecoderException("Can't setup Protocol Buffers decoder", e);
5360
}
61+
62+
if (fields != null) {
63+
this.paths = new LinkedHashMap<String, Integer>();
64+
for (int i = 0; i < fields.length; ++i) {
65+
this.paths.put(fields[i].path, Integer.valueOf(i));
66+
}
67+
this.fields = fields;
68+
}
5469
}
5570

5671
/**
57-
* Parses binary message
72+
* Decodes message, and returns denormalized rows for the object fields
73+
* exactly in the order they appear in the <code>fields</code> array passed to the constructor.
5874
*
5975
* @param message
60-
* Encoded message
61-
* @return decoded Protocol Buffers message
76+
* Encoded Protocol Buffers message
77+
* @return rows list
6278
* @throws ProtobufDecoderException
6379
*/
64-
protected Message decode(byte[] message) throws ProtobufDecoderException {
80+
public List<Object[]> decode(byte[] message) throws ProtobufDecoderException {
81+
Message decodedMessage;
6582
try {
66-
return (Message) rootParseFromMethod.invoke(null, message);
83+
decodedMessage = (Message) rootParseFromMethod.invoke(null, message);
6784
} catch (IllegalArgumentException e) {
6885
throw new ProtobufDecoderException(e);
6986
} catch (IllegalAccessException e) {
7087
throw new ProtobufDecoderException(e);
7188
} catch (InvocationTargetException e) {
7289
throw new ProtobufDecoderException("Can't call to " + rootParseFromMethod, e.getCause());
7390
}
74-
}
7591

76-
/**
77-
* Decodes message, and returns denormalized rows for the object fields
78-
* exactly in the order they appear in <code>fields</code> parameter.
79-
*
80-
* @param message
81-
* Encoded Protocol Buffers message
82-
* @param fields
83-
* Definitions of fields to return
84-
* @return values
85-
* @throws ProtobufDecoderException
86-
*/
87-
public List<Object[]> decode(byte[] message, FieldDefinition[] fields) throws ProtobufDecoderException {
92+
ValueNode root = buildValuesTree(decodedMessage, "");
8893
LinkedList<Object[]> result = new LinkedList<Object[]>();
89-
result.add(new Object[fields.length]);
90-
buildRows(decode(message), fields, result);
94+
if (root != null) {
95+
produceRows(root, new LinkedList<Object[]>(), result, new HashSet<Integer>());
96+
}
9197
return result;
9298
}
9399

94-
protected void buildRows(Message message, FieldDefinition[] fields, LinkedList<Object[]> result)
95-
throws ProtobufDecoderException {
96-
97-
for (int i = 0; i < fields.length; ++i) {
98-
FieldDefinition field = fields[i];
100+
/**
 * Node of the intermediate tree that <code>buildValuesTree</code> creates
 * from a decoded message and that <code>produceRows</code> walks.
 * <p>
 * A node built for a <code>Message</code> or a <code>List</code> carries only
 * <code>children</code>. A node built for any other value carries
 * <code>fieldIdx</code> (the index looked up in the <code>paths</code> map for
 * the value's path) and <code>value</code> (the result of
 * <code>KettleTypesConverter.kettleCast</code>). <code>produceRows</code>
 * treats a non-null <code>fieldIdx</code> as the marker of such a leaf.
 */
protected static class ValueNode {
101+
Integer fieldIdx;
102+
Object value;
103+
List<ValueNode> children;
104+
}
99105

100-
List<Object> values = new ArrayList<Object>();
101-
getFieldValues(message, field.path, values);
106+
protected ValueNode buildValuesTree(Object root, String currentPath) throws ProtobufDecoderException {
102107

103-
Object[] lastRow = result.getLast();
104-
if (values.size() > 0) {
105-
lastRow[i] = values.get(0);
108+
if (root instanceof Message) {
109+
Message message = (Message) root;
110+
List<FieldDescriptor> fields = message.getDescriptorForType().getFields();
111+
List<ValueNode> ch = new ArrayList<ValueNode>(fields.size());
112+
for (FieldDescriptor field : fields) {
113+
ValueNode n = buildValuesTree(message.getField(field), currentPath.length() > 0 ? currentPath + "."
114+
+ field.getName() : field.getName());
115+
if (n != null) {
116+
ch.add(n);
117+
}
106118
}
107-
if (values.size() > 1) {
108-
// normalize:
109-
for (int j = 1; j < values.size(); ++j) {
110-
Object[] clone = new Object[lastRow.length];
111-
System.arraycopy(lastRow, 0, clone, 0, lastRow.length);
112-
clone[i] = values.get(j);
113-
result.add(clone);
119+
if (ch.size() > 0) {
120+
ValueNode node = new ValueNode();
121+
node.children = ch;
122+
return node;
123+
}
124+
return null;
125+
}
126+
127+
if (root instanceof List<?>) {
128+
List<?> list = (List<?>) root;
129+
List<ValueNode> ch = new ArrayList<ValueNode>(list.size());
130+
for (Object v : list) {
131+
ValueNode n = buildValuesTree(v, currentPath);
132+
if (n != null) {
133+
ch.add(n);
114134
}
115135
}
136+
if (ch.size() > 0) {
137+
ValueNode node = new ValueNode();
138+
node.children = ch;
139+
return node;
140+
}
141+
return null;
116142
}
117-
}
118143

119-
protected void getFieldValues(Object root, String fieldPath, List<Object> values) throws ProtobufDecoderException {
120-
int i = fieldPath.indexOf('.');
121-
String fieldName, nextPath;
122-
if (i == -1) {
123-
fieldName = fieldPath;
124-
nextPath = "";
125-
} else {
126-
fieldName = fieldPath.substring(0, i);
127-
nextPath = fieldPath.substring(i + 1);
144+
// primitive
145+
Integer fieldIdx = paths.get(currentPath);
146+
if (fieldIdx != null) {
147+
ValueNode node = new ValueNode();
148+
node.fieldIdx = fieldIdx;
149+
node.value = KettleTypesConverter.kettleCast(root);
150+
return node;
128151
}
152+
return null;
153+
}
129154

130-
if (root instanceof Message) {
131-
if (fieldName.length() == 0) {
132-
throw new ProtobufDecoderException("Field path doesn't lead to a primitive value!");
133-
}
134-
Message message = (Message) root;
135-
FieldDescriptor fieldDesc = message.getDescriptorForType().findFieldByName(fieldName);
136-
getFieldValues(message.getField(fieldDesc), nextPath, values);
155+
protected void produceRows(ValueNode root, LinkedList<Object[]> rowsPool, LinkedList<Object[]> result,
156+
Set<Integer> processedFields) {
157+
158+
if (root.fieldIdx != null) { // Field value (leaf)
159+
int fieldIdx = root.fieldIdx.intValue();
160+
if (rowsPool.size() == 0) {
161+
// Create new row, and push it to the rows pool
162+
Object[] row = new Object[fields.length];
163+
for (int i = 0; i < row.length; ++i) {
164+
row[i] = EMPTY;
165+
}
166+
row[fieldIdx] = root.value;
167+
rowsPool.add(row);
137168

138-
} else if (root instanceof List<?>) {
139-
List<?> valuesList = (List<?>) root;
140-
for (Object v : valuesList) {
141-
getFieldValues(v, fieldPath, values);
169+
} else {
170+
List<Object[]> newRows = new LinkedList<Object[]>();
171+
for (Object[] row : rowsPool) {
172+
if (row[fieldIdx] == EMPTY) {
173+
row[fieldIdx] = root.value;
174+
} else {
175+
// Clone the row:
176+
Object[] newRow = new Object[row.length];
177+
System.arraycopy(row, 0, newRow, 0, row.length);
178+
newRow[fieldIdx] = root.value;
179+
newRows.add(newRow);
180+
}
181+
}
182+
rowsPool.addAll(newRows);
183+
}
184+
processedFields.add(root.fieldIdx);
185+
} else {
186+
for (ValueNode child : root.children) {
187+
produceRows(child, rowsPool, result, processedFields);
142188
}
143-
} else { // primitive
144-
if (nextPath.length() > 0) {
145-
throw new ProtobufDecoderException("Field path leads through primitive value!");
189+
if (processedFields.size() == fields.length) {
190+
// Move all created rows from the pool to the result
191+
result.addAll(rowsPool);
192+
rowsPool.clear();
193+
processedFields.clear();
146194
}
147-
values.add(KettleTypesConverter.kettleCast(root));
148195
}
149196
}
150197

@@ -211,4 +258,8 @@ public ProtobufDecoderException(Throwable cause) {
211258
super(cause);
212259
}
213260
}
261+
262+
/**
 * Callback interface with a single method that receives one row as an
 * <code>Object[]</code>.
 * <p>
 * NOTE(review): nothing in the visible part of this change implements or
 * invokes this listener; <code>decode</code> returns its rows as a list
 * instead. Confirm whether it is used elsewhere or is a leftover.
 */
public static interface RowProduceListener {
263+
void onNewRow(Object[] row);
264+
}
214265
}

src/main/java/com/ruckuswireless/pentaho/utils/KettleTypesConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
public class KettleTypesConverter {
1515

1616
public static Object kettleCast(Object value) {
17+
if (value == null) {
18+
return value;
19+
}
1720
Class<?> c = value.getClass();
1821
if (c == Integer.class || c == int.class) {
1922
return ((Integer) value).longValue();

0 commit comments

Comments
 (0)