Skip to content

Commit 696b351

Browse files
authored
Merge pull request #386 from metafacture/385-filter-records-by-path
Implement record path filter.
2 parents fdf1832 + ca9a107 commit 696b351

File tree

5 files changed

+679
-3
lines changed

5 files changed

+679
-3
lines changed

metafacture-mangling/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,10 @@ dependencies {
2323
testImplementation 'junit:junit:4.12'
2424
testImplementation 'org.mockito:mockito-core:2.5.5'
2525
}
26+
27+
// Test logging configuration: forward the tests' standard output/error
// streams to the Gradle log and report test exceptions in the 'full' format.
test {
28+
testLogging {
29+
showStandardStreams = true
30+
exceptionFormat = 'full'
31+
}
32+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2021 hbz NRW
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.mangling;
17+
18+
import org.metafacture.framework.FluxCommand;
19+
import org.metafacture.framework.StreamReceiver;
20+
import org.metafacture.framework.annotations.Description;
21+
import org.metafacture.framework.annotations.In;
22+
import org.metafacture.framework.annotations.Out;
23+
import org.metafacture.framework.helpers.DefaultStreamPipe;
24+
25+
/**
26+
Splits a stream into records based on entity path.
27+
*/
28+
@Description("Splits a stream into records based on entity path")
29+
@In(StreamReceiver.class)
30+
@Out(StreamReceiver.class)
31+
@FluxCommand("filter-records-by-path")
32+
public final class RecordPathFilter extends DefaultStreamPipe<StreamReceiver> {
33+
34+
public static final String DEFAULT_RECORD_ID_FORMAT = "%s[%d]";
35+
36+
public static final String ROOT_PATH = "";
37+
38+
private final EntityPathTracker entityPathTracker = new EntityPathTracker();
39+
40+
private String path;
41+
private String recordIdFormat;
42+
private String recordIdentifier;
43+
private boolean inMatch;
44+
private boolean recordStarted;
45+
private int recordCount;
46+
47+
public RecordPathFilter() {
48+
this(ROOT_PATH);
49+
}
50+
51+
public RecordPathFilter(final String path) {
52+
super();
53+
resetRecord();
54+
setPath(path);
55+
setRecordIdFormat(DEFAULT_RECORD_ID_FORMAT);
56+
}
57+
58+
public void setEntitySeparator(final String entitySeparator) {
59+
entityPathTracker.setEntitySeparator(entitySeparator);
60+
}
61+
62+
public String getEntitySeparator() {
63+
return entityPathTracker.getEntitySeparator();
64+
}
65+
66+
public void setPath(final String path) {
67+
this.path = path;
68+
}
69+
70+
public String getPath() {
71+
return path;
72+
}
73+
74+
public void setRecordIdFormat(final String recordIdFormat) {
75+
this.recordIdFormat = recordIdFormat;
76+
}
77+
78+
public String getRecordIdFormat() {
79+
return recordIdFormat;
80+
}
81+
82+
@Override
83+
public void startRecord(final String identifier) {
84+
assert !isClosed();
85+
86+
recordCount = 0;
87+
recordIdentifier = identifier;
88+
entityPathTracker.startRecord(identifier);
89+
}
90+
91+
@Override
92+
public void endRecord() {
93+
assert !isClosed();
94+
95+
endRecordIfNeeded();
96+
entityPathTracker.endRecord();
97+
}
98+
99+
@Override
100+
public void startEntity(final String name) {
101+
entityPathTracker.startEntity(name);
102+
103+
if (inMatch()) {
104+
startRecordIfNeeded();
105+
getReceiver().startEntity(name);
106+
}
107+
else if (pathMatching()) {
108+
inMatch = true;
109+
}
110+
}
111+
112+
@Override
113+
public void endEntity() {
114+
if (pathMatching()) {
115+
endRecordIfNeeded();
116+
inMatch = false;
117+
}
118+
else if (inMatch()) {
119+
getReceiver().endEntity();
120+
}
121+
122+
entityPathTracker.endEntity();
123+
}
124+
125+
@Override
126+
public void literal(final String name, final String value) {
127+
if (inMatch()) {
128+
startRecordIfNeeded();
129+
getReceiver().literal(name, value);
130+
}
131+
}
132+
133+
@Override
134+
public void onResetStream() {
135+
entityPathTracker.resetStream();
136+
resetRecord();
137+
}
138+
139+
@Override
140+
public void onCloseStream() {
141+
entityPathTracker.closeStream();
142+
}
143+
144+
private void resetRecord() {
145+
recordStarted = false;
146+
inMatch = false;
147+
}
148+
149+
private void startRecordIfNeeded() {
150+
if (!recordStarted) {
151+
getReceiver().startRecord(String.format(recordIdFormat, recordIdentifier, ++recordCount));
152+
recordStarted = true;
153+
}
154+
}
155+
156+
private void endRecordIfNeeded() {
157+
if (recordStarted) {
158+
getReceiver().endRecord();
159+
resetRecord();
160+
}
161+
}
162+
163+
private boolean pathMatching() {
164+
return path.equals(entityPathTracker.getCurrentPath());
165+
}
166+
167+
private boolean inMatch() {
168+
return inMatch || path.equals(ROOT_PATH);
169+
}
170+
171+
}

metafacture-mangling/src/main/resources/flux-commands.properties

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16+
change-id org.metafacture.mangling.RecordIdChanger
17+
discard-events org.metafacture.mangling.StreamEventDiscarder
1618
filter-duplicate-objects org.metafacture.mangling.DuplicateObjectFilter
1719
filter-null-values org.metafacture.mangling.NullFilter
20+
filter-records-by-path org.metafacture.mangling.RecordPathFilter
21+
flatten org.metafacture.mangling.StreamFlattener
1822
literal-to-object org.metafacture.mangling.LiteralToObject
1923
object-to-literal org.metafacture.mangling.ObjectToLiteral
20-
change-id org.metafacture.mangling.RecordIdChanger
2124
record-to-entity org.metafacture.mangling.RecordToEntity
22-
discard-events org.metafacture.mangling.StreamEventDiscarder
23-
flatten org.metafacture.mangling.StreamFlattener

0 commit comments

Comments
 (0)