1616import com .didiglobal .logi .log .LogFactory ;
1717import com .google .common .collect .Lists ;
1818import com .xiaojukeji .know .streaming .km .common .bean .po .BaseESPO ;
19- import com .xiaojukeji .know .streaming .km .common .constant .ESConstant ;
2019import com .xiaojukeji .know .streaming .km .common .utils .EnvUtil ;
2120import org .apache .commons .collections .CollectionUtils ;
2221import org .apache .commons .lang3 .StringUtils ;
3736
3837@ Component
3938public class ESOpClient {
40-
4139 private static final ILog LOGGER = LogFactory .getLog ("ES_LOGGER" );
4240
4341 /**
4442 * es 地址
4543 */
4644 @ Value ("${es.client.address}" )
4745 private String esAddress ;
46+
4847 /**
4948 * es 访问密码
5049 */
@@ -54,22 +53,32 @@ public class ESOpClient {
5453 /**
5554 * 客户端个数
5655 */
57- private static final int ES_CLIENT_COUNT = 30 ;
58-
59- private static final int MAX_RETRY_CNT = 5 ;
56+ @ Value ("${es.client.client-cnt:10}" )
57+ private Integer clientCnt ;
6058
61- private static final int ES_IO_THREAD_COUNT = 4 ;
59+ /**
60+ * 最大重试次数
61+ */
62+ @ Value ("${es.client.max-retry-cnt:5}" )
63+ private Integer maxRetryCnt ;
6264
65+ /**
66+ * IO线程数
67+ */
68+ @ Value ("${es.client.io-thread-cnt:2}" )
69+ private Integer ioThreadCnt ;
6370
6471 /**
6572 * 更新es数据的客户端连接队列
6673 */
67- private LinkedBlockingQueue <ESClient > esClientPool = new LinkedBlockingQueue <>( ES_CLIENT_COUNT ) ;
74+ private LinkedBlockingQueue <ESClient > esClientPool ;
6875
6976 @ PostConstruct
7077 public void init (){
71- for (int i = 0 ; i < ES_CLIENT_COUNT ; ++i ) {
72- ESClient esClient = buildEsClient (esAddress , esPass , "" , "" );
78+ esClientPool = new LinkedBlockingQueue <>( clientCnt );
79+
80+ for (int i = 0 ; i < clientCnt ; ++i ) {
81+ ESClient esClient = this .buildEsClient (esAddress , esPass , "" , "" );
7382 if (esClient != null ) {
7483 this .esClientPool .add (esClient );
7584 LOGGER .info ("class=ESOpClient||method=init||msg=add new es client {}" , esAddress );
@@ -245,7 +254,7 @@ public boolean index(String indexName, String id, String source) {
245254 esIndexRequest .source (source );
246255 esIndexRequest .id (id );
247256
248- for (int i = 0 ; i < MAX_RETRY_CNT ; ++i ) {
257+ for (int i = 0 ; i < this . maxRetryCnt ; ++i ) {
249258 response = esClient .index (esIndexRequest ).actionGet (10 , TimeUnit .SECONDS );
250259 if (response == null ) {
251260 continue ;
@@ -307,7 +316,7 @@ public boolean batchInsert(String indexName, List<? extends BaseESPO> pos) {
307316 batchRequest .addNode (BatchType .INDEX , indexName , null , po .getKey (), JSON .toJSONString (po ));
308317 }
309318
310- for (int i = 0 ; i < MAX_RETRY_CNT ; ++i ) {
319+ for (int i = 0 ; i < this . maxRetryCnt ; ++i ) {
311320 response = esClient .batch (batchRequest ).actionGet (2 , TimeUnit .MINUTES );
312321 if (response == null ) {continue ;}
313322
@@ -428,8 +437,8 @@ private ESClient buildEsClient(String address,String password,String clusterName
428437 if (StringUtils .isNotBlank (password )){
429438 esClient .setPassword (password );
430439 }
431- if (ES_IO_THREAD_COUNT > 0 ) {
432- esClient .setIoThreadCount ( ES_IO_THREAD_COUNT );
440+ if (this . ioThreadCnt > 0 ) {
441+ esClient .setIoThreadCount ( this . ioThreadCnt );
433442 }
434443
435444 // 配置http超时
@@ -439,11 +448,13 @@ private ESClient buildEsClient(String address,String password,String clusterName
439448
440449 return esClient ;
441450 } catch (Exception e ) {
442- esClient .close ();
443-
444- LOGGER .error ("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}" , e .getMessage (), address ,
445- e );
451+ try {
452+ esClient .close ();
453+ } catch (Exception innerE ) {
454+ // ignore
455+ }
446456
457+ LOGGER .error ("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}" , e .getMessage (), address , e );
447458 return null ;
448459 }
449460 }
0 commit comments