Skip to content

Commit 694cfb1

Browse files
committed
Sorter now extends IterableSorter which sorts but doesn't write
1 parent e2a42d7 commit 694cfb1

File tree

4 files changed

+253
-94
lines changed

4 files changed

+253
-94
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package com.fasterxml.sort;
2+
3+
import com.fasterxml.sort.util.SegmentedBuffer;
4+
5+
import java.io.File;
6+
import java.io.IOException;
7+
import java.util.ArrayList;
8+
import java.util.Arrays;
9+
import java.util.Comparator;
10+
import java.util.Iterator;
11+
import java.util.List;
12+
import java.util.NoSuchElementException;
13+
14+
public class IterableSorter<T> extends SorterBase<T> implements Iterable<T>
15+
{
16+
// Set iff sort spilled to disk
17+
private List<File> _mergerInputs;
18+
private DataReader<T> _merger;
19+
// Set iff sorting completed without cancelation
20+
private Iterator<T> _iterator;
21+
22+
23+
/**
 * Creates a sorter from an explicit configuration, reader factory,
 * writer factory and comparator; all four arguments are handed
 * unchanged to the matching {@code SorterBase} constructor.
 */
protected IterableSorter(SortConfig config,
24+
DataReaderFactory<T> readerFactory,
25+
DataWriterFactory<T> writerFactory,
26+
Comparator<T> comparator)
27+
{
28+
super(config, readerFactory, writerFactory, comparator);
29+
}
30+
31+
/**
 * Creates a sorter by delegating to the no-argument
 * {@code SorterBase} constructor.
 */
protected IterableSorter() {
32+
super();
33+
}
34+
35+
/**
 * Creates a sorter by delegating to the {@code SorterBase}
 * constructor that takes only a {@link SortConfig}.
 */
protected IterableSorter(SortConfig config) {
36+
super(config);
37+
}
38+
39+
/**
40+
* Method that will perform full sort on input data read using given
41+
* {@link DataReader}.
42+
*
43+
* Conversions to and from intermediate sort files is done
44+
* using {@link DataReaderFactory} and {@link DataWriterFactory} configured
45+
* for this sorter.
46+
*
47+
* @return true if sorting complete and output is ready to be written; false if it was cancelled
48+
*/
49+
public boolean sort(DataReader<T> inputReader)
50+
throws IOException
51+
{
52+
// Clean up any previous sort
53+
close();
54+
55+
// First, pre-sort:
56+
_phase = Phase.PRE_SORTING;
57+
boolean inputClosed = false;
58+
59+
SegmentedBuffer buffer = new SegmentedBuffer();
60+
_presortFileCount = 0;
61+
_sortRoundCount = -1;
62+
_currentSortRound = -1;
63+
64+
try {
65+
Object[] items = _readMax(inputReader, buffer, _config.getMaxMemoryUsage(), null);
66+
if (_checkForCancel()) {
67+
return false;
68+
}
69+
Arrays.sort(items, _rawComparator());
70+
T next = inputReader.readNext();
71+
/* Minor optimization: in case all entries might fit in
72+
* in-memory sort buffer, avoid writing intermediate file
73+
* and just write results directly.
74+
*/
75+
if (next == null) {
76+
inputClosed = true;
77+
inputReader.close();
78+
_phase = Phase.SORTING;
79+
_iterator = new CastingIterator<T>(Arrays.asList(items).iterator());
80+
} else { // but if more data than memory-buffer-full, do it right:
81+
List<File> presorted = new ArrayList<File>();
82+
presorted.add(_writePresorted(items));
83+
items = null; // it's a big array, clear refs as early as possible
84+
_presort(inputReader, buffer, next, presorted);
85+
inputClosed = true;
86+
inputReader.close();
87+
_phase = Phase.SORTING;
88+
if (_checkForCancel(presorted)) {
89+
return false;
90+
}
91+
_mergerInputs = presorted;
92+
_merger = merge(presorted);
93+
_iterator = new MergerIterator<T>(_merger);
94+
}
95+
} finally {
96+
if (!inputClosed) {
97+
try {
98+
inputReader.close();
99+
} catch (IOException e) {
100+
// Ignore
101+
}
102+
}
103+
}
104+
if (_checkForCancel()) {
105+
return false;
106+
}
107+
_phase = Phase.COMPLETE;
108+
return true;
109+
}
110+
111+
/*
112+
/**********************************************************************
113+
/* Iterable API
114+
/**********************************************************************
115+
*/
116+
117+
@Override
118+
public Iterator<T> iterator() {
119+
if (_iterator == null) {
120+
throw new IllegalStateException("Not yet sorted");
121+
}
122+
return _iterator;
123+
}
124+
125+
public void close() {
126+
if (_merger != null) {
127+
try {
128+
_merger.close();
129+
}
130+
catch (IOException e) {
131+
// Ignore
132+
}
133+
}
134+
if (_mergerInputs != null) {
135+
for (File input : _mergerInputs) {
136+
input.delete();
137+
}
138+
}
139+
_mergerInputs = null;
140+
_merger = null;
141+
_iterator = null;
142+
}
143+
144+
145+
/*
146+
/**********************************************************************
147+
/* Exception API
148+
/**********************************************************************
149+
*/
150+
151+
public static class IterableSorterException extends RuntimeException {
152+
public IterableSorterException(IOException cause) {
153+
super(cause);
154+
}
155+
}
156+
157+
158+
/*
159+
/**********************************************************************
160+
/* Iterator API
161+
/**********************************************************************
162+
*/
163+
164+
private static class CastingIterator<T> implements Iterator<T> {
165+
private final Iterator<Object> _it;
166+
167+
public CastingIterator(Iterator<Object> it) {
168+
_it = it;
169+
}
170+
171+
@Override
172+
public boolean hasNext() {
173+
return _it.hasNext();
174+
}
175+
176+
@SuppressWarnings("unchecked")
177+
@Override
178+
public T next() {
179+
return (T)_it.next();
180+
}
181+
182+
@Override
183+
public void remove() {
184+
throw new UnsupportedOperationException();
185+
}
186+
}
187+
188+
private static class MergerIterator<T> implements Iterator<T> {
189+
private final DataReader<T> _merger;
190+
private T _next;
191+
192+
private MergerIterator(DataReader<T> merger) {
193+
_merger = merger;
194+
}
195+
196+
private void prepNext() {
197+
if (_next != null) {
198+
try {
199+
_next = _merger.readNext();
200+
} catch (IOException e) {
201+
throw new IterableSorterException(e);
202+
}
203+
}
204+
}
205+
206+
@Override
207+
public boolean hasNext() {
208+
prepNext();
209+
return (_next != null);
210+
}
211+
212+
@Override
213+
public T next() {
214+
prepNext();
215+
if (_next == null) {
216+
throw new NoSuchElementException();
217+
}
218+
T t = _next;
219+
_next = null;
220+
return t;
221+
}
222+
223+
@Override
224+
public void remove() {
225+
throw new UnsupportedOperationException();
226+
}
227+
}
228+
}

