Skip to content

Commit 0ffef16

Browse files
committed
Added CSV format and fixed offset handling
- Format can be specified with RecordConverterFactory
- Offsets are handled, including proper support for ranges
- Added more tests
1 parent 0d96453 commit 0ffef16

14 files changed

+728
-127
lines changed

build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ dependencies {
2626
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
2727
compile group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
2828
compile group: 'com.fasterxml.jackson.core' , name: 'jackson-databind', version: jacksonVersion
29+
compile group: 'com.fasterxml.jackson.dataformat' , name: 'jackson-dataformat-csv', version: jacksonVersion
2930

30-
runtime group: 'org.apache.logging.log4j', name: 'log4j-core', version: log4jVersion
31-
runtime group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion
3231
runtime group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: hadoopVersion
3332

3433
testCompile group: 'junit', name: 'junit', version: '4.12'
@@ -53,6 +52,10 @@ task fatJar(type: Jar) {
5352
with jar
5453
}
5554

55+
// Adds the output of the fatJar task to the 'archives' configuration.
artifacts {
56+
archives fatJar
57+
}
58+
5659
task wrapper(type: Wrapper) {
5760
gradleVersion = '3.4.1'
5861
distributionUrl distributionUrl.replace("bin", "all")
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.radarcns;
2+
3+
import java.util.Objects;
4+
import javax.annotation.Nonnull;
5+
6+
/**
 * POJO class for storing offsets.
 *
 * <p>Holds a topic name, a partition number and a pair of offsets
 * ({@code offsetFrom} and {@code offsetTo}). The class is a mutable bean with an
 * implicit no-argument constructor; every property has a getter and a setter.
 *
 * <p>The natural ordering sorts by {@code offsetFrom}, then {@code offsetTo}, then
 * topic, then partition, and is consistent with {@link #equals(Object)}.
 */
public class OffsetRange implements Comparable<OffsetRange> {
    private String topic;
    private int partition;
    private long offsetFrom;
    private long offsetTo;

    /**
     * Parses a name of the form {@code topic+partition+offsetFrom+offsetTo}, optionally
     * followed by a {@code .extension}.
     *
     * <p>Fields are separated on {@code '+'} only, so the topic itself may contain dots.
     * Anything after the first {@code '.'} in the fourth field, and any further
     * {@code '+'}-separated fields, are ignored.
     *
     * @param filename name to parse
     * @return a new range populated from the name
     * @throws NumberFormatException if the partition or one of the offsets is not a number
     * @throws IndexOutOfBoundsException if the name has fewer than four fields
     */
    public static OffsetRange parse(String filename) throws NumberFormatException, IndexOutOfBoundsException {
        String[] fields = filename.split("\\+");
        if (fields.length < 4) {
            throw new ArrayIndexOutOfBoundsException(
                    "Expected topic+partition+offsetFrom+offsetTo but got: " + filename);
        }

        // Only the last numeric field can be followed by a file extension.
        String lastOffset = fields[3];
        int extensionStart = lastOffset.indexOf('.');
        if (extensionStart >= 0) {
            lastOffset = lastOffset.substring(0, extensionStart);
        }

        OffsetRange range = new OffsetRange();
        range.topic = fields[0];
        range.partition = Integer.parseInt(fields[1]);
        range.offsetFrom = Long.parseLong(fields[2]);
        range.offsetTo = Long.parseLong(lastOffset);
        return range;
    }

    /** Returns the topic name, or {@code null} if it has not been set yet. */
    public String getTopic() {
        return topic;
    }

    /**
     * Sets the topic name.
     *
     * @param topic topic name, must not be {@code null}
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    public void setTopic(String topic) {
        this.topic = Objects.requireNonNull(topic, "topic");
    }

    /** Returns the partition number. */
    public int getPartition() {
        return partition;
    }

    /** Sets the partition number. */
    public void setPartition(int partition) {
        this.partition = partition;
    }

    /** Returns the first offset of the range. */
    public long getOffsetFrom() {
        return offsetFrom;
    }

    /** Sets the first offset of the range. */
    public void setOffsetFrom(long offsetFrom) {
        this.offsetFrom = offsetFrom;
    }

    /** Returns the last offset of the range. */
    public long getOffsetTo() {
        return offsetTo;
    }

    /** Sets the last offset of the range. */
    public void setOffsetTo(long offsetTo) {
        this.offsetTo = offsetTo;
    }

    /** Formats the range as {@code topic+partition+offsetFrom+offsetTo}. */
    @Override
    public String toString() {
        return topic + '+' + partition + '+' + offsetFrom + '+' + offsetTo;
    }

    @Override
    public int hashCode() {
        // Objects.hashCode tolerates a topic that has not been set yet.
        int result = Objects.hashCode(topic);
        result = 31 * result + partition;
        result = 31 * result + Long.hashCode(offsetFrom);
        result = 31 * result + Long.hashCode(offsetTo);
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || !getClass().equals(o.getClass())) {
            return false;
        }
        OffsetRange other = (OffsetRange) o;
        return partition == other.partition
                && offsetFrom == other.offsetFrom
                && offsetTo == other.offsetTo
                && Objects.equals(topic, other.topic);
    }

    @Override
    public int compareTo(OffsetRange o) {
        int ret = Long.compare(offsetFrom, o.offsetFrom);
        if (ret == 0) {
            ret = Long.compare(offsetTo, o.offsetTo);
        }
        if (ret == 0) {
            ret = compareTopics(topic, o.topic);
        }
        if (ret == 0) {
            ret = Integer.compare(partition, o.partition);
        }
        return ret;
    }

    /** Compares two topic names, ordering an unset ({@code null}) topic first. */
    private static int compareTopics(String left, String right) {
        if (left == null) {
            return right == null ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.radarcns;
2+
3+
import com.fasterxml.jackson.databind.MappingIterator;
4+
import com.fasterxml.jackson.databind.ObjectReader;
5+
import com.fasterxml.jackson.databind.ObjectWriter;
6+
import com.fasterxml.jackson.dataformat.csv.CsvFactory;
7+
import com.fasterxml.jackson.dataformat.csv.CsvGenerator;
8+
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
9+
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
10+
import java.io.BufferedReader;
11+
import java.io.BufferedWriter;
12+
import java.io.Closeable;
13+
import java.io.File;
14+
import java.io.FileReader;
15+
import java.io.FileWriter;
16+
import java.io.Flushable;
17+
import java.io.IOException;
18+
19+
public class OffsetRangeFile implements Flushable, Closeable {
20+
private final CsvMapper mapper;
21+
private final CsvSchema schema;
22+
private final File file;
23+
private final FileWriter fileWriter;
24+
private final BufferedWriter bufferedWriter;
25+
private final CsvGenerator generator;
26+
private final ObjectWriter writer;
27+
28+
public OffsetRangeFile(File file) throws IOException {
29+
this.file = file;
30+
boolean fileIsNew = !file.exists() || file.length() == 0;
31+
CsvFactory factory = new CsvFactory();
32+
this.mapper = new CsvMapper(factory);
33+
this.schema = mapper.schemaFor(OffsetRange.class);
34+
this.fileWriter = new FileWriter(file, true);
35+
this.bufferedWriter = new BufferedWriter(this.fileWriter);
36+
this.generator = factory.createGenerator(bufferedWriter);
37+
this.writer = mapper.writerFor(OffsetRange.class)
38+
.with(fileIsNew ? schema.withHeader() : schema);
39+
}
40+
41+
public void write(OffsetRange range) throws IOException {
42+
writer.writeValue(generator, range);
43+
}
44+
45+
public OffsetRangeSet read() throws IOException {
46+
OffsetRangeSet set = new OffsetRangeSet();
47+
ObjectReader reader = mapper.readerFor(OffsetRange.class).with(schema.withHeader());
48+
49+
try (FileReader fr = new FileReader(file);
50+
BufferedReader br = new BufferedReader(fr)) {
51+
MappingIterator<OffsetRange> ranges = reader.readValues(br);
52+
while(ranges.hasNext()) {
53+
set.add(ranges.next());
54+
}
55+
}
56+
return set;
57+
}
58+
59+
@Override
60+
public void flush() throws IOException {
61+
generator.flush();
62+
}
63+
64+
@Override
65+
public void close() throws IOException {
66+
generator.close();
67+
bufferedWriter.close();
68+
fileWriter.close();
69+
}
70+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.radarcns;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.SortedSet;
6+
import java.util.TreeSet;
7+
8+
/** Encompasses a range of offsets. */
9+
public class OffsetRangeSet {
10+
private final Map<String, SortedSet<OffsetRange>> ranges;
11+
12+
public OffsetRangeSet() {
13+
ranges = new HashMap<>();
14+
}
15+
16+
/** Add given offset range to seen offsets. */
17+
public void add(OffsetRange range) {
18+
SortedSet<OffsetRange> topicRanges = ranges.computeIfAbsent(
19+
range.getTopic() + '+' + range.getPartition(), k -> new TreeSet<>());
20+
21+
SortedSet<OffsetRange> tail = topicRanges.tailSet(range);
22+
SortedSet<OffsetRange> head = topicRanges.headSet(range);
23+
24+
if (!tail.isEmpty()) {
25+
if (tail.first().equals(range)) {
26+
return;
27+
}
28+
29+
if (tail.first().getOffsetFrom() <= range.getOffsetTo() + 1) {
30+
if (!head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetFrom() - 1) {
31+
tail.first().setOffsetFrom(head.last().getOffsetFrom());
32+
topicRanges.remove(head.last());
33+
} else {
34+
tail.first().setOffsetFrom(range.getOffsetFrom());
35+
}
36+
return;
37+
}
38+
}
39+
40+
if (!head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetFrom() - 1) {
41+
head.last().setOffsetTo(range.getOffsetTo());
42+
return;
43+
}
44+
45+
topicRanges.add(range);
46+
}
47+
48+
/** Whether this range set completely contains the given range. */
49+
public boolean contains(OffsetRange range) {
50+
String key = range.getTopic() + '+' + range.getPartition();
51+
SortedSet<OffsetRange> topicRanges = ranges.get(key);
52+
if (topicRanges == null) {
53+
return false;
54+
}
55+
56+
if (topicRanges.contains(range)) {
57+
return true;
58+
}
59+
60+
SortedSet<OffsetRange> tail = topicRanges.tailSet(range);
61+
if (!tail.isEmpty()
62+
&& tail.first().getOffsetFrom() == range.getOffsetFrom()
63+
&& tail.first().getOffsetTo() >= range.getOffsetTo()) {
64+
return true;
65+
}
66+
67+
SortedSet<OffsetRange> head = topicRanges.headSet(range);
68+
return !head.isEmpty() && head.last().getOffsetTo() >= range.getOffsetTo();
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return "OffsetRangeSet" + ranges;
74+
}
75+
76+
public boolean isEmpty() {
77+
return ranges.isEmpty();
78+
}
79+
}

0 commit comments

Comments
 (0)