Skip to content

Commit f8bb36b

Browse files
committed
Rewrite offsets after update
1 parent 70cdd0b commit f8bb36b

File tree

8 files changed

+279
-104
lines changed

8 files changed

+279
-104
lines changed

src/main/java/org/radarcns/OffsetRange.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,49 @@
1616

1717
package org.radarcns;
1818

19-
import java.util.Objects;
19+
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
2022
import javax.annotation.Nonnull;
2123

2224
/** POJO class for storing offsets. */
2325
public class OffsetRange implements Comparable<OffsetRange> {
24-
private String topic;
25-
private int partition;
26+
private final String topic;
27+
private final int partition;
2628
private long offsetFrom;
2729
private long offsetTo;
2830

29-
public static OffsetRange parse(String filename) throws NumberFormatException, IndexOutOfBoundsException {
31+
public static OffsetRange parseFilename(String filename) throws NumberFormatException, IndexOutOfBoundsException {
3032
String[] fileNameParts = filename.split("[+.]");
3133

32-
OffsetRange range = new OffsetRange();
33-
range.topic = fileNameParts[0];
34-
range.partition = Integer.parseInt(fileNameParts[1]);
35-
range.offsetFrom = Long.parseLong(fileNameParts[2]);
36-
range.offsetTo = Long.parseLong(fileNameParts[3]);
37-
return range;
34+
return new OffsetRange(
35+
fileNameParts[0],
36+
Integer.parseInt(fileNameParts[1]),
37+
Long.parseLong(fileNameParts[2]),
38+
Long.parseLong(fileNameParts[3]));
3839
}
3940

40-
public String getTopic() {
41-
return topic;
41+
/** Full constructor. */
42+
@JsonCreator
43+
public OffsetRange(
44+
@JsonProperty("topic") String topic,
45+
@JsonProperty("partition") int partition,
46+
@JsonProperty("offsetFrom") long offsetFrom,
47+
@JsonProperty("offsetTo") long offsetTo) {
48+
this.topic = topic;
49+
this.partition = partition;
50+
this.offsetFrom = offsetFrom;
51+
this.offsetTo = offsetTo;
4252
}
4353

44-
public void setTopic(@Nonnull String topic) {
45-
Objects.requireNonNull(topic);
46-
this.topic = topic;
54+
public String getTopic() {
55+
return topic;
4756
}
4857

4958
public int getPartition() {
5059
return partition;
5160
}
5261

53-
public void setPartition(int partition) {
54-
this.partition = partition;
55-
}
56-
5762
public long getOffsetFrom() {
5863
return offsetFrom;
5964
}
@@ -97,7 +102,7 @@ public boolean equals(Object o) {
97102
}
98103

99104
@Override
100-
public int compareTo(OffsetRange o) {
105+
public int compareTo(@Nonnull OffsetRange o) {
101106
int ret = Long.compare(offsetFrom, o.offsetFrom);
102107
if (ret != 0) {
103108
return ret;

src/main/java/org/radarcns/OffsetRangeFile.java

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,60 +31,87 @@
3131
import java.io.FileWriter;
3232
import java.io.Flushable;
3333
import java.io.IOException;
34+
import java.nio.file.Files;
35+
36+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
3437

3538
/**
3639
* Accesses an OffsetRange file using the CSV format. On construction, this will create the file if
3740
* not present.
3841
*/
39-
public class OffsetRangeFile implements Flushable, Closeable {
40-
private final CsvMapper mapper;
41-
private final CsvSchema schema;
42-
private final File file;
43-
private final FileWriter fileWriter;
44-
private final BufferedWriter bufferedWriter;
45-
private final CsvGenerator generator;
46-
private final ObjectWriter writer;
42+
public final class OffsetRangeFile {
43+
private static final CsvSchema SCHEMA = CsvSchema.builder()
44+
.addColumn("topic")
45+
.addNumberColumn("partition")
46+
.addNumberColumn("offsetTo")
47+
.addNumberColumn("offsetFrom")
48+
.build();
49+
50+
private static final CsvFactory CSV_FACTORY = new CsvFactory();
51+
private static final CsvMapper CSV_MAPPER = new CsvMapper(CSV_FACTORY);
52+
private static final ObjectReader CSV_READER = CSV_MAPPER.reader(SCHEMA.withHeader())
53+
.forType(OffsetRange.class);
4754

48-
public OffsetRangeFile(File file) throws IOException {
49-
this.file = file;
50-
boolean fileIsNew = !file.exists() || file.length() == 0;
51-
CsvFactory factory = new CsvFactory();
52-
this.mapper = new CsvMapper(factory);
53-
this.schema = mapper.schemaFor(OffsetRange.class);
54-
this.fileWriter = new FileWriter(file, true);
55-
this.bufferedWriter = new BufferedWriter(this.fileWriter);
56-
this.generator = factory.createGenerator(bufferedWriter);
57-
this.writer = mapper.writerFor(OffsetRange.class)
58-
.with(fileIsNew ? schema.withHeader() : schema);
55+
private OffsetRangeFile() {
56+
// utility class
5957
}
6058

61-
public void write(OffsetRange range) throws IOException {
62-
writer.writeValue(generator, range);
59+
public static void cleanUp(File file) throws IOException {
60+
File tmpFile = File.createTempFile("offsets", ".csv.tmp");
61+
try (OffsetRangeFile.Writer offsets = new OffsetRangeFile.Writer(tmpFile)) {
62+
offsets.write(OffsetRangeFile.read(file));
63+
}
64+
Files.move(tmpFile.toPath(), file.toPath(), REPLACE_EXISTING);
6365
}
6466

65-
public OffsetRangeSet read() throws IOException {
67+
public static OffsetRangeSet read(File inputFile) throws IOException {
6668
OffsetRangeSet set = new OffsetRangeSet();
67-
ObjectReader reader = mapper.readerFor(OffsetRange.class).with(schema.withHeader());
6869

69-
try (FileReader fr = new FileReader(file);
70+
try (FileReader fr = new FileReader(inputFile);
7071
BufferedReader br = new BufferedReader(fr)) {
71-
MappingIterator<OffsetRange> ranges = reader.readValues(br);
72+
MappingIterator<OffsetRange> ranges = CSV_READER.readValues(br);
7273
while(ranges.hasNext()) {
7374
set.add(ranges.next());
7475
}
7576
}
7677
return set;
7778
}
7879

79-
@Override
80-
public void flush() throws IOException {
81-
generator.flush();
82-
}
80+
public static class Writer implements Flushable, Closeable {
81+
private final FileWriter fileWriter;
82+
private final BufferedWriter bufferedWriter;
83+
private final CsvGenerator generator;
84+
private final ObjectWriter writer;
85+
86+
public Writer(File outputFile) throws IOException {
87+
boolean fileIsNew = !outputFile.exists() || outputFile.length() == 0;
88+
this.fileWriter = new FileWriter(outputFile, true);
89+
this.bufferedWriter = new BufferedWriter(this.fileWriter);
90+
this.generator = CSV_FACTORY.createGenerator(bufferedWriter);
91+
this.writer = CSV_MAPPER.writerFor(OffsetRange.class)
92+
.with(fileIsNew ? SCHEMA.withHeader() : SCHEMA);
93+
}
94+
95+
public void write(OffsetRange range) throws IOException {
96+
writer.writeValue(generator, range);
97+
}
98+
99+
public void write(OffsetRangeSet rangeSet) throws IOException {
100+
for (OffsetRange range : rangeSet) {
101+
write(range);
102+
}
103+
}
83104

84-
@Override
85-
public void close() throws IOException {
86-
generator.close();
87-
bufferedWriter.close();
88-
fileWriter.close();
105+
@Override
106+
public void flush() throws IOException {
107+
generator.flush();
108+
}
109+
110+
@Override
111+
public void close() throws IOException {
112+
generator.close();
113+
bufferedWriter.close();
114+
fileWriter.close();
115+
}
89116
}
90117
}

src/main/java/org/radarcns/OffsetRangeSet.java

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,53 @@
1616

1717
package org.radarcns;
1818

19+
import javax.annotation.Nonnull;
1920
import java.util.HashMap;
21+
import java.util.Iterator;
2022
import java.util.Map;
23+
import java.util.NoSuchElementException;
24+
import java.util.SortedMap;
2125
import java.util.SortedSet;
26+
import java.util.TreeMap;
2227
import java.util.TreeSet;
2328

2429
/** Encompasses a range of offsets. */
25-
public class OffsetRangeSet {
26-
private final Map<String, SortedSet<OffsetRange>> ranges;
30+
public class OffsetRangeSet implements Iterable<OffsetRange> {
31+
private final SortedMap<String, SortedSet<OffsetRange>> ranges;
2732

2833
public OffsetRangeSet() {
29-
ranges = new HashMap<>();
34+
ranges = new TreeMap<>();
3035
}
3136

3237
/** Add given offset range to seen offsets. */
3338
public void add(OffsetRange range) {
3439
SortedSet<OffsetRange> topicRanges = ranges.computeIfAbsent(
35-
range.getTopic() + '+' + range.getPartition(), k -> new TreeSet<>());
40+
key(range.getTopic(), range.getPartition()), k -> new TreeSet<>());
3641

3742
SortedSet<OffsetRange> tail = topicRanges.tailSet(range);
3843
SortedSet<OffsetRange> head = topicRanges.headSet(range);
3944

40-
if (!tail.isEmpty()) {
41-
if (tail.first().equals(range)) {
45+
OffsetRange next = !tail.isEmpty() ? tail.first() : null;
46+
OffsetRange previous = !head.isEmpty() ? head.last() : null;
47+
48+
if (next != null) {
49+
if (next.equals(range)) {
4250
return;
4351
}
4452

45-
if (tail.first().getOffsetFrom() <= range.getOffsetTo() + 1) {
46-
if (!head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetFrom() - 1) {
47-
tail.first().setOffsetFrom(head.last().getOffsetFrom());
48-
topicRanges.remove(head.last());
53+
if (next.getOffsetFrom() <= range.getOffsetTo() + 1) {
54+
if (previous != null && previous.getOffsetTo() >= range.getOffsetFrom() - 1) {
55+
next.setOffsetFrom(previous.getOffsetFrom());
56+
topicRanges.remove(previous);
4957
} else {
50-
tail.first().setOffsetFrom(range.getOffsetFrom());
58+
next.setOffsetFrom(range.getOffsetFrom());
5159
}
5260
return;
5361
}
5462
}
5563

56-
if (!head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetFrom() - 1) {
57-
head.last().setOffsetTo(range.getOffsetTo());
64+
if (previous != null && previous.getOffsetTo() >= range.getOffsetFrom() - 1) {
65+
previous.setOffsetTo(range.getOffsetTo());
5866
return;
5967
}
6068

@@ -63,8 +71,7 @@ public void add(OffsetRange range) {
6371

6472
/** Whether this range set completely contains the given range. */
6573
public boolean contains(OffsetRange range) {
66-
String key = range.getTopic() + '+' + range.getPartition();
67-
SortedSet<OffsetRange> topicRanges = ranges.get(key);
74+
SortedSet<OffsetRange> topicRanges = ranges.get(key(range.getTopic(), range.getPartition()));
6875
if (topicRanges == null) {
6976
return false;
7077
}
@@ -74,14 +81,26 @@ public boolean contains(OffsetRange range) {
7481
}
7582

7683
SortedSet<OffsetRange> tail = topicRanges.tailSet(range);
77-
if (!tail.isEmpty()
78-
&& tail.first().getOffsetFrom() == range.getOffsetFrom()
79-
&& tail.first().getOffsetTo() >= range.getOffsetTo()) {
84+
OffsetRange next = !tail.isEmpty() ? tail.first() : null;
85+
86+
if (next != null
87+
&& next.getOffsetFrom() == range.getOffsetFrom()
88+
&& next.getOffsetTo() >= range.getOffsetTo()) {
8089
return true;
8190
}
8291

8392
SortedSet<OffsetRange> head = topicRanges.headSet(range);
84-
return !head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetTo();
93+
OffsetRange previous = !head.isEmpty() ? head.last() : null;
94+
return previous != null && previous.getOffsetTo() >= range.getOffsetTo();
95+
}
96+
97+
public int size(String topic, int partition) {
98+
SortedSet<OffsetRange> rangeSet = ranges.get(key(topic, partition));
99+
if (rangeSet != null) {
100+
return rangeSet.size();
101+
} else {
102+
return 0;
103+
}
85104
}
86105

87106
@Override
@@ -93,4 +112,48 @@ public String toString() {
93112
public boolean isEmpty() {
94113
return ranges.isEmpty();
95114
}
115+
116+
private static String key(String topic, int partition) {
117+
return topic + '+' + partition;
118+
}
119+
120+
@Override
121+
@Nonnull
122+
public Iterator<OffsetRange> iterator() {
123+
final Iterator<SortedSet<OffsetRange>> partitionIterator = ranges.values().iterator();
124+
return new Iterator<OffsetRange>() {
125+
Iterator<OffsetRange> rangeIterator;
126+
@Override
127+
public boolean hasNext() {
128+
while (rangeIterator == null || !rangeIterator.hasNext()) {
129+
if (!partitionIterator.hasNext()) {
130+
return false;
131+
}
132+
rangeIterator = partitionIterator.next().iterator();
133+
}
134+
return rangeIterator.hasNext();
135+
}
136+
137+
@Override
138+
public OffsetRange next() {
139+
if (!hasNext()) {
140+
throw new NoSuchElementException();
141+
}
142+
return rangeIterator.next();
143+
}
144+
};
145+
}
146+
147+
@Override
148+
public boolean equals(Object o) {
149+
return o == this
150+
|| o != null
151+
&& getClass().equals(o.getClass())
152+
&& ranges.equals(((OffsetRangeSet) o).ranges);
153+
}
154+
155+
@Override
156+
public int hashCode() {
157+
return ranges.hashCode();
158+
}
96159
}

0 commit comments

Comments
 (0)