Skip to content

Commit 0a71217

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents 669ba83 + fdad535 commit 0a71217

File tree

5 files changed

+187
-53
lines changed

5 files changed

+187
-53
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/sort/AbstractTripleSort.java

Lines changed: 32 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,33 @@
2929
import org.culturegraph.mf.framework.DefaultObjectPipe;
3030
import org.culturegraph.mf.framework.ObjectReceiver;
3131
import org.culturegraph.mf.types.Triple;
32-
32+
import org.culturegraph.mf.util.MemoryWarningSystem;
33+
import org.culturegraph.mf.util.MemoryWarningSystem.Listener;
3334

3435
/**
3536
* @author markus geipel
36-
*
37+
*
3738
*/
38-
public abstract class AbstractTripleSort extends DefaultObjectPipe<Triple, ObjectReceiver<Triple>> {
39+
public abstract class AbstractTripleSort extends DefaultObjectPipe<Triple, ObjectReceiver<Triple>> implements Listener {
3940
/**
4041
* specifies the comparator
4142
*/
4243
public enum Compare {
4344
SUBJECT, PREDICATE, OBJECT, ALL;
4445
}
45-
46+
4647
/**
4748
* sort order
48-
*
49+
*
4950
*/
5051
public enum Order {
5152
INCREASING {
5253
@Override
5354
public int order(final int indicator) {
5455
return indicator;
5556
}
56-
}, DECREASING {
57+
},
58+
DECREASING {
5759
@Override
5860
public int order(final int indicator) {
5961
return -indicator;
@@ -62,25 +64,22 @@ public int order(final int indicator) {
6264
public abstract int order(int indicator);
6365
}
6466

65-
66-
private static final int KILO = 1024;
67-
private static final int DEFUALT_BLOCKSIZE = 128 * KILO * KILO;
68-
private static final int STRING_OVERHEAD = 124;
69-
7067
private final List<Triple> buffer = new ArrayList<Triple>();
71-
private final List<File> tempFiles = new ArrayList<File>();
68+
private final List<File> tempFiles;
7269
private Compare compare = Compare.SUBJECT;
7370
private Order order = Order.INCREASING;
74-
//private Comparator<Triple> comparator = createComparator(compareBy, order);
75-
private long bufferSizeEstimate;
71+
private volatile boolean memoryLow;
7672

77-
private long blockSize = DEFUALT_BLOCKSIZE;
73+
public AbstractTripleSort() {
74+
MemoryWarningSystem.addListener(this);
75+
tempFiles = new ArrayList<File>(); // Initialized here to let the
76+
// compiler enforce the call to
77+
// super() in subclasses.
78+
}
7879

79-
/**
80-
* @param blockSize in MB
81-
*/
82-
public final void setBlockSize(final int blockSize) {
83-
this.blockSize = blockSize * KILO * KILO;
80+
@Override
81+
public final void memoryLow(final long usedMemory, final long maxMemory) {
82+
memoryLow = true;
8483
}
8584

8685
protected final void setCompare(final Compare compare) {
@@ -90,29 +89,23 @@ protected final void setCompare(final Compare compare) {
9089
protected final Compare getCompare() {
9190
return compare;
9291
}
93-
94-
protected final void setSortOrder(final Order order){
92+
93+
protected final void setSortOrder(final Order order) {
9594
this.order = order;
9695
}
9796

98-
99-
10097
@Override
10198
public final void process(final Triple namedValue) {
102-
103-
buffer.add(namedValue);
104-
// padding is ignored for efficiency (overhead is 45 for name + 45 for
105-
// value + 8 for namedValue + 28 goodwill)
106-
bufferSizeEstimate += ((namedValue.getSubject().length() + namedValue.getPredicate().length() + namedValue.getObject().length()) * 2) + STRING_OVERHEAD;
107-
if (bufferSizeEstimate > blockSize) {
108-
bufferSizeEstimate = 0;
99+
if (memoryLow) {
109100
try {
110101
nextBatch();
111102
} catch (IOException e) {
112103
throw new MetafactureException("Error writing to temp file after sorting", e);
104+
} finally {
105+
memoryLow = false;
113106
}
114-
115107
}
108+
buffer.add(namedValue);
116109
}
117110

118111
private void nextBatch() throws IOException {
@@ -132,11 +125,9 @@ private void nextBatch() throws IOException {
132125
tempFiles.add(tempFile);
133126
}
134127

135-
136128
@Override
137129
public final void onCloseStream() {
138130

139-
140131
if (tempFiles.isEmpty()) {
141132
Collections.sort(buffer, createComparator(compare, order));
142133
for (Triple triple : buffer) {
@@ -147,7 +138,8 @@ public final void onCloseStream() {
147138
final Comparator<Triple> comparator = createComparator(compare, order);
148139
final PriorityQueue<SortedTripleFileFacade> queue = new PriorityQueue<SortedTripleFileFacade>(11,
149140
new Comparator<SortedTripleFileFacade>() {
150-
// private final Comparator<Triple> comparator = getComparator();
141+
// private final Comparator<Triple> comparator =
142+
// getComparator();
151143

152144
@Override
153145
public int compare(final SortedTripleFileFacade o1, final SortedTripleFileFacade o2) {
@@ -183,15 +175,15 @@ public int compare(final SortedTripleFileFacade o1, final SortedTripleFileFacade
183175

184176
protected void onFinished() {
185177
// nothing to do
186-
178+
187179
}
188180

189181
protected abstract void sortedTriple(Triple namedValue);
190182

191-
public final Comparator<Triple> createComparator(){
183+
public final Comparator<Triple> createComparator() {
192184
return createComparator(compare, order);
193185
}
194-
186+
195187
public static Comparator<Triple> createComparator(final Compare compareBy, final Order order) {
196188
final Comparator<Triple> comparator;
197189
switch (compareBy) {
@@ -236,8 +228,8 @@ public int compare(final Triple o1, final Triple o2) {
236228
@Override
237229
public final void onResetStream() {
238230
buffer.clear();
239-
for(File file: tempFiles){
240-
if(file.exists()){
231+
for (File file : tempFiles) {
232+
if (file.exists()) {
241233
file.delete();
242234
}
243235
}

src/main/java/org/culturegraph/mf/stream/pipe/sort/TripleCollect.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,21 @@
3535
@In(Triple.class)
3636
@Out(StreamReceiver.class)
3737
public final class TripleCollect extends DefaultObjectPipe<Triple, StreamReceiver> {
38-
private final FormetaParser parser = new FormetaParser();
39-
private final PartialRecordEmitter emitter = new PartialRecordEmitter();
40-
38+
4139
private String currentSubject;
40+
private final PartialRecordEmitter emitter = new PartialRecordEmitter();
41+
private final FormetaParser parser = new FormetaParser();
4242

4343
public TripleCollect() {
4444
parser.setEmitter(emitter);
4545
}
46-
46+
4747
@Override
4848
public void process(final Triple triple) {
4949
if (currentSubject == null) {
5050
currentSubject = triple.getSubject();
5151
getReceiver().startRecord(currentSubject);
5252
}
53-
5453
if (currentSubject.equals(triple.getSubject())) {
5554
decodeTriple(triple);
5655
} else {
@@ -61,14 +60,14 @@ public void process(final Triple triple) {
6160
}
6261
}
6362

64-
public void decodeTriple(final Triple triple) {
63+
private void decodeTriple(final Triple triple) {
6564
if(triple.getObjectType() == ObjectType.STRING){
6665
getReceiver().literal(triple.getPredicate(), triple.getObject());
67-
}else if (triple.getObjectType() == ObjectType.ENTITY){
66+
}else{
67+
//getReceiver().startEntity(triple.getPredicate());
6868
emitter.setDefaultName(triple.getPredicate());
6969
parser.parse(triple.getObject());
70-
}else{
71-
throw new UnsupportedOperationException(triple.getObjectType() + " can not yet be decoded");
70+
//getReceiver().endEntity();
7271
}
7372
}
7473

src/main/java/org/culturegraph/mf/util/MemoryWarningSystem.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
/*
17+
* Code based on http://www.javaspecialists.eu/archive/Issue092.html
18+
*/
19+
20+
package org.culturegraph.mf.util;
21+
22+
import java.lang.management.ManagementFactory;
23+
import java.lang.management.MemoryMXBean;
24+
import java.lang.management.MemoryNotificationInfo;
25+
import java.lang.management.MemoryPoolMXBean;
26+
import java.lang.management.MemoryType;
27+
import java.util.Collection;
28+
import java.util.concurrent.CopyOnWriteArrayList;
29+
30+
import javax.management.Notification;
31+
import javax.management.NotificationEmitter;
32+
import javax.management.NotificationListener;
33+
34+
/**
35+
* This memory warning system will call the listener when we exceed the
36+
* percentage of available memory specified. All members of this class are static, since the usage threshold of the
37+
* monitored memory pool can only be set to a single value.
38+
*/
39+
public final class MemoryWarningSystem {
40+
private static final MemoryPoolMXBean TENURED_GEN_POOL = findTenuredGenPool();
41+
private static final double DEFAULT_THRESHOLD = 0.8;
42+
private static final Collection<Listener> LISTENERS = new CopyOnWriteArrayList<Listener>();
43+
44+
private MemoryWarningSystem() {
45+
// no instances
46+
}
47+
48+
static {
49+
setUsageThreshold(DEFAULT_THRESHOLD);
50+
final MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
51+
final NotificationEmitter emitter = (NotificationEmitter) mbean;
52+
emitter.addNotificationListener(new NotificationListener() {
53+
54+
@Override
55+
public void handleNotification(final Notification notification, final Object handback) {
56+
if (notification.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
57+
for (Listener listener : getListeners()) {
58+
listener.memoryLow(getUsedMemory(), getMaxMemory());
59+
}
60+
}
61+
}
62+
}, null, null);
63+
}
64+
65+
protected static Collection<Listener> getListeners(){
66+
return LISTENERS;
67+
}
68+
69+
public static long getMaxMemory() {
70+
return TENURED_GEN_POOL.getUsage().getMax();
71+
}
72+
73+
public static long getUsedMemory() {
74+
return TENURED_GEN_POOL.getUsage().getUsed();
75+
}
76+
77+
public static boolean addListener(final Listener listener) {
78+
return LISTENERS.add(listener);
79+
}
80+
81+
public static boolean removeListener(final Listener listener) {
82+
return LISTENERS.remove(listener);
83+
}
84+
85+
public static void setUsageThreshold(final double threshold) {
86+
if (threshold <= 0.0 || threshold > 1.0) {
87+
throw new IllegalArgumentException("'threshold' must be in [0.0, 1.0]");
88+
}
89+
TENURED_GEN_POOL.setUsageThreshold((long) (getMaxMemory() * threshold));
90+
}
91+
92+
/**
93+
* Finds the tenured generation pool. It is identified as the first memory
94+
* pool of type HEAP that supports setting a usage threshold.
95+
*/
96+
private static MemoryPoolMXBean findTenuredGenPool() {
97+
for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
98+
// I don't know whether this approach is better, or whether
99+
// we should rather check for the pool name "Tenured Gen"?
100+
if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
101+
return pool;
102+
}
103+
}
104+
throw new AssertionError("Could not find tenured space");
105+
}
106+
107+
/**
108+
* Interface for low memory listeners
109+
*/
110+
public interface Listener {
111+
void memoryLow(long usedMemory, long maxMemory);
112+
}
113+
}

src/test/java/org/culturegraph/mf/morph/collectors/Misc.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
<input type="text/x-cg+xml">
8787
<cgxml:cgxml version="1.0">
8888
<cgxml:records>
89-
<cgxml:record id="1">
89+
<cgxml:record id="entity end info">
9090
<cgxml:entity name="e1">
9191
<cgxml:entity name="e2">
9292
<cgxml:literal name="d" value="a" />
@@ -111,7 +111,7 @@
111111
<result type="text/x-cg+xml">
112112
<cgxml:cgxml version="1.0">
113113
<cgxml:records>
114-
<cgxml:record id="1">
114+
<cgxml:record id="entity end info">
115115
<cgxml:literal name="e1.e2.d" value="a" />
116116
<cgxml:literal name="e1.e2" value="" />
117117
<cgxml:literal name="e1" value="" />

src/test/java/org/culturegraph/mf/morph/functions/Misc.xml

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
xmlns="http://www.culturegraph.org/metamorph-test" xmlns:mm="http://www.culturegraph.org/metamorph"
44
xmlns:cgxml="http://www.culturegraph.org/cgxml">
55

6-
7-
86
<test-case name="regexp">
97
<input type="text/x-cg+xml">
108
<cgxml:cgxml version="1.0">
@@ -44,6 +42,38 @@
4442
</cgxml:cgxml>
4543
</result>
4644
</test-case>
45+
46+
<test-case name="substring">
47+
<input type="text/x-cg+xml">
48+
<cgxml:cgxml version="1.0">
49+
<cgxml:records>
50+
<cgxml:record id="1">
51+
<cgxml:literal name="a" value="012345" />
52+
</cgxml:record>
53+
</cgxml:records>
54+
</cgxml:cgxml>
55+
</input>
56+
57+
<transformation type="text/x-metamorph+xml">
58+
<mm:metamorph version="1">
59+
<mm:rules>
60+
<mm:data source="a">
61+
<mm:substring start="3" end="5"/>
62+
</mm:data>
63+
</mm:rules>
64+
</mm:metamorph>
65+
</transformation>
66+
67+
<result type="text/x-cg+xml">
68+
<cgxml:cgxml version="1.0">
69+
<cgxml:records>
70+
<cgxml:record id="1">
71+
<cgxml:literal name="a" value="34" />
72+
</cgxml:record>
73+
</cgxml:records>
74+
</cgxml:cgxml>
75+
</result>
76+
</test-case>
4777

4878
<test-case name="constant">
4979
<input type="text/x-cg+xml">

0 commit comments

Comments
 (0)