1515package org .jnosql .diana .elasticsearch .document ;
1616
1717
18+ import org .elasticsearch .action .bulk .BulkRequest ;
19+ import org .elasticsearch .action .delete .DeleteRequest ;
20+ import org .elasticsearch .action .index .IndexRequest ;
21+ import org .elasticsearch .action .search .SearchRequest ;
1822import org .elasticsearch .action .search .SearchResponse ;
19- import org .elasticsearch .client .Client ;
23+ import org .elasticsearch .client .RestHighLevelClient ;
2024import org .elasticsearch .index .query .QueryBuilder ;
25+ import org .elasticsearch .search .builder .SearchSourceBuilder ;
2126import org .jnosql .diana .api .document .Document ;
2227import org .jnosql .diana .api .document .DocumentDeleteQuery ;
2328import org .jnosql .diana .api .document .DocumentEntity ;
2429import org .jnosql .diana .api .document .DocumentQuery ;
2530
26- import javax .json .bind .Jsonb ;
27- import javax .json .bind .JsonbBuilder ;
31+ import java .io .IOException ;
2832import java .time .Duration ;
2933import java .util .List ;
3034import java .util .Map ;
3135import java .util .Objects ;
32- import java .util .concurrent .ExecutionException ;
3336import java .util .stream .Collectors ;
3437
35- import static java .nio .charset .StandardCharsets .UTF_8 ;
3638import static java .util .Objects .requireNonNull ;
3739import static java .util .stream .StreamSupport .stream ;
38- import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
3940import static org .jnosql .diana .elasticsearch .document .EntityConverter .ID_FIELD ;
4041import static org .jnosql .diana .elasticsearch .document .EntityConverter .getMap ;
4142
4546class DefaultElasticsearchDocumentCollectionManager implements ElasticsearchDocumentCollectionManager {
4647
4748
48- protected static final Jsonb JSONB = JsonbBuilder .create ();
49-
50- private final Client client ;
49+ private final RestHighLevelClient client ;
5150
5251 private final String index ;
5352
54- DefaultElasticsearchDocumentCollectionManager (Client client , String index ) {
53+ DefaultElasticsearchDocumentCollectionManager (RestHighLevelClient client , String index ) {
5554 this .client = client ;
5655 this .index = index ;
5756 }
5857
5958 @ Override
60- public DocumentEntity insert (DocumentEntity entity ) throws NullPointerException {
59+ public DocumentEntity insert (DocumentEntity entity ) {
6160 requireNonNull (entity , "entity is required" );
6261 Document id = entity .find (ID_FIELD )
6362 .orElseThrow (() -> new ElasticsearchKeyFoundException (entity .toString ()));
6463 Map <String , Object > jsonObject = getMap (entity );
65- byte [] bytes = JSONB . toJson ( jsonObject ). getBytes ( UTF_8 );
64+ IndexRequest request = new IndexRequest ( index , entity . getName (), id . get ( String . class )). source ( jsonObject );
6665 try {
67- client .prepareIndex (index , entity .getName (), id .get (String .class )).setSource (bytes )
68- .execute ().get ();
69- return entity ;
70- } catch (InterruptedException | ExecutionException e ) {
71- throw new ElasticsearchException ("An error to try to save/update entity on elasticsearch" , e );
66+ client .index (request );
67+ } catch (IOException e ) {
68+ throw new ElasticsearchException ("An error to insert in Elastic search" , e );
7269 }
7370
71+ return entity ;
7472 }
7573
7674
7775 @ Override
78- public DocumentEntity insert (DocumentEntity entity , Duration ttl ) throws NullPointerException , UnsupportedOperationException {
79- requireNonNull (entity , "entity is required" );
80- requireNonNull (ttl , "ttl is required" );
81- Document id = entity .find (ID_FIELD )
82- .orElseThrow (() -> new ElasticsearchKeyFoundException (entity .toString ()));
83- Map <String , Object > jsonObject = getMap (entity );
84- byte [] bytes = JSONB .toJson (jsonObject ).getBytes (UTF_8 );
85- try {
86- client .prepareIndex (index , entity .getName (), id .get (String .class ))
87- .setSource (bytes )
88- .setTTL (timeValueMillis (ttl .toMillis ()))
89- .execute ().get ();
90- } catch (InterruptedException | ExecutionException e ) {
91- throw new ElasticsearchException ("An error to try to save with TTL entity on elasticsearch" , e );
92- }
93- return entity ;
76+ public DocumentEntity insert (DocumentEntity entity , Duration ttl ) {
77+ throw new UnsupportedOperationException ("The insert with TTL does not support" );
9478 }
9579
9680 @ Override
@@ -102,21 +86,27 @@ public DocumentEntity update(DocumentEntity entity) throws NullPointerException
10286 public void delete (DocumentDeleteQuery query ) throws NullPointerException {
10387 requireNonNull (query , "query is required" );
10488
105- query .getCondition ().orElseThrow (() -> new IllegalArgumentException ("condition is required" ));
106- DocumentQuery select = new ElasticsearchDocumentQuery (query );
89+ query .getCondition ().orElseThrow (() -> new IllegalArgumentException ("condition is required" ));
90+ DocumentQuery select = new ElasticsearchDocumentQuery (query );
10791
10892 List <DocumentEntity > entities = select (select );
10993
94+ if (entities .isEmpty ()) {
95+ return ;
96+ }
97+
98+ BulkRequest bulk = new BulkRequest ();
99+
110100 entities .stream ()
111101 .map (entity -> entity .find (ID_FIELD ).get ().get (String .class ))
112- .forEach (id -> {
113- try {
114- client .prepareDelete (index , query .getDocumentCollection (), id ).execute ().get ();
115- } catch (InterruptedException | ExecutionException e ) {
116- throw new ElasticsearchException ("An error to delete entities on elasticsearch" , e );
117- }
118- });
102+ .map (id -> new DeleteRequest (index , query .getDocumentCollection (), id ))
103+ .forEach (bulk ::add );
119104
105+ try {
106+ client .bulk (bulk );
107+ } catch (IOException e ) {
108+ throw new ElasticsearchException ("An error to delete entities on elasticsearch" , e );
109+ }
120110 }
121111
122112
@@ -130,19 +120,19 @@ public List<DocumentEntity> select(DocumentQuery query) throws NullPointerExcept
130120 public List <DocumentEntity > search (QueryBuilder query , String ... types ) throws NullPointerException {
131121 Objects .requireNonNull (query , "query is required" );
132122
133- SearchResponse searchResponse = null ;
134123 try {
135- searchResponse = client .prepareSearch (index )
136- .setTypes (types )
137- .setQuery (query )
138- .execute ().get ();
139-
140- return stream (searchResponse .getHits ().spliterator (), false )
141- .map (h -> new ElasticsearchEntry (h .getId (), h .getIndex (), h .sourceAsMap ()))
124+ SearchRequest searchRequest = new SearchRequest (index );
125+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder ();
126+ searchSourceBuilder .query (query );
127+ searchRequest .types (types );
128+ SearchResponse search = client .search (searchRequest );
129+
130+ return stream (search .getHits ().spliterator (), false )
131+ .map (h -> new ElasticsearchEntry (h .getId (), h .getIndex (), h .getSourceAsMap ()))
142132 .filter (ElasticsearchEntry ::isNotEmpty )
143133 .map (ElasticsearchEntry ::toEntity )
144134 .collect (Collectors .toList ());
145- } catch (InterruptedException | ExecutionException e ) {
135+ } catch (IOException e ) {
146136 throw new ElasticsearchException ("An error when do search from QueryBuilder on elasticsearch" , e );
147137 }
148138
@@ -151,6 +141,10 @@ public List<DocumentEntity> search(QueryBuilder query, String... types) throws N
151141
152142 @ Override
153143 public void close () {
154-
144+ try {
145+ client .close ();
146+ } catch (IOException e ) {
147+ throw new ElasticsearchException ("An error when close the client" , e );
148+ }
155149 }
156150}
0 commit comments