Skip to content

Commit 5a6124e

Browse files
Send fqn when recording lineage for DB plugins
1 parent 4eeaf34 commit 5a6124e

File tree

31 files changed

+269
-19
lines changed

31 files changed

+269
-19
lines changed

cloudsql-mysql-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
io.cdap.plugin.cloudsql.mysql.*;
127127
io.cdap.plugin.db.batch.source.*;
128128
io.cdap.plugin.db.batch.sink.*;
129-
org.apache.commons.lang;
129+
org.apache.commons.lang.*;
130130
org.apache.commons.logging.*;
131131
org.codehaus.jackson.*
132132
</_exportcontents>

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,18 @@
2626
import io.cdap.cdap.etl.api.FailureCollector;
2727
import io.cdap.cdap.etl.api.PipelineConfigurer;
2828
import io.cdap.cdap.etl.api.batch.BatchSink;
29+
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
2930
import io.cdap.cdap.etl.api.connector.Connector;
31+
import io.cdap.plugin.common.Asset;
3032
import io.cdap.plugin.common.ConfigUtil;
33+
import io.cdap.plugin.common.LineageRecorder;
3134
import io.cdap.plugin.db.CommonSchemaReader;
3235
import io.cdap.plugin.db.SchemaReader;
3336
import io.cdap.plugin.db.batch.config.AbstractDBSpecificSinkConfig;
3437
import io.cdap.plugin.db.batch.sink.AbstractDBSink;
3538
import io.cdap.plugin.util.CloudSQLUtil;
39+
import io.cdap.plugin.util.DBUtils;
40+
import org.apache.commons.lang.StringUtils;
3641

3742
import java.util.Map;
3843
import javax.annotation.Nullable;
@@ -71,6 +76,29 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7176
protected SchemaReader getSchemaReader() {
7277
return new CommonSchemaReader();
7378
}
79+
80+
@Override
81+
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
82+
String host;
83+
String location = "";
84+
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(cloudsqlMysqlSinkConfig.getConnection().getInstanceType())) {
85+
// connection is the private IP address
86+
host = cloudsqlMysqlSinkConfig.getConnection().getConnectionName();
87+
} else {
88+
// connection is of the form <projectId>:<region>:<instanceName>
89+
String[] connectionParams = cloudsqlMysqlSinkConfig.getConnection().getConnectionName().split(":");
90+
host = connectionParams[2];
91+
location = connectionParams[1];
92+
}
93+
String fqn = DBUtils.constructFQN("mysql", host, 3306,
94+
cloudsqlMysqlSinkConfig.getConnection().getDatabase(),
95+
cloudsqlMysqlSinkConfig.getReferenceName());
96+
Asset.Builder assetBuilder = Asset.builder(cloudsqlMysqlSinkConfig.getReferenceName()).setFqn(fqn);
97+
if (!StringUtils.isEmpty(location)) {
98+
assetBuilder.setLocation(location);
99+
}
100+
return new LineageRecorder(context, assetBuilder.build());
101+
}
74102

75103
/** CloudSQL MySQL sink configuration. */
76104
public static class CloudSQLMySQLSinkConfig extends AbstractDBSpecificSinkConfig {

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@
2525
import io.cdap.cdap.etl.api.FailureCollector;
2626
import io.cdap.cdap.etl.api.PipelineConfigurer;
2727
import io.cdap.cdap.etl.api.batch.BatchSource;
28+
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2829
import io.cdap.cdap.etl.api.connector.Connector;
30+
import io.cdap.plugin.common.Asset;
2931
import io.cdap.plugin.common.ConfigUtil;
32+
import io.cdap.plugin.common.LineageRecorder;
3033
import io.cdap.plugin.db.CommonSchemaReader;
3134
import io.cdap.plugin.db.SchemaReader;
3235
import io.cdap.plugin.db.batch.config.AbstractDBSpecificSourceConfig;
3336
import io.cdap.plugin.db.batch.source.AbstractDBSource;
3437
import io.cdap.plugin.util.CloudSQLUtil;
38+
import io.cdap.plugin.util.DBUtils;
39+
import org.apache.commons.lang.StringUtils;
3540

3641
import java.util.Collections;
3742
import java.util.HashMap;
@@ -90,6 +95,29 @@ protected String createConnectionString() {
9095
cloudsqlMysqlSourceConfig.connection.getConnectionName());
9196
}
9297

