99import co .elastic .clients .elasticsearch .core .BulkRequest ;
1010import co .elastic .clients .elasticsearch .core .BulkResponse ;
1111import co .elastic .clients .elasticsearch .core .IndexResponse ;
12+ import co .elastic .clients .transport .ElasticsearchTransport ;
1213import co .elastic .clients .transport .rest_client .RestClientTransport ;
1314import co .elastic .clients .elasticsearch .indices .*;
1415import co .elastic .clients .json .jackson .JacksonJsonpMapper ;
1516import com .fasterxml .jackson .databind .ObjectMapper ;
17+ import com .fasterxml .jackson .datatype .jsr310 .JavaTimeModule ;
1618import org .apache .http .HttpHost ;
1719import co .elastic .clients .elasticsearch .core .IndexRequest ;
1820import org .elasticsearch .client .RestClient ;
@@ -45,7 +47,7 @@ public class ElasticClientHelper {
4547
4648 private static RestClient restClient ;
4749
48- private static RestClientTransport transport ;
50+ private static ElasticsearchTransport transport ;
4951
5052 private static ElasticsearchClient client ;
5153 private static ElasticClientHelper instance ;
@@ -60,6 +62,8 @@ public class ElasticClientHelper {
6062 // State messages to be indexed
6163 BlockingQueue <SimpleImmutableEntry <String ,AlarmConfigMessage >> configMessagedQueue = new LinkedBlockingDeque <>();
6264
65+ private final ObjectMapper mapper = new ObjectMapper ();
66+
6367 private ElasticClientHelper () {
6468 try {
6569 Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
@@ -75,9 +79,16 @@ private ElasticClientHelper() {
7579 }
7680 }
7781 }));
82+
83+ // Create the low-level client
7884 restClient = RestClient .builder (
79- new HttpHost (new HttpHost (props .getProperty ("es_host" ),Integer .parseInt (props .getProperty ("es_port" ))))).build ();
80- transport = new RestClientTransport (restClient , new JacksonJsonpMapper ());
85+ new HttpHost (props .getProperty ("es_host" ),Integer .parseInt (props .getProperty ("es_port" )))).build ();
86+
87+ mapper .registerModule (new JavaTimeModule ());
88+ transport = new RestClientTransport (
89+ restClient ,
90+ new JacksonJsonpMapper (mapper )
91+ );
8192 client = new ElasticsearchClient (transport );
8293 if (props .getProperty ("es_sniff" ).equals ("true" )) {
8394 sniffer = Sniffer .builder (restClient ).build ();
@@ -114,38 +125,6 @@ public ElasticsearchClient getClient() {
114125 return client ;
115126 }
116127
117- /**
118- * Check if an index exists with the given name
119- * Note: this is an synchronous call
120- *
121- * @param indexName elastic index name / pattern
122- * @return true if index exists
123- */
124- public boolean indexExists (String indexName ) {
125- ExistsRequest xRequest = new ExistsRequest .Builder ()
126- .index (indexName .toLowerCase ())
127- .build ();
128- try {
129- return client .indices ().exists (xRequest ).value ();
130- } catch (IOException e ) {
131- logger .log (Level .WARNING , "Failed to query elastic" , e );
132- return false ;
133- }
134- }
135-
136- public void indexAlarmStateDocument (String indexName , AlarmStateMessage alarmStateMessage ) {
137- IndexRequest <AlarmStateMessage > indexRequest = new IndexRequest .Builder <AlarmStateMessage >()
138- .index (indexName .toLowerCase ())
139- .document (alarmStateMessage )
140- .build ();
141- try {
142- IndexResponse idxResponse = client .index (indexRequest );
143- } catch (IOException e ) {
144- logger .log (Level .SEVERE , "failed to log message " + alarmStateMessage + " to index " + indexName , e );
145- }
146- }
147-
148-
149128 public void indexAlarmStateDocuments (String indexName , AlarmStateMessage alarmStateMessage ) {
150129 try {
151130 stateMessagedQueue .put (new SimpleImmutableEntry <>(indexName ,alarmStateMessage ));
@@ -168,20 +147,6 @@ public boolean indexAlarmCmdDocument(String indexName, AlarmCommandMessage alarm
168147 }
169148 }
170149
171- public boolean indexAlarmConfigDocument (String indexName , AlarmConfigMessage alarmConfigMessage ) {
172- IndexRequest <AlarmConfigMessage > indexRequest = new IndexRequest .Builder <AlarmConfigMessage >()
173- .index (indexName .toLowerCase ())
174- .document (alarmConfigMessage )
175- .build ();
176- try {
177- IndexResponse indexResponse = client .index (indexRequest );
178- return indexResponse .result ().equals (Result .Created );
179- } catch (IOException e ) {
180- logger .log (Level .SEVERE , "failed to log message " + alarmConfigMessage + " to index " + indexName , e );
181- return false ;
182- }
183- }
184-
185150 public void indexAlarmConfigDocuments (String indexName , AlarmConfigMessage alarmConfigMessage ) {
186151 try {
187152 configMessagedQueue .put (new SimpleImmutableEntry <>(indexName ,alarmConfigMessage ));
@@ -257,7 +222,6 @@ public void initializeIndices() throws IOException {
257222 boolean exists = client .indices ().existsTemplate (request ).value ();
258223
259224 if (!exists ) {
260- ObjectMapper mapper = new ObjectMapper ();
261225 InputStream is = ElasticClientHelper .class .getResourceAsStream ("/alarms_state_template.json" );
262226 PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest .Builder ()
263227 .name (ALARM_STATE_TEMPLATE )
@@ -277,7 +241,6 @@ public void initializeIndices() throws IOException {
277241 exists = client .indices ().existsTemplate (request ).value ();
278242
279243 if (!exists ) {
280- ObjectMapper mapper = new ObjectMapper ();
281244 InputStream is = ElasticClientHelper .class .getResourceAsStream ("/alarms_cmd_template.json" );
282245 PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest .Builder ()
283246 .name (ALARM_CMD_TEMPLATE )
@@ -297,7 +260,6 @@ public void initializeIndices() throws IOException {
297260 exists = client .indices ().existsTemplate (request ).value ();
298261
299262 if (!exists ) {
300- ObjectMapper mapper = new ObjectMapper ();
301263 InputStream is = ElasticClientHelper .class .getResourceAsStream ("/alarms_cmd_template.json" );
302264 PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest .Builder ()
303265 .name (ALARM_CONFIG_TEMPLATE )
0 commit comments