Skip to content

Commit 3998870

Browse files
author
mgeipel
committed
more flux commands
1 parent c4c85d2 commit 3998870

File tree

5 files changed

+253
-2
lines changed

5 files changed

+253
-2
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/BatchLogger.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ private void writeLog() {
7777
vars.put(BATCH_COUNT_VAR, Long.toString(getBatchCount()));
7878
vars.put(BATCH_SIZE_VAR, Long.toString(getBatchSize()));
7979
vars.put(TOTAL_RECORD_COUNT_VAR, Long.toString((getBatchSize() * getBatchCount())+getRecordCount()));
80-
LOG.info(StringUtil.format(format, vars));
81-
80+
LOG.info(StringUtil.format(format, vars));
8281
}
8382

8483
@Override
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
import org.culturegraph.mf.framework.DefaultObjectPipe;
22+
import org.culturegraph.mf.framework.ObjectReceiver;
23+
import org.culturegraph.mf.framework.annotations.Description;
24+
import org.culturegraph.mf.framework.annotations.In;
25+
import org.culturegraph.mf.framework.annotations.Out;
26+
import org.culturegraph.mf.util.StringUtil;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
/**
31+
* Writes log info for every BATCHSIZE records.
32+
*
33+
* @param <T>
34+
* object type
35+
*
36+
* @author Markus Geipel
37+
*
38+
*/
39+
@Description("Writes log info for every BATCHSIZE records.")
40+
@In(Object.class)
41+
@Out(Object.class)
42+
public final class ObjectBatchLogger<T> extends DefaultObjectPipe<T, ObjectReceiver<T>> {
43+
44+
public static final String RECORD_COUNT_VAR = "records";
45+
public static final String TOTAL_RECORD_COUNT_VAR = "totalRecords";
46+
public static final long DEFAULT_BATCH_SIZE = 1000;
47+
public static final String BATCH_COUNT_VAR = "batches";
48+
public static final String BATCH_SIZE_VAR = "batchSize";
49+
50+
private static final Logger LOG = LoggerFactory.getLogger(BatchLogger.class);
51+
private static final String DEFAULT_FORMAT = "records processed: ${totalRecords}";
52+
53+
private final Map<String, String> vars = new HashMap<String, String>();
54+
private final String format;
55+
56+
private long batchSize = DEFAULT_BATCH_SIZE;
57+
private long recordCount;
58+
private long batchCount;
59+
60+
public ObjectBatchLogger() {
61+
super();
62+
this.format = DEFAULT_FORMAT;
63+
64+
}
65+
66+
public ObjectBatchLogger(final String format) {
67+
super();
68+
this.format = format;
69+
}
70+
71+
public ObjectBatchLogger(final String format, final Map<String, String> vars) {
72+
super();
73+
this.format = format;
74+
this.vars.putAll(vars);
75+
}
76+
77+
public void setBatchSize(final int batchSize) {
78+
this.batchSize = batchSize;
79+
}
80+
81+
private void writeLog() {
82+
vars.put(RECORD_COUNT_VAR, Long.toString(recordCount));
83+
vars.put(BATCH_COUNT_VAR, Long.toString(batchCount));
84+
vars.put(BATCH_SIZE_VAR, Long.toString(batchSize));
85+
vars.put(TOTAL_RECORD_COUNT_VAR, Long.toString((batchSize * batchCount) + recordCount));
86+
LOG.info(StringUtil.format(format, vars));
87+
}
88+
89+
@Override
90+
protected void onCloseStream() {
91+
writeLog();
92+
}
93+
94+
@Override
95+
public void process(final T obj) {
96+
97+
if (getReceiver() != null) {
98+
getReceiver().process(obj);
99+
}
100+
101+
++recordCount;
102+
recordCount %= batchSize;
103+
if (0 == recordCount) {
104+
++batchCount;
105+
writeLog();
106+
}
107+
}
108+
109+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.pipe;
17+
18+
import java.util.regex.Matcher;
19+
import java.util.regex.Pattern;
20+
21+
import org.culturegraph.mf.framework.DefaultObjectPipe;
22+
import org.culturegraph.mf.framework.ObjectReceiver;
23+
import org.culturegraph.mf.framework.annotations.Description;
24+
import org.culturegraph.mf.framework.annotations.In;
25+
import org.culturegraph.mf.framework.annotations.Out;
26+
27+
/**
28+
* Splits a {@link String} into several {@link String}s, either by extracting
29+
* parts that match a regexp or by splitting by a regexp.
30+
*
31+
* @author Markus M Geipel
32+
*/
33+
@Description("Splits a String into several Strings, either by extracting parts that match a regexp or by splitting by a regexp.")
34+
@In(String.class)
35+
@Out(String.class)
36+
public final class StringDecoder extends DefaultObjectPipe<String, ObjectReceiver<String>> {
37+
38+
/**
39+
* determines whether string is split by the regexp or parts matching the
40+
* regexp are extracted
41+
*
42+
*/
43+
public enum Mode {
44+
SPLIT, EXTRACT
45+
}
46+
47+
private Mode mode = Mode.SPLIT;
48+
49+
private final Pattern pattern;
50+
51+
public StringDecoder(final String pattern) {
52+
this.pattern = Pattern.compile(pattern);
53+
}
54+
55+
public void setMode(final Mode mode) {
56+
this.mode = mode;
57+
}
58+
59+
@Override
60+
public void process(final String obj) {
61+
assert !isClosed();
62+
assert null != obj;
63+
64+
final ObjectReceiver<String> receiver = getReceiver();
65+
if (mode == Mode.SPLIT) {
66+
for (String part : pattern.split(obj)) {
67+
receiver.process(part);
68+
}
69+
} else {
70+
final Matcher matcher = pattern.matcher(obj);
71+
while (matcher.find()) {
72+
receiver.process(matcher.group());
73+
}
74+
}
75+
}
76+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.culturegraph.mf.stream.sink;
18+
19+
import org.culturegraph.mf.framework.ObjectReceiver;
20+
21+
/**
22+
* concatenetes all recieved Strings
23+
*
24+
* @author markus geipel
25+
*
26+
*/
27+
public final class StringConcatenator implements ObjectReceiver<String> {
28+
29+
private StringBuilder builder = new StringBuilder();
30+
private String separator = "";
31+
32+
@Override
33+
public void resetStream() {
34+
reset();
35+
36+
}
37+
38+
public void setSeparator(final String separator) {
39+
this.separator = separator;
40+
}
41+
42+
@Override
43+
public void closeStream() {
44+
// nothing to do
45+
46+
}
47+
48+
@Override
49+
public void process(final String obj) {
50+
builder.append(separator);
51+
builder.append(obj);
52+
53+
}
54+
55+
public void reset(){
56+
builder = new StringBuilder();
57+
}
58+
59+
public String getString(){
60+
return builder.toString();
61+
}
62+
63+
}

src/main/resources/flux-commands.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ decode-formeta org.culturegraph.mf.stream.converter.FormetaDecoder
2929
decode-csv org.culturegraph.mf.stream.converter.CsvDecoder
3030
decode-marc21 org.culturegraph.mf.stream.converter.bib.MarcDecoder
3131
decode-xml org.culturegraph.mf.stream.converter.xml.XmlDecoder
32+
decode-string org.culturegraph.mf.stream.pipe.StringDecoder
3233
regex-decode org.culturegraph.mf.stream.converter.RegexDecoder
3334

35+
read-beacon org.culturegraph.mf.stream.reader.BeaconReader
36+
3437
# XML-Handler:
3538
handle-cg-xml org.culturegraph.mf.stream.converter.xml.CGXmlHandler
3639
handle-generic-xml org.culturegraph.mf.stream.converter.xml.GenericXmlHandler
@@ -65,6 +68,7 @@ stream-to-xml org.culturegraph.mf.stream.sink.SimpleXmlWriter
6568
rdf-macros org.culturegraph.mf.stream.pipe.RdfMacroPipe
6669

6770
batch-log org.culturegraph.mf.stream.pipe.BatchLogger
71+
object-batch-log org.culturegraph.mf.stream.pipe.ObjectBatchLogger
6872
batch-reset org.culturegraph.mf.stream.pipe.BatchResetter
6973

7074
pass-through org.culturegraph.mf.stream.pipe.IdentityStreamPipe

0 commit comments

Comments
 (0)