Skip to content

Commit b70deab

Browse files
committed
* log-exporter: export action and event in parquet format
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
1 parent 533a8fe commit b70deab

File tree

4 files changed

+50
-40
lines changed

ext/log-exporter/src/main/java/core/log/service/ArchiveService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.avro.generic.GenericData;
1212
import org.apache.avro.generic.GenericDatumReader;
1313
import org.apache.parquet.avro.AvroParquetWriter;
14+
import org.apache.parquet.conf.PlainParquetConfiguration;
1415
import org.apache.parquet.hadoop.ParquetWriter;
1516
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
1617
import org.apache.parquet.io.LocalOutputFile;
@@ -62,11 +63,14 @@ public void uploadArchive(LocalDate date) throws IOException {
6263
Path convertToParquet(Path sourcePath) throws IOException {
6364
var watch = new StopWatch();
6465
var targetPath = sourcePath.resolveSibling(sourcePath.getFileName() + "." + Randoms.alphaNumeric(5) + ".parquet");
66+
var config = new PlainParquetConfiguration();
67+
config.setBoolean("parquet.avro.write-old-list-structure", false);
6568
try (DataFileReader<GenericData.Record> reader = new DataFileReader<>(sourcePath.toFile(), new GenericDatumReader<>());
6669
ParquetWriter<GenericData.Record> writer = AvroParquetWriter
6770
.<GenericData.Record>builder(new LocalOutputFile(targetPath))
6871
.withSchema(reader.getSchema())
6972
.withCompressionCodec(CompressionCodecName.ZSTD)
73+
.withConf(config)
7074
.build()) {
7175

7276
for (GenericData.Record record : reader) {

ext/log-exporter/src/test/java/core/log/ActionLogMessageFactory.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package core.log;
2+
3+
import core.framework.log.message.ActionLogMessage;
4+
import core.framework.log.message.PerformanceStatMessage;
5+
6+
import java.time.Instant;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.Map;
10+
11+
public class ActionLogMessageFactory {
12+
public static ActionLogMessage create() {
13+
var message = new ActionLogMessage();
14+
message.date = Instant.parse("2022-11-07T00:00:00Z");
15+
message.id = "id";
16+
message.app = "app";
17+
message.action = "action";
18+
message.result = "OK";
19+
message.host = "host";
20+
message.elapsed = 1000L;
21+
List<String> keys = new ArrayList<>();
22+
keys.add(null);
23+
message.context = Map.of("customer_id", List.of("customer_id1", "customer_id2"), "key", keys);
24+
message.performanceStats = Map.of("kafka", perfStats(1, 1000L, 10, 5),
25+
"http", perfStats(2, 2000L, null, null));
26+
return message;
27+
}
28+
29+
private static PerformanceStatMessage perfStats(int count, long totalElapsed, Integer readEntries, Integer writeEntries) {
30+
final PerformanceStatMessage stats = new PerformanceStatMessage();
31+
stats.count = count;
32+
stats.totalElapsed = totalElapsed;
33+
stats.readEntries = readEntries;
34+
stats.writeEntries = writeEntries;
35+
return stats;
36+
}
37+
}

ext/log-exporter/src/test/java/core/log/kafka/ActionLogMessageHandlerTest.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
import core.framework.inject.Inject;
44
import core.framework.kafka.Message;
5-
import core.framework.log.message.ActionLogMessage;
6-
import core.framework.log.message.PerformanceStatMessage;
75
import core.framework.util.Files;
6+
import core.log.ActionLogMessageFactory;
87
import core.log.IntegrationTest;
98
import core.log.domain.ActionLogSchema;
109
import core.log.service.ArchiveService;
@@ -13,10 +12,7 @@
1312
import org.junit.jupiter.api.Test;
1413

1514
import java.io.IOException;
16-
import java.time.Instant;
17-
import java.util.ArrayList;
1815
import java.util.List;
19-
import java.util.Map;
2016

2117
/**
2218
* @author neo
@@ -41,30 +37,10 @@ void cleanup() {
4137

4238
@Test
4339
void handle() throws IOException {
44-
var message = new ActionLogMessage();
45-
message.date = Instant.parse("2022-11-07T00:00:00Z");
46-
message.id = "id";
47-
message.app = "app";
48-
message.action = "action";
49-
message.result = "OK";
50-
message.host = "host";
51-
message.elapsed = 1000L;
52-
List<String> keys = new ArrayList<>();
53-
keys.add(null);
54-
message.context = Map.of("customer_id", List.of("customer_id1", "customer_id2"), "key", keys);
55-
message.performanceStats = Map.of("kafka", perfStats(1, 1000L, 10, 5),
56-
"http", perfStats(2, 2000L, null, null));
40+
var message = ActionLogMessageFactory.create();
5741
message.traceLog = "trace";
42+
5843
handler.handle(List.of(new Message<>("key", message)));
5944
handler.handle(List.of(new Message<>("key", message)));
6045
}
61-
62-
private PerformanceStatMessage perfStats(int count, long totalElapsed, Integer readEntries, Integer writeEntries) {
63-
final PerformanceStatMessage stats = new PerformanceStatMessage();
64-
stats.count = count;
65-
stats.totalElapsed = totalElapsed;
66-
stats.readEntries = readEntries;
67-
stats.writeEntries = writeEntries;
68-
return stats;
69-
}
7046
}

ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package core.log.service;
22

33
import core.framework.inject.Inject;
4-
import core.framework.log.message.EventMessage;
54
import core.framework.util.Files;
5+
import core.log.ActionLogMessageFactory;
66
import core.log.IntegrationTest;
7-
import core.log.domain.EventSchema;
7+
import core.log.domain.ActionLogSchema;
88
import org.apache.avro.file.DataFileWriter;
99
import org.apache.avro.generic.GenericData;
1010
import org.apache.avro.specific.SpecificDatumWriter;
@@ -16,7 +16,6 @@
1616

1717
import java.io.IOException;
1818
import java.nio.file.Path;
19-
import java.time.Instant;
2019
import java.time.LocalDate;
2120

2221
import static org.assertj.core.api.Assertions.assertThat;
@@ -26,7 +25,7 @@
2625
*/
2726
class ArchiveServiceTest extends IntegrationTest {
2827
@Inject
29-
EventSchema schema;
28+
ActionLogSchema schema;
3029
private ArchiveService archiveService;
3130

3231
@BeforeEach
@@ -77,15 +76,9 @@ void uploadActionLog() throws IOException {
7776

7877
@Test
7978
void convertToParquet() throws IOException {
80-
var message = new EventMessage();
81-
message.date = Instant.parse("2022-11-07T00:00:00Z");
82-
message.id = "id";
83-
message.app = "app";
84-
message.action = "action";
85-
message.result = "OK";
86-
message.elapsed = 1000L;
87-
88-
Path avroPath = archiveService.localEventFilePath(LocalDate.parse("2022-11-07"));
79+
var message = ActionLogMessageFactory.create();
80+
81+
Path avroPath = archiveService.localActionLogFilePath(LocalDate.parse("2022-11-07"));
8982
archiveService.createParentDir(avroPath);
9083

9184
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {

0 commit comments

Comments
 (0)