Commit d19d90b
Merge branch 'develop' into codestyle

# Conflicts:
#	flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java
#	flink-cyber/metron-parser-chain/parser-chains-config-service/src/main/java/com/cloudera/parserchains/queryservice/config/AppProperties.java
#	flink-cyber/metron-parser-chain/parser-chains-config-service/src/main/java/com/cloudera/parserchains/queryservice/controller/ChainController.java
#	flink-cyber/metron-parser-chain/parser-chains-config-service/src/main/java/com/cloudera/parserchains/queryservice/model/describe/OcsfIndexMappingDescriptor.java
#	flink-cyber/metron-parser-chain/parser-chains-config-service/src/main/java/com/cloudera/parserchains/queryservice/service/IndexingService.java

2 parents: ee259d5 + e5295c6

File tree: 29 files changed, +1341 −522 lines

flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/indexing/MappingColumnDto.java

Lines changed: 23 additions & 7 deletions
@@ -2,6 +2,10 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -28,23 +32,35 @@ public class MappingColumnDto {
     private Boolean isMap;
 
     @JsonIgnore
-    public String getKafkaName() {
+    public List<String> getKafkaNameList() {
         final String properName = kafkaName == null ? name : kafkaName;
         if (getIsMap()) {
-            return String.format("['%s']", properName);
-        } else {
-            if (getPath().equals("..")) {
-                return String.format("%s", properName);
-            }
-            return String.format(".%s", properName);
+            return Collections.singletonList(String.format("['%s']", properName));
         }
+
+        String[] kafkaNamesSplit = properName.split(",");
+
+        return Arrays.stream(kafkaNamesSplit)
+            .map(singleKafkaName -> {
+                if (getPath().equals("..")) {
+                    return String.format("%s", singleKafkaName);
+                }
+                return String.format(".%s", singleKafkaName);
+            })
+            .collect(Collectors.toList());
     }
 
     @JsonIgnore
     public String getRawKafkaName() {
         return kafkaName;
     }
 
+    @JsonProperty("path")
+    public String getRawPath() {
+        return this.path;
+    }
+
+    @JsonIgnore
     public String getPath() {
         if (StringUtils.isEmpty(path)) {
             return "extensions";

flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJob.java

Lines changed: 411 additions & 421 deletions
Large diffs are not rendered by default.

flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiJobFactory.java

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,6 @@
 package com.cloudera.cyber.indexing.hive.tableapi;
 
+import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiFilesystemJob;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiHiveJob;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiKafkaJob;
 import com.cloudera.cyber.scoring.ScoredMessage;
@@ -21,6 +22,8 @@ public static TableApiAbstractJob getJobByConnectorName(String typeName, Paramet
                 return new TableApiHiveJob(params, env, source);
             case "kafka":
                 return new TableApiKafkaJob(params, env, source);
+            case "filesystem":
+                return new TableApiFilesystemJob(params, env, source);
             default:
                 throw new RuntimeException(String.format(
                     "Unknown job type name [%s] provided while the Flink writer is selected as TableAPI", typeName));
flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiFilesystemJob.java

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+package com.cloudera.cyber.indexing.hive.tableapi.impl;
+
+import com.cloudera.cyber.indexing.hive.tableapi.TableApiAbstractJob;
+import com.cloudera.cyber.scoring.ScoredMessage;
+import java.io.IOException;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.FormatDescriptor;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+
+public class TableApiFilesystemJob extends TableApiAbstractJob {
+
+    private static final String BASE_TABLE_JSON = "base-hive-table.json";
+    private final String format;
+    private final String path;
+
+    public TableApiFilesystemJob(ParameterTool params, StreamExecutionEnvironment env, DataStream<ScoredMessage> source)
+            throws IOException {
+        super(params, env, source, "Filesystem", BASE_TABLE_JSON);
+        format = params.get("flink.files.format", "json");
+        path = params.getRequired("flink.files.path");
+    }
+
+    @Override
+    protected StreamExecutionEnvironment jobReturnValue() {
+        return null;
+    }
+
+    @Override
+    protected String getTableConnector() {
+        return "filesystem";
+    }
+
+    @Override
+    protected FormatDescriptor getFormatDescriptor() {
+        return FormatDescriptor.forFormat(format).build();
+    }
+
+    @Override
+    protected void registerCatalog(StreamTableEnvironment tableEnv) {
+    }
+
+    @Override
+    protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder builder) {
+        return super.fillTableOptions(builder)
+            .option("path", path)
+            .option("format", format)
+            .option("sink.partition-commit.policy.kind", "success-file");
+    }
+
+}
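
For orientation, roughly the sink descriptor these overrides add up to once the base job assembles it, sketched with Flink's TableDescriptor API; the schema column and option values are placeholders, since the real schema is loaded from base-hive-table.json and the values come from the flink.files.* parameters:

// Sketch only; column names and option values are placeholders.
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

class FilesystemSinkSketch {
    static TableDescriptor sinkDescriptor() {
        return TableDescriptor.forConnector("filesystem")                 // getTableConnector()
            .schema(Schema.newBuilder()
                .column("message", "STRING")                              // placeholder column
                .build())
            .format(FormatDescriptor.forFormat("json").build())           // getFormatDescriptor()
            .option("path", "/data/cyber/events")                         // from flink.files.path
            .option("format", "json")                                     // from flink.files.format
            .option("sink.partition-commit.policy.kind", "success-file")
            .build();
    }
}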

flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/tableapi/TableApiAbstractJobTest.java

Lines changed: 44 additions & 1 deletion
@@ -1,6 +1,8 @@
 package com.cloudera.cyber.indexing.hive.tableapi;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 import com.cloudera.cyber.indexing.MappingColumnDto;
 import com.cloudera.cyber.indexing.MappingDto;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiKafkaJob;
@@ -51,7 +53,40 @@ public static Stream<Arguments> mappingsData() {
             new MappingColumnDto("column6", null, null, null, false),
             new MappingColumnDto("column7", null, null, null, false),
             new MappingColumnDto("column8", null, null, null, false),
-            new MappingColumnDto("column9", null, null, null, false))))));
+            new MappingColumnDto("column9", null, null, null, false))))),
+
+            Arguments.of(Collections.singletonMap(GIVEN_TABLE_NAME, ResolvedSchema.of(
+                    Column.physical("column1", DataTypes.STRING()),
+                    Column.physical("column2", DataTypes.STRING()),
+                    Column.physical("column3", DataTypes.STRING()))),
+                Collections.singletonMap(GIVEN_SOURCE,
+                    new MappingDto(GIVEN_TABLE_NAME, new ArrayList<>(), Arrays.asList(
+                        new MappingColumnDto("column1", "column1,column2", null, null, false),
+                        new MappingColumnDto("column2", "column3", null, null, false))))));
+    }
+
+    public static Stream<Arguments> insertSqlData() {
+        return Stream.of(
+            Arguments.of("topic",
+                new MappingDto("tableName", Collections.emptyList(), Collections.emptyList()),
+                ResolvedSchema.of(),
+                "CREATE TEMPORARY VIEW topic_tmpview( ) AS \n" +
+                    " SELECT \n" +
+                    " from KafkaTempView\n" +
+                    " where `source`='topic'"),
+            Arguments.of("topic",
+                new MappingDto("tableName", Collections.emptyList(), Arrays.asList(
+                    new MappingColumnDto("column1", "column1,column2", null, "ROW(%s, %s)", false),
+                    new MappingColumnDto("column2", "column3", null, null, false))),
+                ResolvedSchema.of(
+                    Column.physical("column1", DataTypes.STRING()),
+                    Column.physical("column2", DataTypes.STRING()),
+                    Column.physical("column3", DataTypes.STRING())),
+                "CREATE TEMPORARY VIEW topic_tmpview( column1, column2 ) AS \n" +
+                    " SELECT ROW((message.extensions.column1), (message.extensions.column2)), (message.extensions.column3) \n" +
+                    " from KafkaTempView\n" +
+                    " where `source`='topic'")
+        );
     }
 
     public static Stream<Arguments> mappingsExceptionData() {
@@ -92,13 +127,21 @@ public static Stream<Arguments> mappingsExceptionData() {
             "Found column mappings of non-string type without transformations for source [%s]: %s",
             GIVEN_SOURCE, "[column1]")));
     }
+
     @ParameterizedTest
     @MethodSource("mappingsData")
     void shouldValidateMappings(Map<String, ResolvedSchema> givenTableSchemaMap,
                                 Map<String, MappingDto> givenTopicMapping) {
         job.validateMappings(givenTableSchemaMap, givenTopicMapping);
     }
 
+    @ParameterizedTest
+    @MethodSource("insertSqlData")
+    void shouldGenerateInsertSql(String topic, MappingDto mappingDto, ResolvedSchema tableSchema, String expectedSql) {
+        String actualSql = job.buildInsertSql(topic, mappingDto, tableSchema);
+        assertThat(actualSql).isEqualTo(expectedSql);
+    }
+
     @ParameterizedTest
     @MethodSource("mappingsExceptionData")
     void shouldThrowExceptionWhenValidateMappings(Map<String, ResolvedSchema> givenTableSchemaMap,

flink-cyber/metron-parser-chain/parser-chains-config-service/frontend/parser-chains-client/src/app/app-routing.module.ts

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,7 @@ import {ClusterPageComponent} from "./cluster/cluster-page/cluster-page.componen
 import {PipelinesComponent} from 'src/app/cluster/pipelines/pipelines.component';
 import {PipelineCreateComponent} from 'src/app/cluster/pipelines/pipeline-create/pipeline-create.component';
 import {PipelineSubmitComponent} from 'src/app/cluster/pipelines/pipeline-submit/pipeline-submit.component';
+import {PipelineStepperComponent} from 'src/app/cluster/pipelines/pipeline-stepper/pipeline-stepper.component';
 
 export const routes: Routes = [
   { path: '404', component: PageNotFoundComponent },
@@ -37,6 +38,7 @@ export const routes: Routes = [
   { path: 'clusters', component: ClusterListPageComponent },
   { path: 'clusters/pipelines', component: PipelinesComponent },
   { path: 'clusters/pipelines/create', component: PipelineCreateComponent },
+  { path: 'clusters/pipelines/stepper', component: PipelineStepperComponent },
   { path: 'clusters/pipelines/submit', component: PipelineSubmitComponent },
   { path: 'clusters/:clusterId', component: ClusterPageComponent},
 

flink-cyber/metron-parser-chain/parser-chains-config-service/frontend/parser-chains-client/src/app/chain-list-page/chain-list-page.component.spec.ts

Lines changed: 2 additions & 2 deletions
@@ -41,8 +41,8 @@ const mockChains: ChainModel[] = [{id: '1', name: 'test1'}, {id: '2', name: 'tes
 describe('PipelinesComponent', () => {
   let component: ChainListPageComponent;
   let fixture: ComponentFixture<ChainListPageComponent>;
-  let pipelineService: jasmine.SpyObj<PipelineService>
-  let chainListPageService: jasmine.SpyObj<ChainListPageService>
+  let pipelineService: jasmine.SpyObj<PipelineService>;
+  let chainListPageService: jasmine.SpyObj<ChainListPageService>;
 
   beforeEach(waitForAsync(() => {
     TestBed.configureTestingModule({

flink-cyber/metron-parser-chain/parser-chains-config-service/frontend/parser-chains-client/src/app/chain-page/chain-page.models.ts

Lines changed: 6 additions & 0 deletions
@@ -66,6 +66,12 @@ export interface IndexTableMapping {
   column_mapping: IndexingColumnMapping[]
 }
 
+export interface TableColumnDto {
+  name: string;
+  type: string;
+  nullable: boolean;
+}
+
 export interface IndexingColumnMapping {
   name: string;
   kafka_name?: string;

flink-cyber/metron-parser-chain/parser-chains-config-service/frontend/parser-chains-client/src/app/chain-page/components/ocsf-form/ocsf-form.component.html

Lines changed: 12 additions & 2 deletions
@@ -63,13 +63,23 @@
       nzText="Import/Export"
       nzOrientation="center">
     </nz-divider>
+    <nz-form-item>
+      <nz-form-label>Table Config path</nz-form-label>
+      <nz-form-control>
+        <nz-input-group nzPrefixIcon="file">
+          <input formControlName="_tableFilePath"
+                 nz-input
+                 placeholder="/path/to/tableConfig.json"/>
+        </nz-input-group>
+      </nz-form-control>
+    </nz-form-item>
     <nz-form-item>
       <nz-form-label>Mapping path</nz-form-label>
       <nz-form-control>
         <nz-input-group nzPrefixIcon="file">
-          <input formControlName="_filePath"
+          <input formControlName="_mappingFilePath"
                  nz-input
-                 placeholder="/path/to/file.json"/>
+                 placeholder="/path/to/mappingConfig.json"/>
         </nz-input-group>
       </nz-form-control>
     </nz-form-item>
