|
31 | 31 | import java.io.FileWriter;
|
32 | 32 | import java.io.Flushable;
|
33 | 33 | import java.io.IOException;
|
| 34 | +import java.nio.file.Files; |
| 35 | + |
| 36 | +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; |
34 | 37 |
|
35 | 38 | /**
|
36 | 39 | * Accesses a OffsetRange file using the CSV format. On construction, this will create the file if
|
37 | 40 | * not present.
|
38 | 41 | */
|
39 |
| -public class OffsetRangeFile implements Flushable, Closeable { |
40 |
| - private final CsvMapper mapper; |
41 |
| - private final CsvSchema schema; |
42 |
| - private final File file; |
43 |
| - private final FileWriter fileWriter; |
44 |
| - private final BufferedWriter bufferedWriter; |
45 |
| - private final CsvGenerator generator; |
46 |
| - private final ObjectWriter writer; |
| 42 | +public final class OffsetRangeFile { |
| 43 | + private static final CsvSchema SCHEMA = CsvSchema.builder() |
| 44 | + .addColumn("topic") |
| 45 | + .addNumberColumn("partition") |
| 46 | + .addNumberColumn("offsetTo") |
| 47 | + .addNumberColumn("offsetFrom") |
| 48 | + .build(); |
| 49 | + |
| 50 | + private static final CsvFactory CSV_FACTORY = new CsvFactory(); |
| 51 | + private static final CsvMapper CSV_MAPPER = new CsvMapper(CSV_FACTORY); |
| 52 | + private static final ObjectReader CSV_READER = CSV_MAPPER.reader(SCHEMA.withHeader()) |
| 53 | + .forType(OffsetRange.class); |
47 | 54 |
|
48 |
| - public OffsetRangeFile(File file) throws IOException { |
49 |
| - this.file = file; |
50 |
| - boolean fileIsNew = !file.exists() || file.length() == 0; |
51 |
| - CsvFactory factory = new CsvFactory(); |
52 |
| - this.mapper = new CsvMapper(factory); |
53 |
| - this.schema = mapper.schemaFor(OffsetRange.class); |
54 |
| - this.fileWriter = new FileWriter(file, true); |
55 |
| - this.bufferedWriter = new BufferedWriter(this.fileWriter); |
56 |
| - this.generator = factory.createGenerator(bufferedWriter); |
57 |
| - this.writer = mapper.writerFor(OffsetRange.class) |
58 |
| - .with(fileIsNew ? schema.withHeader() : schema); |
| 55 | + private OffsetRangeFile() { |
| 56 | + // utility class |
59 | 57 | }
|
60 | 58 |
|
61 |
| - public void write(OffsetRange range) throws IOException { |
62 |
| - writer.writeValue(generator, range); |
| 59 | + public static void cleanUp(File file) throws IOException { |
| 60 | + File tmpFile = File.createTempFile("offsets", ".csv.tmp"); |
| 61 | + try (OffsetRangeFile.Writer offsets = new OffsetRangeFile.Writer(tmpFile)) { |
| 62 | + offsets.write(OffsetRangeFile.read(file)); |
| 63 | + } |
| 64 | + Files.move(tmpFile.toPath(), file.toPath(), REPLACE_EXISTING); |
63 | 65 | }
|
64 | 66 |
|
65 |
| - public OffsetRangeSet read() throws IOException { |
| 67 | + public static OffsetRangeSet read(File inputFile) throws IOException { |
66 | 68 | OffsetRangeSet set = new OffsetRangeSet();
|
67 |
| - ObjectReader reader = mapper.readerFor(OffsetRange.class).with(schema.withHeader()); |
68 | 69 |
|
69 |
| - try (FileReader fr = new FileReader(file); |
| 70 | + try (FileReader fr = new FileReader(inputFile); |
70 | 71 | BufferedReader br = new BufferedReader(fr)) {
|
71 |
| - MappingIterator<OffsetRange> ranges = reader.readValues(br); |
| 72 | + MappingIterator<OffsetRange> ranges = CSV_READER.readValues(br); |
72 | 73 | while(ranges.hasNext()) {
|
73 | 74 | set.add(ranges.next());
|
74 | 75 | }
|
75 | 76 | }
|
76 | 77 | return set;
|
77 | 78 | }
|
78 | 79 |
|
79 |
| - @Override |
80 |
| - public void flush() throws IOException { |
81 |
| - generator.flush(); |
82 |
| - } |
| 80 | + public static class Writer implements Flushable, Closeable { |
| 81 | + private final FileWriter fileWriter; |
| 82 | + private final BufferedWriter bufferedWriter; |
| 83 | + private final CsvGenerator generator; |
| 84 | + private final ObjectWriter writer; |
| 85 | + |
| 86 | + public Writer(File outputFile) throws IOException { |
| 87 | + boolean fileIsNew = !outputFile.exists() || outputFile.length() == 0; |
| 88 | + this.fileWriter = new FileWriter(outputFile, true); |
| 89 | + this.bufferedWriter = new BufferedWriter(this.fileWriter); |
| 90 | + this.generator = CSV_FACTORY.createGenerator(bufferedWriter); |
| 91 | + this.writer = CSV_MAPPER.writerFor(OffsetRange.class) |
| 92 | + .with(fileIsNew ? SCHEMA.withHeader() : SCHEMA); |
| 93 | + } |
| 94 | + |
| 95 | + public void write(OffsetRange range) throws IOException { |
| 96 | + writer.writeValue(generator, range); |
| 97 | + } |
| 98 | + |
| 99 | + public void write(OffsetRangeSet rangeSet) throws IOException { |
| 100 | + for (OffsetRange range : rangeSet) { |
| 101 | + write(range); |
| 102 | + } |
| 103 | + } |
83 | 104 |
|
84 |
| - @Override |
85 |
| - public void close() throws IOException { |
86 |
| - generator.close(); |
87 |
| - bufferedWriter.close(); |
88 |
| - fileWriter.close(); |
| 105 | + @Override |
| 106 | + public void flush() throws IOException { |
| 107 | + generator.flush(); |
| 108 | + } |
| 109 | + |
| 110 | + @Override |
| 111 | + public void close() throws IOException { |
| 112 | + generator.close(); |
| 113 | + bufferedWriter.close(); |
| 114 | + fileWriter.close(); |
| 115 | + } |
89 | 116 | }
|
90 | 117 | }
|
0 commit comments