Skip to content

Commit 17710f3

Browse files
committed
Add a system for interception to Metamorph
Currently, there is no way expect for using a Java debugger to find out how data is processed by a Metamorph script. This makes introspection of Metamorph scripts rather difficult as in-depth knowledge of the Java implementation of Metamorph is required. Additionally, no simple method exists for tools building on Metamorph (such as script editors) to interact with Metamorph scripts during their execution. This commit solves these problems by modifying the Metamorph implementation so that the user can add user-defined interceptor objects to the processing pipeline constructed from a Metamorph script. The interceptor objects are added before and after each processing step in the script. Additionally, the user can introspect the `flushWith`- mechanism. It is up to the implementor of the interceptor objects whether invokations are simply watched (e.g. for logging) or actively interfered with (e.g. for setting break points to interrupt data processing at certain points). The implementation of the interception system is based on factory classes which create the interceptor objects. The interface for these classes is defined in `InterceptorFactory`. Users can implement this interface to create custom interceptor objects. Instances of an interceptor factory implementation can be passed to the `Metamorph` object during its construction. The factory is internally passed to the `MorphBuilder` which is modified to add an interceptor object into each link between data-elements, functions and collectors. It also wraps every flush listener in an interceptor object. The interceptor factory can return `null` when asked for creating an interceptor. The `NullInterceptorFactory` implements this behaviour and serves as a default implementation of `InterceptorFactory` which creates a Metamorph pipeline without any interceptors. Writing an interceptor factory is quite simple. First, we need to implement the actual interceptor objects; then we only need to create and return instances of these interceptors in the factory class. The following example demonstrates how to implement interceptors which log all data processed by Metamorph to standard out: - The pipeline interceptors need to implement `NamedValuePipe`. However, we do not implement this interface directly but use `AbstractNamedValuePipe` as a base class. This class implements the method required for connecting the named value pipe with the pipeline so that we only have to implement the `receive` method which is called whenever data is passed through the pipeline. In this method we output the received data to standard out and pass the data to the next pipeline element (if we did not do this we would interrupt the pipeline): ``` class NamedValueLogger extends AbstractNamedValuePipe { @OverRide public void receive(final String name, final String value, final NamedValueSource source, final int recordCount, final int entityCount) { System.out.println(name + ": " + value); getNamedValueReceiver().receive(name, value, this, recordCount, entityCount); } } ``` - The `flushWith`-interceptors need to implement `FlushListener`. Interceptors for flush listeners wrap the actual flush listener. We pass the original flush listener as a constructor argument to our interceptor: ``` class FlushLogger implements FlushListener { private static FlushListener wrappedListener; public FlushLogger(final FlushListener wrappedListener) { this.wrappedListener = wrappedListener; } @OverRide public void flush(final int recordCount, final int entityCount) { System.out.println("Flushing: " + wrappedListener.toString()); wrappedListener.flush(recordCount, entityCount); } } ``` - Now we need to write an interceptor factory for creating our interceptor objects: ``` class LoggingInterceptorFactory implements InterceptorFactory { @OverRide public NamedValuePipe createNamedValueInterceptor() { return new NamedValueLogger(); } @OverRide public FlushListener createFlushInterceptor( final FlushListener listener) { return new FlushLogger(listener); } } ``` - To use our interceptors we need to pass an instance of our factory class to `Metamorph`: ``` final InterceptorFactory factory = new LoggingInterceptorFactory(); final Metamorph metamorph = new Metamorph("morph-script.xml", factory); ```
1 parent efb39d5 commit 17710f3

File tree

4 files changed

+232
-30
lines changed

4 files changed

+232
-30
lines changed

