Commit 524f0b7

Merge pull request #14 from mlozbin-cybervisiontech/feature/CDAP-15823_new-validation-api
CDAP-15823 Elasticsearch-plugins plugin validation
2 parents bf1ec6d + 2b03a96 commit 524f0b7

File tree

10 files changed: +831 additions, -169 deletions

pom.xml

Lines changed: 2 additions & 2 deletions
@@ -313,8 +313,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>
src/main/java/io/cdap/plugin/elastic/BaseElasticsearchConfig.java

Lines changed: 131 additions & 0 deletions (new file)

/*
 * Copyright © 2019 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.elastic;

import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.common.ReferencePluginConfig;

/**
 * Basic config class for Elasticsearch plugin.
 */
public abstract class BaseElasticsearchConfig extends ReferencePluginConfig {
  public static final String INDEX_NAME = "es.index";
  public static final String TYPE_NAME = "es.type";
  public static final String HOST = "es.host";

  private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch instance; " +
    "for example, localhost:9200.";
  private static final String INDEX_DESCRIPTION = "The name of the index to query.";
  private static final String TYPE_DESCRIPTION = "The name of the type where the data is stored.";

  @Name(HOST)
  @Description(HOST_DESCRIPTION)
  @Macro
  private final String hostname;

  @Name(INDEX_NAME)
  @Description(INDEX_DESCRIPTION)
  @Macro
  private final String index;

  @Name(TYPE_NAME)
  @Description(TYPE_DESCRIPTION)
  @Macro
  private final String type;

  public BaseElasticsearchConfig(String referenceName, String hostname, String index, String type) {
    super(referenceName);
    this.hostname = hostname;
    this.index = index;
    this.type = type;
  }

  public String getHostname() {
    return hostname;
  }

  public String getIndex() {
    return index;
  }

  public String getType() {
    return type;
  }

  public String getResource() {
    return String.format("%s/%s", index, type);
  }

  public void validate(FailureCollector collector) {
    IdUtils.validateReferenceName(referenceName, collector);

    if (!containsMacro(HOST)) {
      if (Strings.isNullOrEmpty(hostname)) {
        collector.addFailure("Hostname must be specified.", null).withConfigProperty(HOST);
      } else {
        validateHost(collector);
      }
    }

    if (!containsMacro(INDEX_NAME) && Strings.isNullOrEmpty(index)) {
      collector.addFailure("Index must be specified.", null).withConfigProperty(INDEX_NAME);
    }

    if (!containsMacro(TYPE_NAME) && Strings.isNullOrEmpty(type)) {
      collector.addFailure("Type must be specified.", null).withConfigProperty(TYPE_NAME);
    }
  }

  private void validateHost(FailureCollector collector) {
    String[] hostParts = hostname.split(":");

    // Elasticsearch Hadoop does not support IPV6 https://github.com/elastic/elasticsearch-hadoop/issues/1105
    if (hostParts.length != 2) {
      collector.addFailure(
        "Invalid format of hostname",
        "Hostname and port must be specified for the Elasticsearch instance, for example: 'localhost:9200'"
      ).withConfigProperty(HOST);
    } else {
      String host = hostParts[0];
      String port = hostParts[1];

      if (host.isEmpty()) {
        collector.addFailure("Host should not be empty.", null)
          .withConfigProperty(HOST);
      }

      try {
        int portValue = Integer.parseInt(port);

        if (portValue < 0 || portValue > 65535) {
          collector.addFailure("Invalid port: " + port, "Port should be in range [0;65535]")
            .withConfigProperty(HOST);
        }
      } catch (NumberFormatException e) {
        collector.addFailure(
          "Invalid value for port: " + port,
          "Port should be a number in range [0;65535]")
          .withConfigProperty(HOST);
      }
    }
  }
}
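
With CDAP-15823, configuration problems are accumulated on a FailureCollector instead of being thrown one at a time; callers run every check and then invoke collector.getOrThrowException() once. Below is a minimal test-style sketch of how the new validate method could be exercised. It is not part of this commit: it assumes CDAP's MockFailureCollector test helper (io.cdap.cdap.etl.mock.validation.MockFailureCollector) is on the test classpath and uses a hypothetical TestConfig subclass, since BaseElasticsearchConfig is abstract.

package io.cdap.plugin.elastic;

import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import org.junit.Assert;
import org.junit.Test;

public class BaseElasticsearchConfigValidationTest {

  // Hypothetical concrete subclass for the sketch; BaseElasticsearchConfig itself is abstract.
  private static class TestConfig extends BaseElasticsearchConfig {
    TestConfig(String referenceName, String hostname, String index, String type) {
      super(referenceName, hostname, index, type);
    }
  }

  @Test
  public void invalidPortIsCollectedInsteadOfThrown() {
    MockFailureCollector collector = new MockFailureCollector("ElasticsearchSink");
    // "abc" is not a number, so validateHost records a single failure against es.host.
    new TestConfig("esRef", "localhost:abc", "logs", "event").validate(collector);
    Assert.assertEquals(1, collector.getValidationFailures().size());
  }

  @Test
  public void wellFormedConfigHasNoFailures() {
    MockFailureCollector collector = new MockFailureCollector("ElasticsearchSink");
    new TestConfig("esRef", "localhost:9200", "logs", "event").validate(collector);
    Assert.assertTrue(collector.getValidationFailures().isEmpty());
  }
}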

src/main/java/io/cdap/plugin/elastic/ESProperties.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

src/main/java/io/cdap/plugin/elastic/sink/BatchElasticsearchSink.java

Lines changed: 19 additions & 53 deletions
@@ -17,22 +17,19 @@
 package io.cdap.plugin.elastic.sink;
 
 import io.cdap.cdap.api.annotation.Description;
-import io.cdap.cdap.api.annotation.Macro;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.batch.Output;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.dataset.lib.KeyValue;
 import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
 import io.cdap.cdap.format.StructuredRecordStringConverter;
-
-import io.cdap.plugin.common.ReferenceBatchSink;
-import io.cdap.plugin.common.ReferencePluginConfig;
 import io.cdap.plugin.common.batch.JobUtils;
 import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
-import io.cdap.plugin.elastic.ESProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

@@ -57,33 +54,35 @@
 @Name("Elasticsearch")
 @Description("Elasticsearch Batch Sink takes the structured record from the input source and converts it " +
   "to a JSON string, then indexes it in Elasticsearch using the index, type, and id specified by the user.")
-public class BatchElasticsearchSink extends ReferenceBatchSink<StructuredRecord, Writable, Writable> {
-  private static final String INDEX_DESCRIPTION = "The name of the index where the data will be stored. " +
-    "If the index does not already exist, it will be created using Elasticsearch's default properties.";
-  private static final String TYPE_DESCRIPTION = "The name of the type where the data will be stored. " +
-    "If it does not already exist, it will be created.";
-  private static final String ID_DESCRIPTION = "The field that will determine the id for the document. " +
-    "It should match a fieldname in the structured record of the input.";
-  private static final String HOST_DESCRIPTION = "The hostname and port for the Elasticsearch server; " +
-    "such as localhost:9200.";
-  private final ESConfig config;
+public class BatchElasticsearchSink extends BatchSink<StructuredRecord, Writable, Writable> {
+  private final ElasticsearchSinkConfig config;
 
-  public BatchElasticsearchSink(ESConfig config) {
-    super(config);
+  public BatchElasticsearchSink(ElasticsearchSinkConfig config) {
     this.config = config;
   }
 
+  @Override
+  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+    config.validate(collector);
+    collector.getOrThrowException();
+  }
+
   @Override
   public void prepareRun(BatchSinkContext context) throws IOException {
+    FailureCollector collector = context.getFailureCollector();
+    config.validate(collector);
+    collector.getOrThrowException();
+
     Job job = JobUtils.createInstance();
     Configuration conf = job.getConfiguration();
 
     job.setSpeculativeExecution(false);
 
-    conf.set("es.nodes", config.hostname);
-    conf.set("es.resource.write", String.format("%s/%s", config.index, config.type));
+    conf.set("es.nodes", config.getHostname());
+    conf.set("es.resource.write", config.getResource());
     conf.set("es.input.json", "yes");
-    conf.set("es.mapping.id", config.idField);
+    conf.set("es.mapping.id", config.getIdField());
 
     context.addOutput(Output.of(config.referenceName, new SinkOutputFormatProvider(EsOutputFormat.class, conf)));
   }

@@ -93,37 +92,4 @@ public void transform(StructuredRecord record, Emitter<KeyValue<Writable, Writab
     emitter.emit(new KeyValue<Writable, Writable>(new Text(StructuredRecordStringConverter.toJsonString(record)),
       new Text(StructuredRecordStringConverter.toJsonString(record))));
   }
-
-  /**
-   * Config class for BatchElasticsearchSink.java
-   */
-  public static class ESConfig extends ReferencePluginConfig {
-    @Name(ESProperties.HOST)
-    @Description(HOST_DESCRIPTION)
-    @Macro
-    private String hostname;
-
-    @Name(ESProperties.INDEX_NAME)
-    @Description(INDEX_DESCRIPTION)
-    @Macro
-    private String index;
-
-    @Name(ESProperties.TYPE_NAME)
-    @Description(TYPE_DESCRIPTION)
-    @Macro
-    private String type;
-
-    @Name(ESProperties.ID_FIELD)
-    @Description(ID_DESCRIPTION)
-    @Macro
-    private String idField;
-
-    public ESConfig(String referenceName, String hostname, String index, String type, String idField) {
-      super(referenceName);
-      this.hostname = hostname;
-      this.index = index;
-      this.type = type;
-      this.idField = idField;
-    }
-  }
 }
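
ElasticsearchSinkConfig is one of the ten changed files but is not shown in this excerpt. Judging from the diff above, it extends BaseElasticsearchConfig and adds the idField property used by es.mapping.id. The sketch below is a hedged reconstruction, not the actual file: the property key, description text, and the extra idField check are assumptions.

package io.cdap.plugin.elastic.sink;

import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.elastic.BaseElasticsearchConfig;

/**
 * Sketch of the sink config implied by the diff above: the constructor shape, getIdField(),
 * and the inherited getHostname()/getResource()/validate(). Details may differ from the real file.
 */
public class ElasticsearchSinkConfig extends BaseElasticsearchConfig {
  public static final String ID_FIELD = "es.idField"; // assumed property key

  @Name(ID_FIELD)
  @Description("The field that will determine the id for the document.")
  @Macro
  private final String idField;

  public ElasticsearchSinkConfig(String referenceName, String hostname, String index, String type, String idField) {
    super(referenceName, hostname, index, type);
    this.idField = idField;
  }

  public String getIdField() {
    return idField;
  }

  @Override
  public void validate(FailureCollector collector) {
    super.validate(collector);
    // Assumed extra check: the id field must be set when it is not a macro.
    if (!containsMacro(ID_FIELD) && Strings.isNullOrEmpty(idField)) {
      collector.addFailure("ID field must be specified.", null).withConfigProperty(ID_FIELD);
    }
  }
}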