98+
@Override
99+
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
100+
String host;
101+
String location = "";
102+
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(cloudsqlMysqlSourceConfig.getConnection().getInstanceType())) {
103+
// connection is the private IP address
104+
host = cloudsqlMysqlSourceConfig.getConnection().getConnectionName();
105+
} else {
106+
// connection is of the form <projectId>:<region>:<instanceName>
107+
String[] connectionParams = cloudsqlMysqlSourceConfig.getConnection().getConnectionName().split(":");
108+
host = connectionParams[2];
109+
location = connectionParams[1];
110+
}
111+
String fqn = DBUtils.constructFQN("mysql", host, 3306,
112+
cloudsqlMysqlSourceConfig.getConnection().getDatabase(),
113+
cloudsqlMysqlSourceConfig.getReferenceName());
114+
Asset.Builder assetBuilder = Asset.builder(cloudsqlMysqlSourceConfig.getReferenceName()).setFqn(fqn);
115+
if (!StringUtils.isEmpty(location)) {
116+
assetBuilder.setLocation(location);
117+
}
118+
return new LineageRecorder(context, assetBuilder.build());
119+
}
120+
93121
/** CloudSQL MySQL source config. */
94122
public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {
95123

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
"label": "Reference Name",
9999
"name": "referenceName",
100100
"widget-attributes": {
101-
"placeholder": "Name used to identify this sink for lineage"
101+
"placeholder": "Name used to identify this sink for lineage. Typically, the name of the table/view."
102102
}
103103
},
104104
{

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
"label": "Reference Name",
9999
"name": "referenceName",
100100
"widget-attributes": {
101-
"placeholder": "Name used to identify this source for lineage"
101+
"placeholder": "Name used to identify this source for lineage. Typically, the name of the table/view."
102102
}
103103
},
104104
{

cloudsql-postgresql-plugin/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,10 @@
134134
<instructions>
135135
<_exportcontents>
136136
io.cdap.plugin.cloudsql.postgres.*;
137+
io.cdap.plugin.postgres.*;
137138
io.cdap.plugin.db.batch.source.*;
138139
io.cdap.plugin.db.batch.sink.*;
139-
org.apache.commons.lang;
140+
org.apache.commons.lang.*;
140141
org.apache.commons.logging.*;
141142
org.codehaus.jackson.*
142143
</_exportcontents>

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import io.cdap.cdap.etl.api.FailureCollector;
2929
import io.cdap.cdap.etl.api.PipelineConfigurer;
3030
import io.cdap.cdap.etl.api.batch.BatchSink;
31+
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3132
import io.cdap.cdap.etl.api.connector.Connector;
33+
import io.cdap.plugin.common.Asset;
3234
import io.cdap.plugin.common.ConfigUtil;
35+
import io.cdap.plugin.common.LineageRecorder;
3336
import io.cdap.plugin.db.DBRecord;
3437
import io.cdap.plugin.db.SchemaReader;
3538
import io.cdap.plugin.db.batch.config.AbstractDBSpecificSinkConfig;
@@ -39,6 +42,8 @@
3942
import io.cdap.plugin.postgres.PostgresFieldsValidator;
4043
import io.cdap.plugin.postgres.PostgresSchemaReader;
4144
import io.cdap.plugin.util.CloudSQLUtil;
45+
import io.cdap.plugin.util.DBUtils;
46+
import org.apache.commons.lang.StringUtils;
4247

4348
import java.util.ArrayList;
4449
import java.util.Collections;
@@ -107,6 +112,30 @@ protected FieldsValidator getFieldsValidator() {
107112
return new PostgresFieldsValidator();
108113
}
109114

115+
@Override
116+
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
117+
String host;
118+
String location = "";
119+
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(
120+
cloudsqlPostgresqlSinkConfig.getConnection().getInstanceType())) {
121+
// connection is the private IP address
122+
host = cloudsqlPostgresqlSinkConfig.getConnection().getConnectionName();
123+
} else {
124+
// connection is of the form <projectId>:<region>:<instanceName>
125+
String[] connectionParams = cloudsqlPostgresqlSinkConfig.getConnection().getConnectionName().split(":");
126+
host = connectionParams[2];
127+
location = connectionParams[1];
128+
}
129+
String fqn = DBUtils.constructFQN("postgres", host, 5432,
130+
cloudsqlPostgresqlSinkConfig.getConnection().getDatabase(),
131+
cloudsqlPostgresqlSinkConfig.getReferenceName());
132+
Asset.Builder assetBuilder = Asset.builder(cloudsqlPostgresqlSinkConfig.getReferenceName()).setFqn(fqn);
133+
if (!StringUtils.isEmpty(location)) {
134+
assetBuilder.setLocation(location);
135+
}
136+
return new LineageRecorder(context, assetBuilder.build());
137+
}
138+
110139
/** CloudSQL PostgreSQL sink config. */
111140
public static class CloudSQLPostgreSQLSinkConfig extends AbstractDBSpecificSinkConfig {
112141

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,19 @@
2525
import io.cdap.cdap.etl.api.FailureCollector;
2626
import io.cdap.cdap.etl.api.PipelineConfigurer;
2727
import io.cdap.cdap.etl.api.batch.BatchSource;
28+
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2829
import io.cdap.cdap.etl.api.connector.Connector;
30+
import io.cdap.plugin.common.Asset;
2931
import io.cdap.plugin.common.ConfigUtil;
32+
import io.cdap.plugin.common.LineageRecorder;
3033
import io.cdap.plugin.db.SchemaReader;
3134
import io.cdap.plugin.db.batch.config.AbstractDBSpecificSourceConfig;
3235
import io.cdap.plugin.db.batch.source.AbstractDBSource;
3336
import io.cdap.plugin.postgres.PostgresDBRecord;
3437
import io.cdap.plugin.postgres.PostgresSchemaReader;
3538
import io.cdap.plugin.util.CloudSQLUtil;
39+
import io.cdap.plugin.util.DBUtils;
40+
import org.apache.commons.lang.StringUtils;
3641
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3742

3843
import java.util.Collections;
@@ -97,6 +102,30 @@ protected String createConnectionString() {
97102
cloudsqlPostgresqlSourceConfig.connection.getConnectionName());
98103
}
99104

