5
5
import org .pentaho .di .core .Const ;
6
6
import org .pentaho .di .core .exception .KettleException ;
7
7
import org .pentaho .di .core .row .RowDataUtil ;
8
- import org .pentaho .di .core .row .RowMetaInterface ;
9
8
import org .pentaho .di .trans .Trans ;
10
9
import org .pentaho .di .trans .TransMeta ;
11
10
import org .pentaho .di .trans .step .BaseStep ;
23
22
*/
24
23
public class ProtobufDecodeStep extends BaseStep implements StepInterface {
25
24
26
- public ProtobufDecodeStep (StepMeta stepMeta , StepDataInterface stepDataInterface , int copyNr , TransMeta transMeta ,
27
- Trans trans ) {
25
+ public ProtobufDecodeStep (StepMeta stepMeta ,
26
+ StepDataInterface stepDataInterface , int copyNr ,
27
+ TransMeta transMeta , Trans trans ) {
28
28
super (stepMeta , stepDataInterface , copyNr , transMeta , trans );
29
29
}
30
30
@@ -34,15 +34,34 @@ public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
34
34
ProtobufDecodeMeta meta = (ProtobufDecodeMeta ) smi ;
35
35
ProtobufDecodeData data = (ProtobufDecodeData ) sdi ;
36
36
try {
37
- data .decoder = new ProtobufDecoder (meta .getClasspath (), meta .getRootClass (), meta .getFields ());
37
+ data .decoder = new ProtobufDecoder (
38
+ environmentSubstitute (meta .getClasspath ()),
39
+ meta .getRootClass (), meta .getFields ());
38
40
} catch (ProtobufDecoderException e ) {
39
- logError (Messages .getString ("ProtobufDecodeStep.Init.Error" , getStepname ()), e );
41
+ logError (Messages .getString ("ProtobufDecodeStep.Dispose.Error" ,
42
+ getStepname ()), e );
40
43
return false ;
41
44
}
42
45
return true ;
43
46
}
44
47
45
- public boolean processRow (StepMetaInterface smi , StepDataInterface sdi ) throws KettleException {
48
+ public void dispose (StepMetaInterface smi , StepDataInterface sdi ) {
49
+
50
+ ProtobufDecodeData data = (ProtobufDecodeData ) sdi ;
51
+ if (data .decoder != null ) {
52
+ try {
53
+ data .decoder .dispose ();
54
+ } catch (ProtobufDecoderException e ) {
55
+ logError (Messages .getString ("ProtobufDecodeStep.Init.Error" ,
56
+ getStepname ()), e );
57
+ }
58
+ data .decoder = null ;
59
+ }
60
+ super .dispose (smi , sdi );
61
+ }
62
+
63
+ public boolean processRow (StepMetaInterface smi , StepDataInterface sdi )
64
+ throws KettleException {
46
65
Object [] r = getRow ();
47
66
if (r == null ) {
48
67
setOutputDone ();
@@ -52,66 +71,74 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
52
71
ProtobufDecodeMeta meta = (ProtobufDecodeMeta ) smi ;
53
72
ProtobufDecodeData data = (ProtobufDecodeData ) sdi ;
54
73
55
- RowMetaInterface inputRowMeta = getInputRowMeta ();
56
-
57
74
if (first ) {
58
75
first = false ;
59
- data .outputRowMeta = inputRowMeta .clone ();
76
+ data .outputRowMeta = getInputRowMeta () .clone ();
60
77
meta .getFields (data .outputRowMeta , getStepname (), null , null , this );
61
78
62
79
String inputField = environmentSubstitute (meta .getInputField ());
63
80
64
81
int numErrors = 0 ;
65
82
if (Const .isEmpty (inputField )) {
66
- logError (Messages .getString ("ProtobufDecodeStep.Log.FieldNameIsNull" )); //$NON-NLS-1$
83
+ logError (Messages
84
+ .getString ("ProtobufDecodeStep.Log.FieldNameIsNull" )); //$NON-NLS-1$
67
85
numErrors ++;
68
86
}
69
- data .inputFieldNr = inputRowMeta .indexOfValue (inputField );
87
+ data .inputFieldNr = getInputRowMeta () .indexOfValue (inputField );
70
88
if (data .inputFieldNr < 0 ) {
71
- logError (Messages .getString ("ProtobufDecodeStep.Log.CouldntFindField" , inputField )); //$NON-NLS-1$
89
+ logError (Messages .getString (
90
+ "ProtobufDecodeStep.Log.CouldntFindField" , inputField )); //$NON-NLS-1$
72
91
numErrors ++;
73
92
}
74
- if (!inputRowMeta .getValueMeta (data .inputFieldNr ).isBinary ()) {
75
- logError (Messages .getString ("ProtobufDecodeStep.Log.FieldNotValid" , inputField )); //$NON-NLS-1$
93
+ if (!getInputRowMeta ().getValueMeta (data .inputFieldNr ).isBinary ()) {
94
+ logError (Messages .getString (
95
+ "ProtobufDecodeStep.Log.FieldNotValid" , inputField )); //$NON-NLS-1$
76
96
numErrors ++;
77
97
}
78
98
if (numErrors > 0 ) {
79
99
setErrors (numErrors );
80
100
stopAll ();
81
101
return false ;
82
102
}
83
- data .inputFieldMeta = inputRowMeta .getValueMeta (data .inputFieldNr );
103
+ data .inputFieldMeta = getInputRowMeta ().getValueMeta (
104
+ data .inputFieldNr );
84
105
}
85
106
86
107
try {
87
- byte [] message = data .inputFieldMeta .getBinary (r [data .inputFieldNr ]);
108
+ byte [] message = data .inputFieldMeta
109
+ .getBinary (r [data .inputFieldNr ]);
88
110
try {
89
111
List <Object []> decodedData = data .decoder .decode (message );
90
112
for (Object [] d : decodedData ) {
91
- r = RowDataUtil .addRowData (r , inputRowMeta .size (), d );
113
+ r = RowDataUtil .addRowData (r , getInputRowMeta () .size (), d );
92
114
putRow (data .outputRowMeta , r );
93
115
if (isRowLevel ()) {
94
- logRowlevel (Messages .getString ("ProtobufDecodeStep.Log.OutputRow" ,
95
- Long .toString (getLinesWritten ()), data .outputRowMeta .getString (r )));
116
+ logRowlevel (Messages .getString (
117
+ "ProtobufDecodeStep.Log.OutputRow" ,
118
+ Long .toString (getLinesWritten ()),
119
+ data .outputRowMeta .getString (r )));
96
120
}
97
121
}
98
122
} catch (ProtobufDecoderException e ) {
99
123
throw new KettleException (e );
100
124
}
101
125
} catch (KettleException e ) {
102
126
if (!getStepMeta ().isDoingErrorHandling ()) {
103
- logError (Messages .getString ("ProtobufDecodeStep.ErrorInStepRunning" , e .getMessage ()));
127
+ logError (Messages
128
+ .getString ("ProtobufDecodeStep.ErrorInStepRunning" ,
129
+ e .getMessage ()));
104
130
setErrors (1 );
105
131
stopAll ();
106
132
setOutputDone ();
107
133
return false ;
108
134
}
109
- putError (inputRowMeta , r , 1 , e .toString (), null , getStepname ());
135
+ putError (getInputRowMeta () , r , 1 , e .toString (), null , getStepname ());
110
136
}
111
137
return true ;
112
138
}
113
139
114
- public void stopRunning (StepMetaInterface smi , StepDataInterface sdi ) throws KettleException {
140
+ public void stopRunning (StepMetaInterface smi , StepDataInterface sdi )
141
+ throws KettleException {
115
142
116
143
ProtobufDecodeData data = (ProtobufDecodeData ) sdi ;
117
144
data .canceled = true ;
0 commit comments