29
29
import org .culturegraph .mf .framework .DefaultObjectPipe ;
30
30
import org .culturegraph .mf .framework .ObjectReceiver ;
31
31
import org .culturegraph .mf .types .Triple ;
32
-
32
+ import org .culturegraph .mf .util .MemoryWarningSystem ;
33
+ import org .culturegraph .mf .util .MemoryWarningSystem .Listener ;
33
34
34
35
/**
35
36
* @author markus geipel
36
- *
37
+ *
37
38
*/
38
- public abstract class AbstractTripleSort extends DefaultObjectPipe <Triple , ObjectReceiver <Triple >> {
39
+ public abstract class AbstractTripleSort extends DefaultObjectPipe <Triple , ObjectReceiver <Triple >> implements Listener {
39
40
/**
40
41
* specifies the comparator
41
42
*/
42
43
public enum Compare {
43
44
SUBJECT , PREDICATE , OBJECT , ALL ;
44
45
}
45
-
46
+
46
47
/**
47
48
* sort order
48
- *
49
+ *
49
50
*/
50
51
public enum Order {
51
52
INCREASING {
52
53
@ Override
53
54
public int order (final int indicator ) {
54
55
return indicator ;
55
56
}
56
- }, DECREASING {
57
+ },
58
+ DECREASING {
57
59
@ Override
58
60
public int order (final int indicator ) {
59
61
return -indicator ;
@@ -62,25 +64,22 @@ public int order(final int indicator) {
62
64
public abstract int order (int indicator );
63
65
}
64
66
65
-
66
- private static final int KILO = 1024 ;
67
- private static final int DEFUALT_BLOCKSIZE = 128 * KILO * KILO ;
68
- private static final int STRING_OVERHEAD = 124 ;
69
-
70
67
private final List <Triple > buffer = new ArrayList <Triple >();
71
- private final List <File > tempFiles = new ArrayList < File >() ;
68
+ private final List <File > tempFiles ;
72
69
private Compare compare = Compare .SUBJECT ;
73
70
private Order order = Order .INCREASING ;
74
- //private Comparator<Triple> comparator = createComparator(compareBy, order);
75
- private long bufferSizeEstimate ;
71
+ private volatile boolean memoryLow ;
76
72
77
- private long blockSize = DEFUALT_BLOCKSIZE ;
73
+ public AbstractTripleSort () {
74
+ MemoryWarningSystem .addListener (this );
75
+ tempFiles = new ArrayList <File >(); // Initialized here to let the
76
+ // compiler enforce the call to
77
+ // super() in subclasses.
78
+ }
78
79
79
- /**
80
- * @param blockSize in MB
81
- */
82
- public final void setBlockSize (final int blockSize ) {
83
- this .blockSize = blockSize * KILO * KILO ;
80
+ @ Override
81
+ public final void memoryLow (final long usedMemory , final long maxMemory ) {
82
+ memoryLow = true ;
84
83
}
85
84
86
85
protected final void setCompare (final Compare compare ) {
@@ -90,29 +89,23 @@ protected final void setCompare(final Compare compare) {
90
89
protected final Compare getCompare () {
91
90
return compare ;
92
91
}
93
-
94
- protected final void setSortOrder (final Order order ){
92
+
93
+ protected final void setSortOrder (final Order order ) {
95
94
this .order = order ;
96
95
}
97
96
98
-
99
-
100
97
@ Override
101
98
public final void process (final Triple namedValue ) {
102
-
103
- buffer .add (namedValue );
104
- // padding is ignored for efficiency (overhead is 45 for name + 45 for
105
- // value + 8 for namedValue + 28 goodwill)
106
- bufferSizeEstimate += ((namedValue .getSubject ().length () + namedValue .getPredicate ().length () + namedValue .getObject ().length ()) * 2 ) + STRING_OVERHEAD ;
107
- if (bufferSizeEstimate > blockSize ) {
108
- bufferSizeEstimate = 0 ;
99
+ if (memoryLow ) {
109
100
try {
110
101
nextBatch ();
111
102
} catch (IOException e ) {
112
103
throw new MetafactureException ("Error writing to temp file after sorting" , e );
104
+ } finally {
105
+ memoryLow = false ;
113
106
}
114
-
115
107
}
108
+ buffer .add (namedValue );
116
109
}
117
110
118
111
private void nextBatch () throws IOException {
@@ -132,11 +125,9 @@ private void nextBatch() throws IOException {
132
125
tempFiles .add (tempFile );
133
126
}
134
127
135
-
136
128
@ Override
137
129
public final void onCloseStream () {
138
130
139
-
140
131
if (tempFiles .isEmpty ()) {
141
132
Collections .sort (buffer , createComparator (compare , order ));
142
133
for (Triple triple : buffer ) {
@@ -147,7 +138,8 @@ public final void onCloseStream() {
147
138
final Comparator <Triple > comparator = createComparator (compare , order );
148
139
final PriorityQueue <SortedTripleFileFacade > queue = new PriorityQueue <SortedTripleFileFacade >(11 ,
149
140
new Comparator <SortedTripleFileFacade >() {
150
- // private final Comparator<Triple> comparator = getComparator();
141
+ // private final Comparator<Triple> comparator =
142
+ // getComparator();
151
143
152
144
@ Override
153
145
public int compare (final SortedTripleFileFacade o1 , final SortedTripleFileFacade o2 ) {
@@ -183,15 +175,15 @@ public int compare(final SortedTripleFileFacade o1, final SortedTripleFileFacade
183
175
184
176
protected void onFinished () {
185
177
// nothing to do
186
-
178
+
187
179
}
188
180
189
181
protected abstract void sortedTriple (Triple namedValue );
190
182
191
- public final Comparator <Triple > createComparator (){
183
+ public final Comparator <Triple > createComparator () {
192
184
return createComparator (compare , order );
193
185
}
194
-
186
+
195
187
public static Comparator <Triple > createComparator (final Compare compareBy , final Order order ) {
196
188
final Comparator <Triple > comparator ;
197
189
switch (compareBy ) {
@@ -236,8 +228,8 @@ public int compare(final Triple o1, final Triple o2) {
236
228
@ Override
237
229
public final void onResetStream () {
238
230
buffer .clear ();
239
- for (File file : tempFiles ){
240
- if (file .exists ()){
231
+ for (File file : tempFiles ) {
232
+ if (file .exists ()) {
241
233
file .delete ();
242
234
}
243
235
}
0 commit comments