Skip to content

Commit 35b6c12

Browse files
authored
[CYB-185] indexing doesnt start when two sources map to the same topic (#64)
1 parent f80be7a commit 35b6c12

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,13 +334,17 @@ private void configure(StreamTableEnvironment tableEnv) {
334334

335335
protected final String buildInsertSql(String topic, MappingDto mappingDto, ResolvedSchema tableSchema) {
336336
return String.join("\n",
337-
getInsertSqlPrefix() + " " + mappingDto.getTableName() + "(" + getInsertColumns(mappingDto) + ") "
337+
getInsertSqlPrefix() + " " + getTableName(topic, mappingDto) + "(" + getInsertColumns(mappingDto) + ") "
338338
+ getInsertSqlSuffix(),
339339
" SELECT " + getFromColumns(mappingDto, tableSchema),
340340
" from " + KAFKA_TABLE,
341341
String.format(" where `source`='%s'", topic));
342342
}
343343

344+
protected String getTableName(String source, MappingDto mappingDto) {
345+
return mappingDto.getTableName();
346+
}
347+
344348
protected String getInsertSqlPrefix() {
345349
return "INSERT INTO ";
346350
}

flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ protected void executeInsert(StreamTableEnvironment tableEnv, Map<String, Mappin
5151
mappingDto.getTableName(), "indexing-job", params);
5252

5353
//read from view and write to kafka sink
54-
final Table table = tableEnv.from(mappingDto.getTableName());
54+
final Table table = tableEnv.from(getTableName(topic, mappingDto));
5555
final String schemaString = AvroSchemaUtil.convertToAvro(tablesConfig.get(mappingDto.getTableName()))
5656
.toString();
5757

@@ -98,6 +98,11 @@ protected FormatDescriptor getFormatDescriptor() {
9898
return null;
9999
}
100100

101+
@Override
102+
protected String getTableName(String source, MappingDto mappingDto) {
103+
return source.concat("_tmpview");
104+
}
105+
101106
@Override
102107
protected String getInsertSqlPrefix() {
103108
return "CREATE TEMPORARY VIEW ";

0 commit comments

Comments
 (0)