Skip to content

Commit cff6eb7

Browse files
authored
fix(interactive): increase write Kafka speed by using async method (#4549)
1 parent 1319d9b commit cff6eb7

File tree

6 files changed

+19
-4
lines changed

6 files changed

+19
-4
lines changed

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void start() {
4747
for (int i = 0; i < CommonConfig.STORE_NODE_COUNT.get(configs); i++) {
4848
CoordinatorSnapshotClient client = clients.getClient(i);
4949
client.synchronizeSnapshot(offlineVersion);
50-
if (i == 0 && offlineVersion % 1000 == 0) {
50+
if (i == 0) {
5151
logger.info("Offline version updated to {}", offlineVersion);
5252
}
5353
}
@@ -63,6 +63,7 @@ public void start() {
6363
interval,
6464
interval,
6565
TimeUnit.MILLISECONDS);
66+
logger.info("GarbageCollectManaget started");
6667
}
6768

6869
public void stop() {

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio
161161
OperationBatch batch = entry.getValue().build();
162162
// logger.info("Log writer append storeId [{}], batch size: {}", storeId,
163163
// batch.getOperationCount());
164-
logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch));
164+
logWriter.appendAsync(storeId, new LogEntry(ingestSnapshotId.get(), batch));
165165
}
166166
} catch (Exception e) {
167167
// write failed, just throw out to fail this task
@@ -256,7 +256,7 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
256256
if (batch.getOperationCount() == 0) {
257257
continue;
258258
}
259-
logWriter.append(storeId, new LogEntry(batchSnapshotId, batch));
259+
logWriter.appendAsync(storeId, new LogEntry(batchSnapshotId, batch));
260260
replayCount++;
261261
}
262262
ids.add(batchSnapshotId + 1);

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSnapshotService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public void synchronizeMinQuerySnapshotId(
2525
SynchronizeMinQuerySnapshotIdRequest request,
2626
StreamObserver<SynchronizeMinQuerySnapshotIdResponse> responseObserver) {
2727
long snapshotId = request.getSnapshotId();
28+
logger.info("synchronize min query snapshot id [{}]", snapshotId);
2829
this.storeService.garbageCollect(
2930
snapshotId,
3031
new CompletionCallback<Void>() {

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/LogWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public interface LogWriter extends AutoCloseable {
3434

3535
long append(int partition, LogEntry logEntry) throws IOException;
3636

37+
public Future<RecordMetadata> appendAsync(LogEntry logEntry) throws IOException;
38+
3739
public Future<RecordMetadata> appendAsync(int partition, LogEntry logEntry) throws IOException;
3840

3941
void close() throws IOException;

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogWriter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,18 @@ public long append(LogEntry logEntry) {
6767
}
6868

6969
@Override
70-
public Future<RecordMetadata> appendAsync(int partition, LogEntry logEntry) {
70+
public Future<RecordMetadata> appendAsync(LogEntry logEntry) {
7171
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topicName, logEntry));
7272
return future;
7373
}
7474

75+
@Override
76+
public Future<RecordMetadata> appendAsync(int partition, LogEntry logEntry) {
77+
Future<RecordMetadata> future =
78+
producer.send(new ProducerRecord<>(topicName, partition, null, logEntry));
79+
return future;
80+
}
81+
7582
public long waitFuture(Future<RecordMetadata> future) {
7683
RecordMetadata recordMetadata;
7784
try {

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/readonly/ReadOnlyLogWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ public long append(int partition, LogEntry logEntry) throws IOException {
3636
return offset;
3737
}
3838

39+
public Future<RecordMetadata> appendAsync(LogEntry logEntry) throws IOException {
40+
return null;
41+
}
42+
3943
public Future<RecordMetadata> appendAsync(int partition, LogEntry logEntry) throws IOException {
4044
return null;
4145
}

0 commit comments

Comments
 (0)