48
48
@ Description ("applies a metamorph transformation to the event stream. Metamorph definition is given in brackets." )
49
49
@ In (StreamReceiver .class )
50
50
@ Out (StreamReceiver .class )
51
- public final class Metamorph implements StreamPipe <StreamReceiver >, NamedValueReceiver , MultiMap ,
52
- EntityEndIndicator {
51
+ public final class Metamorph implements StreamPipe <StreamReceiver >, NamedValueReceiver , MultiMap {
53
52
54
53
public static final String ELSE_KEYWORD = "_else" ;
55
54
public static final char FEEDBACK_CHAR = '@' ;
@@ -59,9 +58,10 @@ public final class Metamorph implements StreamPipe<StreamReceiver>, NamedValueRe
59
58
60
59
private static final String ENTITIES_NOT_BALANCED = "Entity starts and ends are not balanced" ;
61
60
62
- private final Registry <Data > dataRegistry = new WildcardRegistry <Data >();
63
- private final List <Data > elseSources = new ArrayList <Data >();
64
- private final Registry <EntityEndListener > entityEndListenerRegistry = new WildcardRegistry <EntityEndListener >();
61
+ private final Registry <NamedValueReceiver > dataRegistry = new WildcardRegistry <NamedValueReceiver >();
62
+ private final List <NamedValueReceiver > elseSources = new ArrayList <NamedValueReceiver >();
63
+
64
+ //rivate final Registry<FlushListener> entityEndListenerRegistry = new WildcardRegistry<FlushListener>();
65
65
66
66
private final MultiMap multiMap = new MultiHashMap ();
67
67
private final List <Closeable > resources = new ArrayList <Closeable >();
@@ -75,6 +75,7 @@ public final class Metamorph implements StreamPipe<StreamReceiver>, NamedValueRe
75
75
private StreamReceiver outputStreamReceiver ;
76
76
private MorphErrorHandler errorHandler = new DefaultErrorHandler ();
77
77
private int recordCount ;
78
+ private final List <FlushListener > recordEndListener = new ArrayList <FlushListener >();
78
79
79
80
protected Metamorph () {
80
81
// package private
@@ -126,7 +127,7 @@ public void literal(final String name, final String value) {
126
127
});
127
128
}
128
129
129
- protected List <Data > getElseSources () {
130
+ protected List <NamedValueReceiver > getElseSources () {
130
131
return elseSources ;
131
132
}
132
133
@@ -138,14 +139,14 @@ public void setErrorHandler(final MorphErrorHandler errorHandler) {
138
139
this .errorHandler = errorHandler ;
139
140
}
140
141
141
- protected void registerData (final Data data ) {
142
+ protected void registerNamedValueReceiver (final String source , final NamedValueReceiver data ) {
142
143
143
- final String path = data .getSource ();
144
+ // final String path = data.getSource();
144
145
145
- if (ELSE_KEYWORD .equals (path )) {
146
+ if (ELSE_KEYWORD .equals (source )) {
146
147
elseSources .add (data );
147
148
} else {
148
- dataRegistry .register (path , data );
149
+ dataRegistry .register (source , data );
149
150
}
150
151
}
151
152
@@ -171,8 +172,10 @@ public void startRecord(final String identifier) {
171
172
@ Override
172
173
public void endRecord () {
173
174
174
- notifyEntityEndListeners (RECORD_KEYWORD );
175
-
175
+ for (FlushListener listener : recordEndListener ){
176
+ listener .flush (recordCount , currentEntityCount );
177
+ }
178
+
176
179
outputStreamReceiver .endRecord ();
177
180
entityCountStack .removeLast ();
178
181
if (!entityCountStack .isEmpty ()) {
@@ -194,32 +197,18 @@ public void startEntity(final String name) {
194
197
195
198
flattener .startEntity (name );
196
199
197
- dispatch ( flattener . getCurrentPath (), "" , null );
200
+
198
201
199
202
}
200
203
201
204
@ Override
202
205
public void endEntity () {
203
-
206
+ dispatch ( flattener . getCurrentPath (), "" , null );
204
207
currentEntityCount = entityCountStack .pop ().intValue ();
205
-
206
- final String currentEntityName = flattener .getCurrentEntityName ();
207
- if (currentEntityName != null ) {
208
- notifyEntityEndListeners (currentEntityName );
209
- }
210
-
211
208
flattener .endEntity ();
212
209
213
210
}
214
211
215
- private void notifyEntityEndListeners (final String name ) {
216
- final List <EntityEndListener > matchingListeners = entityEndListenerRegistry .get (name );
217
-
218
- for (EntityEndListener listener : matchingListeners ) {
219
- listener .onEntityEnd (name , recordCount , currentEntityCount );
220
- }
221
-
222
- }
223
212
224
213
@ Override
225
214
public void literal (final String name , final String value ) {
@@ -245,15 +234,15 @@ public void closeStream() {
245
234
outputStreamReceiver .closeStream ();
246
235
}
247
236
248
- protected void dispatch (final String path , final String value , final List <Data > fallback ) {
249
- final List <Data > matchingData = findMatchingData (path , fallback );
237
+ protected void dispatch (final String path , final String value , final List <NamedValueReceiver > fallback ) {
238
+ final List <NamedValueReceiver > matchingData = findMatchingData (path , fallback );
250
239
if (null != matchingData ) {
251
240
send (path , value , matchingData );
252
241
}
253
242
}
254
243
255
- private List <Data > findMatchingData (final String path , final List <Data > fallback ) {
256
- final List <Data > matchingData = dataRegistry .get (path );
244
+ private List <NamedValueReceiver > findMatchingData (final String path , final List <NamedValueReceiver > fallback ) {
245
+ final List <NamedValueReceiver > matchingData = dataRegistry .get (path );
257
246
if (matchingData == null || matchingData .isEmpty ()) {
258
247
return fallback ;
259
248
}
@@ -266,8 +255,8 @@ private List<Data> findMatchingData(final String path, final List<Data> fallback
266
255
* @param dataList
267
256
* destination
268
257
*/
269
- private void send (final String key , final String value , final List <Data > dataList ) {
270
- for (Data data : dataList ) {
258
+ private void send (final String key , final String value , final List <NamedValueReceiver > dataList ) {
259
+ for (NamedValueReceiver data : dataList ) {
271
260
try {
272
261
data .receive (key , value , null , recordCount , currentEntityCount );
273
262
} catch (RuntimeException e ) {
@@ -322,10 +311,10 @@ public void receive(final String name, final String value, final NamedValueSourc
322
311
// //entityMap.put(from, toParam);
323
312
// }
324
313
325
- @ Override
326
- public void addEntityEndListener (final EntityEndListener entityEndListener , final String entityName ) {
327
- entityEndListenerRegistry .register (entityName , entityEndListener );
328
- }
314
+ // @Override
315
+ // public void addEntityEndListener(final FlushListener entityEndListener, final String entityName) {
316
+ // entityEndListenerRegistry.register(entityName, entityEndListener);
317
+ // }
329
318
330
319
@ Override
331
320
public Map <String , String > getMap (final String mapName ) {
@@ -356,4 +345,8 @@ public Collection<String> getMapNames() {
356
345
return multiMap .getMapNames ();
357
346
}
358
347
348
+ public void registerRecordEndFlush (final FlushListener flushListener ) {
349
+ recordEndListener .add (flushListener );
350
+ }
351
+
359
352
}
0 commit comments