Skip to content

Commit f715cd0

Browse files
[code optimize]
1 parent e31772b commit f715cd0

File tree

10 files changed

+22
-20
lines changed

10 files changed

+22
-20
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2425
import com.dtstack.flink.sql.enums.ClusterMode;
2526
import com.dtstack.flink.sql.enums.ECacheType;
2627
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
@@ -99,12 +100,6 @@ public class Main {
99100

100101
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
101102

102-
private static final int failureRate = 3;
103-
104-
private static final int failureInterval = 6; //min
105-
106-
private static final int delayInterval = 10; //sec
107-
108103
public static void main(String[] args) throws Exception {
109104

110105
OptionParser optionParser = new OptionParser(args);
@@ -337,9 +332,9 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
337332
env.setBufferTimeout(FlinkUtil.getBufferTimeoutMillis(confProperties));
338333
}
339334
env.setRestartStrategy(RestartStrategies.failureRateRestart(
340-
failureRate,
341-
Time.of(failureInterval, TimeUnit.MINUTES),
342-
Time.of(delayInterval, TimeUnit.SECONDS)
335+
ConfigConstrant.failureRate,
336+
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
337+
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
343338
));
344339
FlinkUtil.setStreamTimeCharacteristic(env, confProperties);
345340
FlinkUtil.openCheckpoint(env, confProperties);

core/src/main/java/com/dtstack/flink/sql/util/ConfigConstrant.java renamed to core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919

2020

21-
package com.dtstack.flink.sql.util;
21+
package com.dtstack.flink.sql.constrant;
2222

2323

2424
/**
@@ -51,4 +51,11 @@ public class ConfigConstrant {
5151

5252
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
5353

54+
// restart plocy
55+
public static final int failureRate = 3;
56+
57+
public static final int failureInterval = 6; //min
58+
59+
public static final int delayInterval = 10; //sec
60+
5461
}

core/src/main/java/com/dtstack/flink/sql/threadFactory/DTThreadFactory.java renamed to core/src/main/java/com/dtstack/flink/sql/factory/DTThreadFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919

2020

21-
package com.dtstack.flink.sql.threadFactory;
21+
package com.dtstack.flink.sql.factory;
2222

2323
import java.util.concurrent.ThreadFactory;
2424
import java.util.concurrent.atomic.AtomicInteger;

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.threadFactory.DTThreadFactory;
23+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2424
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2525
import org.apache.flink.configuration.Configuration;
2626
import org.apache.flink.types.Row;

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.util;
2222

2323

24+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.flink.api.common.typeinfo.TypeInformation;
2627
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import com.dtstack.flink.sql.side.hbase.rowkeydealer.PreRowKeyModeDealerDealer;
3131
import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer;
3232
import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
33-
import com.dtstack.flink.sql.threadFactory.DTThreadFactory;
33+
import com.dtstack.flink.sql.factory.DTThreadFactory;
3434
import com.google.common.collect.Maps;
3535
import com.stumbleupon.async.Deferred;
3636
import org.apache.flink.api.java.typeutils.RowTypeInfo;

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.option.Options;
2222
import com.dtstack.flink.sql.util.PluginUtil;
23-
import com.dtstack.flink.yarn.JobParameter;
24-
import com.dtstack.flink.yarn.YarnClusterConfiguration;
23+
import com.dtstack.flink.sql.yarn.JobParameter;
24+
import com.dtstack.flink.sql.yarn.YarnClusterConfiguration;
2525
import org.apache.commons.io.Charsets;
2626
import org.apache.commons.lang.StringUtils;
2727
import org.apache.flink.client.program.ClusterClient;
@@ -129,7 +129,7 @@ public static ClusterClient createYarnClient(Options launcherOptions, String mod
129129
//jobmanager+taskmanager param
130130
JobParameter appConf = new JobParameter(confProperties);
131131

132-
com.dtstack.flink.yarn.YarnClusterDescriptor clusterDescriptor = new com.dtstack.flink.yarn.YarnClusterDescriptor(
132+
com.dtstack.flink.sql.yarn.YarnClusterDescriptor clusterDescriptor = new com.dtstack.flink.sql.yarn.YarnClusterDescriptor(
133133
clusterConf, yarnClient, appConf,applicationId, launcherOptions.getName(),null );
134134
clusterClient = clusterDescriptor.deploy();
135135

core/src/main/java/com/dtstack/flink/yarn/JobParameter.java renamed to launcher/src/main/java/com/dtstack/flink/sql/yarn/JobParameter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.dtstack.flink.yarn;
16+
package com.dtstack.flink.sql.yarn;
1717

1818
import java.util.Objects;
1919
import java.util.Properties;

core/src/main/java/com/dtstack/flink/yarn/YarnClusterConfiguration.java renamed to launcher/src/main/java/com/dtstack/flink/sql/yarn/YarnClusterConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.dtstack.flink.yarn;
16+
package com.dtstack.flink.sql.yarn;
1717

1818
import org.apache.flink.configuration.Configuration;
1919
import org.apache.hadoop.fs.Path;

core/src/main/java/com/dtstack/flink/yarn/YarnClusterDescriptor.java renamed to launcher/src/main/java/com/dtstack/flink/sql/yarn/YarnClusterDescriptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.dtstack.flink.yarn;
16+
package com.dtstack.flink.sql.yarn;
1717

1818
import org.apache.flink.client.deployment.ClusterDeploymentException;
1919
import org.apache.flink.client.deployment.ClusterSpecification;
@@ -42,7 +42,6 @@
4242
import java.util.*;
4343
import java.util.concurrent.*;
4444
import java.util.stream.Collectors;
45-
import java.util.stream.Stream;
4645

4746
import static java.util.Objects.requireNonNull;
4847
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.NEW;

0 commit comments

Comments
 (0)