Skip to content

Commit 31dd0ab

Browse files
committed
Merge pull request #117 from ah641054/eliminate-duplicate-triples
Adds a module to filter consecutive duplicated objects.
2 parents 78f800c + dd54a37 commit 31dd0ab

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import java.util.regex.Matcher;
19+
import java.util.regex.Pattern;
20+
21+
import org.culturegraph.mf.framework.DefaultObjectPipe;
22+
import org.culturegraph.mf.framework.ObjectReceiver;
23+
import org.culturegraph.mf.framework.annotations.Description;
24+
import org.culturegraph.mf.framework.annotations.In;
25+
import org.culturegraph.mf.framework.annotations.Out;
26+
import org.culturegraph.mf.types.Triple;
27+
28+
/**
29+
* Filters consecutive duplicated data objects.
30+
*
31+
* @param <T> object type
32+
*
33+
* @author Alexander Haffner
34+
*
35+
*/
36+
@Description("Filters consecutive duplicated data objects.")
37+
@In(Object.class)
38+
@Out(Object.class)
39+
public final class DuplicateObjectFilter<T> extends
40+
DefaultObjectPipe<T, ObjectReceiver<T>> {
41+
42+
private T lastObj;
43+
44+
@Override
45+
public void process(final T obj) {
46+
if (!obj.equals(lastObj)) {
47+
lastObj = obj;
48+
getReceiver().process(obj);
49+
}
50+
}
51+
52+
@Override
53+
protected void onResetStream() {
54+
lastObj = null;
55+
}
56+
57+
}

src/main/resources/flux-commands.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ flatten org.culturegraph.mf.stream.pipe.StreamFlattener
6262
template org.culturegraph.mf.stream.converter.ObjectTemplate
6363
add-preamble-epilogue org.culturegraph.mf.stream.converter.PreambleEpilogueAdder
6464

65+
filter-duplicate-objects org.culturegraph.mf.stream.pipe.DuplicateObjectFilter
66+
6567
object-tee org.culturegraph.mf.stream.pipe.ObjectTee
6668
stream-tee org.culturegraph.mf.stream.pipe.StreamTee
6769

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
*
3+
*/
4+
package org.culturegraph.mf.stream.pipe;
5+
6+
import static org.mockito.Mockito.verify;
7+
import static org.mockito.Mockito.verifyNoMoreInteractions;
8+
9+
import org.culturegraph.mf.framework.ObjectReceiver;
10+
import org.junit.After;
11+
import org.junit.Before;
12+
import org.junit.Test;
13+
import org.mockito.Mock;
14+
import org.mockito.MockitoAnnotations;
15+
16+
/**
17+
* Tests for {@link DuplicateObjectFilter}.
18+
*
19+
* @author Alexander Haffner
20+
*
21+
*/
22+
public final class DuplicateObjectFilterTest {
23+
24+
private static final String OBJECT1 = "Object 1";
25+
private static final String OBJECT2 = "Object 2";
26+
27+
private DuplicateObjectFilter<String> duplicateObjectFilter;
28+
29+
@Mock
30+
private ObjectReceiver<String> receiver;
31+
32+
@Before
33+
public void setup() {
34+
MockitoAnnotations.initMocks(this);
35+
duplicateObjectFilter = new DuplicateObjectFilter<String>();
36+
duplicateObjectFilter.setReceiver(receiver);
37+
}
38+
39+
@After
40+
public void cleanup() {
41+
duplicateObjectFilter.closeStream();
42+
}
43+
44+
@Test
45+
public void testShouldEliminateDuplicateObjects() {
46+
duplicateObjectFilter.process(OBJECT1);
47+
duplicateObjectFilter.process(OBJECT1);
48+
duplicateObjectFilter.process(OBJECT2);
49+
50+
verify(receiver).process(OBJECT1);
51+
verify(receiver).process(OBJECT2);
52+
verifyNoMoreInteractions(receiver);
53+
}
54+
55+
}

0 commit comments

Comments
 (0)