src/main/java/com/fasterxml/sort/Sorter.java

Lines changed: 12 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
package com.fasterxml.sort;
22

3-
import com.fasterxml.sort.util.SegmentedBuffer;
4-
5-
import java.io.File;
63
import java.io.IOException;
74
import java.io.InputStream;
85
import java.io.OutputStream;
9-
import java.util.ArrayList;
10-
import java.util.Arrays;
116
import java.util.Comparator;
12-
import java.util.List;
7+
import java.util.Iterator;
138

149
/**
1510
* Main entry point for sorting functionality; object that drives
1611
* the sorting process from pre-sort to final output.
1712
* Instances are not thread-safe, although they are reusable.
18-
* Since the cost of creating new instances is trivial, there is usally
13+
* Since the cost of creating new instances is trivial, there is usually
1914
* no benefit from reusing instances, other than possible convenience.
2015
*/
21-
public class Sorter<T> extends SorterBase<T>
16+
public class Sorter<T> extends IterableSorter<T>
2217
{
2318
/**
2419
* @param config Configuration for the sorter
@@ -43,18 +38,6 @@ protected Sorter(SortConfig config) {
4338
super(config);
4439
}
4540

46-
protected Sorter<T> withReaderFactory(DataReaderFactory<T> f) {
47-
return new Sorter<T>(_config, f, _writerFactory, _comparator);
48-
}
49-
50-
protected Sorter<T> withWriterFactory(DataWriterFactory<T> f) {
51-
return new Sorter<T>(_config, _readerFactory, f, _comparator);
52-
}
53-
54-
protected Sorter<T> withComparator(Comparator<T> cmp) {
55-
return new Sorter<T>(_config, _readerFactory, _writerFactory, cmp);
56-
}
57-
5841

5942
/*
6043
/**********************************************************************
@@ -82,68 +65,23 @@ public void sort(InputStream source, OutputStream destination)
8265
* using {@link DataReaderFactory} and {@link DataWriterFactory} configured
8366
* for this sorter.
8467
*
85-
* @return true if sorting completed succesfully; false if it was cancelled
68+
* @return true if sorting completed successfully; false if it was cancelled
8669
*/
8770
public boolean sort(DataReader<T> inputReader, DataWriter<T> resultWriter)
8871
throws IOException
8972
{
90-
// First, pre-sort:
91-
_phase = SortingState.Phase.PRE_SORTING;
92-
93-
SegmentedBuffer buffer = new SegmentedBuffer();
94-
boolean inputClosed = false;
95-
boolean resultClosed = false;
96-
97-
_presortFileCount = 0;
98-
_sortRoundCount = -1;
99-
_currentSortRound = -1;
100-
73+
if(!super.sort(inputReader)) {
74+
return false;
75+
}
76+
Iterator<T> it = super.iterator();
10177
try {
102-
Object[] items = _readMax(inputReader, buffer, _config.getMaxMemoryUsage(), null);
103-
if (_checkForCancel()) {
104-
return false;
105-
}
106-
Arrays.sort(items, _rawComparator());
107-
T next = inputReader.readNext();
108-
/* Minor optimization: in case all entries might fit in
109-
* in-memory sort buffer, avoid writing intermediate file
110-
* and just write results directly.
111-
*/
112-
if (next == null) {
113-
inputClosed = true;
114-
inputReader.close();
115-
_phase = SortingState.Phase.SORTING;
116-
_writeAll(resultWriter, items);
117-
} else { // but if more data than memory-buffer-full, do it right:
118-
List<File> presorted = new ArrayList<File>();
119-
presorted.add(_writePresorted(items));
120-
items = null; // it's a big array, clear refs as early as possible
121-
_presort(inputReader, buffer, next, presorted);
122-
inputClosed = true;
123-
inputReader.close();
124-
_phase = SortingState.Phase.SORTING;
125-
if (_checkForCancel(presorted)) {
126-
return false;
127-
}
128-
merge(presorted, resultWriter);
78+
while(it.hasNext()) {
79+
T value = it.next();
80+
resultWriter.writeEntry(value);
12981
}
130-
resultClosed = true;
13182
resultWriter.close();
132-
if (_checkForCancel()) {
133-
return false;
134-
}
135-
_phase = SortingState.Phase.COMPLETE;
13683
} finally {
137-
if (!inputClosed) {
138-
try {
139-
inputReader.close();
140-
} catch (IOException e) { }
141-
}
142-
if (!resultClosed) {
143-
try {
144-
resultWriter.close();
145-
} catch (IOException e) { }
146-
}
84+
super.close();
14785
}
14886
return true;
14987
}

0 commit comments

Comments
 (0)