1818
1919import com .google .common .collect .ImmutableMap ;
2020import io .cdap .cdap .api .annotation .Description ;
21+ import io .cdap .cdap .api .annotation .Macro ;
2122import io .cdap .cdap .api .annotation .Name ;
2223import io .cdap .cdap .api .annotation .Plugin ;
2324import io .cdap .cdap .etl .api .FailureCollector ;
2425import io .cdap .cdap .etl .api .PipelineConfigurer ;
2526import io .cdap .cdap .etl .api .action .Action ;
27+ import io .cdap .plugin .db .ConnectionConfig ;
2628import io .cdap .plugin .db .action .AbstractDBAction ;
2729import io .cdap .plugin .db .action .QueryConfig ;
2830import io .cdap .plugin .util .CloudSQLUtil ;
@@ -48,11 +50,13 @@ public CloudSQLMySQLAction(CloudSQLMySQLActionConfig cloudsqlMysqlActionConfig)
4850 @ Override
4951 public void configurePipeline (PipelineConfigurer pipelineConfigurer ) {
5052 FailureCollector failureCollector = pipelineConfigurer .getStageConfigurer ().getFailureCollector ();
51-
52- CloudSQLUtil .checkConnectionName (
53- failureCollector ,
54- cloudsqlMysqlActionConfig .instanceType ,
55- cloudsqlMysqlActionConfig .connectionName );
53+
54+ if (cloudsqlMysqlActionConfig .canConnect ()) {
55+ CloudSQLUtil .checkConnectionName (
56+ failureCollector ,
57+ cloudsqlMysqlActionConfig .instanceType ,
58+ cloudsqlMysqlActionConfig .connectionName );
59+ }
5660
5761 super .configurePipeline (pipelineConfigurer );
5862 }
@@ -69,10 +73,18 @@ public CloudSQLMySQLActionConfig() {
6973 "The CloudSQL instance to connect to. For a public instance, the connection string should be in the format "
7074 + "<PROJECT_ID>:<REGION>:<INSTANCE_NAME> which can be found in the instance overview page. For a private "
7175 + "instance, enter the internal IP address of the Compute Engine VM cloudsql proxy is running on." )
76+ @ Macro
7277 public String connectionName ;
7378
79+ @ Name (ConnectionConfig .PORT )
80+ @ Description ("Database port number" )
81+ @ Macro
82+ @ Nullable
83+ private Integer port ;
84+
7485 @ Name (DATABASE )
7586 @ Description ("Database name to connect to" )
87+ @ Macro
7688 public String database ;
7789
7890 @ Name (CloudSQLMySQLConstants .CONNECTION_TIMEOUT )
@@ -94,6 +106,7 @@ public String getConnectionString() {
94106 return String .format (
95107 CloudSQLMySQLConstants .PRIVATE_CLOUDSQL_MYSQL_CONNECTION_STRING_FORMAT ,
96108 connectionName ,
109+ getPort (),
97110 database );
98111 }
99112
@@ -103,10 +116,19 @@ public String getConnectionString() {
103116 connectionName );
104117 }
105118
119+ public int getPort () {
120+ return port == null ? 3306 : port ;
121+ }
122+
106123 @ Override
107124 public Map <String , String > getDBSpecificArguments () {
108125 return ImmutableMap .of (
109126 CloudSQLMySQLConstants .CONNECTION_TIMEOUT , String .valueOf (connectionTimeout ));
110127 }
128+
129+ public boolean canConnect () {
130+ return !containsMacro (CloudSQLUtil .CONNECTION_NAME ) && !containsMacro (ConnectionConfig .PORT ) &&
131+ !containsMacro (DATABASE );
132+ }
111133 }
112134}
0 commit comments