src/main/java/org/culturegraph/mf/morph/Metamorph.java

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Reader;
2222
import java.net.MalformedURLException;
2323
import java.util.Collection;
24+
import java.util.Collections;
2425
import java.util.Deque;
2526
import java.util.List;
2627
import java.util.Map;
@@ -32,6 +33,8 @@
3233
import org.culturegraph.mf.framework.annotations.Description;
3334
import org.culturegraph.mf.framework.annotations.In;
3435
import org.culturegraph.mf.framework.annotations.Out;
36+
import org.culturegraph.mf.morph.interceptors.InterceptorFactory;
37+
import org.culturegraph.mf.morph.interceptors.NullInterceptorFactory;
3538
import org.culturegraph.mf.stream.pipe.StreamFlattener;
3639
import org.culturegraph.mf.types.MultiMap;
3740
import org.culturegraph.mf.util.ResourceUtil;
@@ -44,6 +47,7 @@
4447
* {@link MorphBuilder} to create an instance based on an xml description
4548
*
4649
* @author Markus Michael Geipel
50+
* @author Christoph Böhme
4751
*/
4852
@Description("applies a metamorph transformation to the event stream. Metamorph "
4953
+ "definition is given in brackets.")
@@ -61,6 +65,9 @@ public final class Metamorph implements StreamPipe<StreamReceiver>, NamedValuePi
6165
private static final String ENTITIES_NOT_BALANCED = "Entity starts and ends are not balanced";
6266
private static final String COULD_NOT_LOAD_MORPH_FILE = "Could not load morph file";
6367

68+
private static final InterceptorFactory NULL_INTERCEPTOR_FACTORY = new NullInterceptorFactory();
69+
private static final Map<String, String> NO_VARS = Collections.<String, String>emptyMap();
70+
6471
private final Registry<NamedValueReceiver> dataRegistry = MorphCollectionFactory.createRegistry();
6572
private final List<NamedValueReceiver> elseSources = MorphCollectionFactory.createList();
6673

@@ -84,42 +91,80 @@ protected Metamorph() {
8491
}
8592

8693
public Metamorph(final String morphDef) {
87-
final MorphBuilder builder = new MorphBuilder(this);
88-
builder.walk(getInputSource(morphDef));
89-
init();
94+
this(morphDef, NO_VARS);
9095
}
9196

9297
public Metamorph(final String morphDef, final Map<String, String> vars) {
93-
final MorphBuilder builder = new MorphBuilder(this);
94-
builder.walk(getInputSource(morphDef), vars);
95-
init();
98+
this(morphDef, vars, NULL_INTERCEPTOR_FACTORY);
99+
}
100+
101+
public Metamorph(final String morphDef, final InterceptorFactory interceptorFactory) {
102+
this(morphDef, NO_VARS, interceptorFactory);
103+
}
104+
105+
public Metamorph(final String morphDef, final Map<String, String> vars,
106+
final InterceptorFactory interceptorFactory) {
107+
108+
this(getInputSource(morphDef), NO_VARS, interceptorFactory);
96109
}
97110

98111
public Metamorph(final Reader morphDef) {
99-
final MorphBuilder builder = new MorphBuilder(this);
100-
builder.walk(new InputSource(morphDef));
101-
init();
112+
this(morphDef, NO_VARS);
102113
}
103114

104115
public Metamorph(final Reader morphDef, final Map<String, String> vars) {
105-
final MorphBuilder builder = new MorphBuilder(this);
106-
builder.walk(new InputSource(morphDef), vars);
107-
init();
116+
this(morphDef, vars, NULL_INTERCEPTOR_FACTORY);
117+
}
118+
119+
public Metamorph(final Reader morphDef, final InterceptorFactory interceptorFactory) {
120+
this(morphDef, NO_VARS, interceptorFactory);
121+
}
122+
123+
public Metamorph(final Reader morphDef, final Map<String, String> vars,
124+
final InterceptorFactory interceptorFactory) {
125+
126+
this(new InputSource(morphDef), vars, interceptorFactory);
108127
}
109128

