Skip to content

Commit 774830c

Browse files
committed
Merge pull request #10 from nathanlws/iterable-sorter
Sounds good -- I may do minor tweaks after merge.
2 parents de0f7ae + 20d686e commit 774830c

File tree

5 files changed

+667
-405
lines changed

5 files changed

+667
-405
lines changed
Lines changed: 224 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,224 @@
1+
package com.fasterxml.sort;
2+
3+
import com.fasterxml.sort.util.SegmentedBuffer;
4+
5+
import java.io.Closeable;
6+
import java.io.File;
7+
import java.io.IOException;
8+
import java.util.ArrayList;
9+
import java.util.Arrays;
10+
import java.util.Comparator;
11+
import java.util.Iterator;
12+
import java.util.List;
13+
import java.util.NoSuchElementException;
14+
15+
public class IteratingSorter<T> extends SorterBase<T> implements Closeable
16+
{
17+
// Set iff sort spilled to disk
18+
private List<File> _mergerInputs;
19+
private DataReader<T> _merger;
20+
21+
22+
protected IteratingSorter(SortConfig config,
23+
DataReaderFactory<T> readerFactory,
24+
DataWriterFactory<T> writerFactory,
25+
Comparator<T> comparator)
26+
{
27+
super(config, readerFactory, writerFactory, comparator);
28+
}
29+
30+
protected IteratingSorter() {
31+
super();
32+
}
33+
34+
protected IteratingSorter(SortConfig config) {
35+
super(config);
36+
}
37+
38+
/**
39+
* Method that will perform full sort on input data read using given
40+
* {@link DataReader}.
41+
*
42+
* Conversions to and from intermediate sort files is done
43+
* using {@link DataReaderFactory} and {@link DataWriterFactory} configured
44+
* for this sorter.
45+
*
46+
* @return Iterator if sorting complete and output is ready to be written; null if it was cancelled
47+
*/
48+
public Iterator<T> sort(DataReader<T> inputReader)
49+
throws IOException
50+
{
51+
// Clean up any previous sort
52+
close();
53+
54+
// First, pre-sort:
55+
_phase = Phase.PRE_SORTING;
56+
boolean inputClosed = false;
57+
58+
SegmentedBuffer buffer = new SegmentedBuffer();
59+
_presortFileCount = 0;
60+
_sortRoundCount = -1;
61+
_currentSortRound = -1;
62+
63+
Iterator<T> iterator = null;
64+
try {
65+
Object[] items = _readMax(inputReader, buffer, _config.getMaxMemoryUsage(), null);
66+
if (_checkForCancel()) {
67+
close();
68+
return null;
69+
}
70+
Arrays.sort(items, _rawComparator());
71+
T next = inputReader.readNext();
72+
/* Minor optimization: in case all entries might fit in
73+
* in-memory sort buffer, avoid writing intermediate file
74+
* and just write results directly.
75+
*/
76+
if (next == null) {
77+
inputClosed = true;
78+
inputReader.close();
79+
_phase = Phase.SORTING;
80+
iterator = new CastingIterator<T>(Arrays.asList(items).iterator());
81+
} else { // but if more data than memory-buffer-full, do it right:
82+
List<File> presorted = new ArrayList<File>();
83+
presorted.add(_writePresorted(items));
84+
items = null; // it's a big array, clear refs as early as possible
85+
_presort(inputReader, buffer, next, presorted);
86+
inputClosed = true;
87+
inputReader.close();
88+
_phase = Phase.SORTING;
89+
if (_checkForCancel(presorted)) {
90+
close();
91+
return null;
92+
}
93+
_mergerInputs = presorted;
94+
_merger = _createMergeReader(merge(presorted));
95+
iterator = new MergerIterator<T>(_merger);
96+
}
97+
} finally {
98+
if (!inputClosed) {
99+
try {
100+
inputReader.close();
101+
} catch (IOException e) {
102+
// Ignore
103+
}
104+
}
105+
}
106+
if (_checkForCancel()) {
107+
close();
108+
return null;
109+
}
110+
_phase = Phase.COMPLETE;
111+
return iterator;
112+
}
113+
114+
115+
/*
116+
/**********************************************************************
117+
/* Closeable API
118+
/**********************************************************************
119+
*/
120+
121+
@Override
122+
public void close() {
123+
if (_merger != null) {
124+
try {
125+
_merger.close();
126+
}
127+
catch (IOException e) {
128+
// Ignore
129+
}
130+
}
131+
if (_mergerInputs != null) {
132+
for (File input : _mergerInputs) {
133+
input.delete();
134+
}
135+
}
136+
_mergerInputs = null;
137+
_merger = null;
138+
}
139+
140+
141+
/*
142+
/**********************************************************************
143+
/* Exception API
144+
/**********************************************************************
145+
*/
146+
147+
public static class IterableSorterException extends RuntimeException {
148+
public IterableSorterException(IOException cause) {
149+
super(cause);
150+
}
151+
}
152+
153+
154+
/*
155+
/**********************************************************************
156+
/* Iterator API
157+
/**********************************************************************
158+
*/
159+
160+
private static class CastingIterator<T> implements Iterator<T> {
161+
private final Iterator<Object> _it;
162+
163+
public CastingIterator(Iterator<Object> it) {
164+
_it = it;
165+
}
166+
167+
@Override
168+
public boolean hasNext() {
169+
return _it.hasNext();
170+
}
171+
172+
@SuppressWarnings("unchecked")
173+
@Override
174+
public T next() {
175+
return (T)_it.next();
176+
}
177+
178+
@Override
179+
public void remove() {
180+
throw new UnsupportedOperationException();
181+
}
182+
}
183+
184+
private static class MergerIterator<T> implements Iterator<T> {
185+
private final DataReader<T> _merger;
186+
private T _next;
187+
188+
private MergerIterator(DataReader<T> merger) {
189+
_merger = merger;
190+
}
191+
192+
private void prepNext() {
193+
if (_next != null) {
194+
try {
195+
_next = _merger.readNext();
196+
} catch (IOException e) {
197+
throw new IterableSorterException(e);
198+
}
199+
}
200+
}
201+
202+
@Override
203+
public boolean hasNext() {
204+
prepNext();
205+
return (_next != null);
206+
}
207+
208+
@Override
209+
public T next() {
210+
prepNext();
211+
if (_next == null) {
212+
throw new NoSuchElementException();
213+
}
214+
T t = _next;
215+
_next = null;
216+
return t;
217+
}
218+
219+
@Override
220+
public void remove() {
221+
throw new UnsupportedOperationException();
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)