105+
@Override
106+
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
107+
String host;
108+
String location = "";
109+
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(
110+
cloudsqlPostgresqlSourceConfig.getConnection().getInstanceType())) {
111+
// connection is the private IP address
112+
host = cloudsqlPostgresqlSourceConfig.getConnection().getConnectionName();
113+
} else {
114+
// connection is of the form <projectId>:<region>:<instanceName>
115+
String[] connectionParams = cloudsqlPostgresqlSourceConfig.getConnection().getConnectionName().split(":");
116+
host = connectionParams[2];
117+
location = connectionParams[1];
118+
}
119+
String fqn = DBUtils.constructFQN("postgres", host, 5432,
120+
cloudsqlPostgresqlSourceConfig.getConnection().getDatabase(),
121+
cloudsqlPostgresqlSourceConfig.getReferenceName());
122+
Asset.Builder assetBuilder = Asset.builder(cloudsqlPostgresqlSourceConfig.getReferenceName()).setFqn(fqn);
123+
if (!StringUtils.isEmpty(location)) {
124+
assetBuilder.setLocation(location);
125+
}
126+
return new LineageRecorder(context, assetBuilder.build());
127+
}
128+
100129
/** CloudSQL PostgreSQL source config. */
101130
public static class CloudSQLPostgreSQLSourceConfig extends AbstractDBSpecificSourceConfig {
102131

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
"label": "Reference Name",
9999
"name": "referenceName",
100100
"widget-attributes": {
101-
"placeholder": "Name used to identify this sink for lineage"
101+
"placeholder": "Name used to identify this sink for lineage. Typically, the name of the table/view."
102102
}
103103
},
104104
{

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
"label": "Reference Name",
9999
"name": "referenceName",
100100
"widget-attributes": {
101-
"placeholder": "Name used to identify this source for lineage"
101+
"placeholder": "Name used to identify this source for lineage. Typically, the name of the table/view."
102102
}
103103
},
104104
{

0 commit comments

Comments
 (0)