110129
public Metamorph(final InputStream morphDef) {
111-
final MorphBuilder builder = new MorphBuilder(this);
112-
builder.walk(new InputSource(morphDef));
113-
init();
130+
this(morphDef, NO_VARS);
114131
}
115132

116133
public Metamorph(final InputStream morphDef, final Map<String, String> vars) {
117-
final MorphBuilder builder = new MorphBuilder(this);
118-
builder.walk(new InputSource(morphDef), vars);
134+
this(morphDef, vars, NULL_INTERCEPTOR_FACTORY);
135+
}
136+
137+
public Metamorph(final InputStream morphDef, final InterceptorFactory interceptorFactory) {
138+
this(morphDef, NO_VARS, interceptorFactory);
139+
}
140+
141+
public Metamorph(final InputStream morphDef, final Map<String, String> vars,
142+
final InterceptorFactory interceptorFactory) {
143+
144+
this(new InputSource(morphDef), vars, interceptorFactory);
145+
}
146+
147+
public Metamorph(final InputSource inputSource) {
148+
this(inputSource, NO_VARS);
149+
}
150+
151+
public Metamorph(final InputSource inputSource, final Map<String, String> vars) {
152+
this(inputSource, vars, NULL_INTERCEPTOR_FACTORY);
153+
}
154+
155+
public Metamorph(final InputSource inputSource, final InterceptorFactory interceptorFactory) {
156+
this(inputSource, NO_VARS, interceptorFactory);
157+
}
158+
159+
public Metamorph(final InputSource inputSource, final Map<String, String> vars,
160+
final InterceptorFactory interceptorFactory) {
161+
162+
final MorphBuilder builder = new MorphBuilder(this, interceptorFactory);
163+
builder.walk(inputSource, vars);
119164
init();
120165
}
121166

122-
private InputSource getInputSource(final String morphDef) {
167+
private static InputSource getInputSource(final String morphDef) {
123168
try {
124169
return new InputSource(
125170
ResourceUtil.getUrl(morphDef).toExternalForm());

src/main/java/org/culturegraph/mf/morph/MorphBuilder.java

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.culturegraph.mf.morph.collectors.Collect;
2525
import org.culturegraph.mf.morph.collectors.Entity;
2626
import org.culturegraph.mf.morph.functions.Function;
27+
import org.culturegraph.mf.morph.interceptors.InterceptorFactory;
2728
import org.culturegraph.mf.types.MultiMap;
2829
import org.culturegraph.mf.util.reflection.ObjectFactory;
2930
import org.culturegraph.mf.util.xml.Location;
@@ -46,6 +47,7 @@ public final class MorphBuilder extends AbstractMetamorphDomWalker {
4647
private static final Pattern OR_PATTERN = Pattern.compile(OR_STRING, Pattern.LITERAL);
4748

4849
private final Metamorph metamorph;
50+
private final InterceptorFactory interceptorFactory;
4951
private final Deque<StackFrame> stack = new LinkedList<StackFrame>();
5052

5153
private static final class StackFrame {
@@ -84,10 +86,13 @@ public boolean isInCondition() {
8486

8587
}
8688

87-
protected MorphBuilder(final Metamorph metamorph) {
89+
protected MorphBuilder(final Metamorph metamorph,
90+
final InterceptorFactory interceptorFactory) {
91+
8892
super();
8993

9094
this.metamorph = metamorph;
95+
this.interceptorFactory = interceptorFactory;
9196
stack.push(new StackFrame(metamorph));
9297
}
9398

@@ -176,8 +181,17 @@ protected void enterData(final Node dataNode) {
176181
data.setName(resolvedAttribute(dataNode, ATTRITBUTE.NAME));
177182
data.setSourceLocation((Location) dataNode.getUserData(Location.USER_DATA_ID));
178183

184+
final NamedValuePipe interceptor = interceptorFactory.createNamedValueInterceptor();
185+
final NamedValuePipe delegate;
186+
if (interceptor == null) {
187+
delegate = data;
188+
} else {
189+
delegate = interceptor;
190+
data.addNamedValueSource(delegate);
191+
}
192+
179193
final String source = resolvedAttribute(dataNode, ATTRITBUTE.SOURCE);
180-
metamorph.registerNamedValueReceiver(source, data);
194+
metamorph.registerNamedValueReceiver(source, delegate);
181195

182196
stack.push(new StackFrame(data));
183197
}
@@ -186,15 +200,24 @@ protected void enterData(final Node dataNode) {
186200
protected void exitData(final Node node) {
187201
final NamedValuePipe dataPipe = stack.pop().getPipe();
188202

203+
final NamedValuePipe interceptor = interceptorFactory.createNamedValueInterceptor();
204+
final NamedValuePipe delegate;
205+
if (interceptor == null) {
206+
delegate = dataPipe;
207+
} else {
208+
delegate = interceptor;
209+
delegate.addNamedValueSource(dataPipe);
210+
}
211+
189212
final StackFrame parent = stack.peek();
190213
if (parent.isInEntityName()) {
191214
// Protected xsd schema and by assertion in enterName:
192-
((Entity) parent.getPipe()).setNameSource(dataPipe);
215+
((Entity) parent.getPipe()).setNameSource(delegate);
193216
} else if (parent.isInCondition()) {
194217
// Protected xsd schema and by assertion in enterIf:
195-
((ConditionAware) parent.getPipe()).setConditionSource(dataPipe);
218+
((ConditionAware) parent.getPipe()).setConditionSource(delegate);
196219
} else {
197-
parent.getPipe().addNamedValueSource(dataPipe);
220+
parent.getPipe().addNamedValueSource(delegate);
198221
}
199222
}
200223

@@ -243,16 +266,31 @@ protected void enterCollect(final Node node) {
243266
protected void exitCollect(final Node node) {
244267
final NamedValuePipe collectPipe = stack.pop().getPipe();
245268

246-
final StackFrame parent = stack.peek();
269+
final NamedValuePipe interceptor = interceptorFactory.createNamedValueInterceptor();
270+
final NamedValuePipe delegate;
271+
if (interceptor == null || collectPipe instanceof Entity) {
272+
// The result of entity collectors cannot be intercepted
273+
// because they only use the receive/emit interface for
274+
// signalling while the actual data is transferred using
275+
// a custom mechanism. In order for this to work the Entity
276+
// class checks whether source and receiver are an
277+
// instances of Entity. If an interceptor is inserted between
278+
// entity elements this mechanism will break.
279+
delegate = collectPipe;
280+
} else {
281+
delegate = interceptor;
282+
delegate.addNamedValueSource(collectPipe);
283+
}
247284

285+
final StackFrame parent = stack.peek();
248286
if (parent.isInEntityName()) {
249287
// Protected xsd schema and by assertion in enterName:
250-
((Entity) parent.getPipe()).setNameSource(collectPipe);
288+
((Entity) parent.getPipe()).setNameSource(delegate);
251289
} else if (parent.isInCondition()) {
252290
// Protected xsd schema and by assertion in enterIf:
253-
((ConditionAware) parent.getPipe()).setConditionSource(collectPipe);
291+
((ConditionAware) parent.getPipe()).setConditionSource(delegate);
254292
} else {
255-
parent.getPipe().addNamedValueSource(collectPipe);
293+
parent.getPipe().addNamedValueSource(delegate);
256294
}
257295

258296
// must be set after recursive calls to flush descendants before parent
@@ -269,10 +307,17 @@ protected void exitCollect(final Node node) {
269307
private void registerFlush(final String flushWith, final FlushListener flushListener) {
270308
final String[] keysSplit = OR_PATTERN.split(flushWith);
271309
for (final String key : keysSplit) {
310+
final FlushListener interceptor = interceptorFactory.createFlushInterceptor(flushListener);
311+
final FlushListener delegate;
312+
if (interceptor == null) {
313+
delegate = flushListener;
314+
} else {
315+
delegate = interceptor;
316+
}
272317
if (key.equals(RECORD)) {
273-
metamorph.registerRecordEndFlush(flushListener);
318+
metamorph.registerRecordEndFlush(delegate);
274319
} else {
275-
metamorph.registerNamedValueReceiver(key, new Flush(flushListener));
320+
metamorph.registerNamedValueReceiver(key, new Flush(delegate));
276321
}
277322
}
278323
}
@@ -310,7 +355,17 @@ protected void handleFunction(final Node functionNode) {
310355
}
311356

312357
final StackFrame head = stack.peek();
313-
function.addNamedValueSource(head.getPipe());
358+
359+
final NamedValuePipe interceptor = interceptorFactory.createNamedValueInterceptor();
360+
final NamedValuePipe delegate;
361+
if (interceptor == null) {
362+
delegate = function;
363+
} else {
364+
delegate = interceptor;
365+
function.addNamedValueSource(delegate);
366+
}
367+
delegate.addNamedValueSource(head.getPipe());
368+
314369
head.setPipe(function);
315370
}
316371

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2014 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.morph.interceptors;
17+
18+
import org.culturegraph.mf.morph.FlushListener;
19+
import org.culturegraph.mf.morph.Metamorph;
20+
import org.culturegraph.mf.morph.MorphBuilder;
21+
import org.culturegraph.mf.morph.NamedValuePipe;
22+
23+
/**
24+
* Interface for classes which create interceptors for Metamorph. When creating
25+
* an instance of {@link Metamorph} an implementation of
26+
* {@link InterceptorFactory} can be passed to it. During the construction of
27+
* the transformation pipeline the {@link MorphBuilder} adds the interceptor
28+
* objects returned by the factory to the pipeline.
29+
*
30+
* An interceptor factory can return {@code null} when asked for creating an
31+
* interceptor. The {@link NullInterceptorFactory} implements this behaviour and
32+
* serves as a default implementation of {@link InterceptorFactory} which
33+
* creates a Metamorph pipeline without any interceptors.
34+
*
35+
* @author Christoph Böhme
36+
*
37+
*/
38+
public interface InterceptorFactory {
39+
40+
/**
41+
* Returns an interceptor which is placed between a NamedValueSource and a
42+
* NamedValueReceiver to intercept named values passed between the two
43+
* objects.
44+
*
45+
* @return an interceptor object
46+
*/
47+
NamedValuePipe createNamedValueInterceptor();
48+
49+
/**
50+
* Returns an interceptor which wraps the flush listener passed as an
51+
* argument and intercepts calls of the {@link FlushListener.flush} method.
52+
*
53+
* @param listener
54+
* flush listener object whose invokations should be intercepted.
55+
* @return an interceptor object which should be registered with the
56+
* {@link Metamorph} object in place of the original
57+
* {@code listener}.
58+
*/
59+
FlushListener createFlushInterceptor(FlushListener listener);
60+
61+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2014 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.morph.interceptors;
17+
18+
import org.culturegraph.mf.morph.FlushListener;
19+
import org.culturegraph.mf.morph.NamedValuePipe;
20+
21+
/**
22+
* An implementation of {@link InterceptorFactory} which returns no
23+
* interceptors. It can be used as a default implementation when no interception
24+
* should occur.
25+
*
26+
* @author Christoph Böhme
27+
*
28+
*/
29+
public final class NullInterceptorFactory implements InterceptorFactory {
30+
31+
@Override
32+
public NamedValuePipe createNamedValueInterceptor() {
33+
return null;
34+
}
35+
36+
@Override
37+
public FlushListener createFlushInterceptor(final FlushListener listener) {
38+
return null;
39+
}
40+
41+
}

0 commit comments

Comments
 (0)