1515package org .jnosql .diana .elasticsearch .document ;
1616
1717
18- import org .elasticsearch .action .ActionListener ;
19- import org .elasticsearch .action .bulk .BulkRequestBuilder ;
20- import org .elasticsearch .action .bulk .BulkResponse ;
21- import org .elasticsearch .client .Client ;
2218import org .elasticsearch .index .query .QueryBuilder ;
2319import org .jnosql .diana .api .ExecuteAsyncQueryException ;
24- import org .jnosql .diana .api .document .Document ;
2520import org .jnosql .diana .api .document .DocumentCollectionManagerAsync ;
26- import org .jnosql .diana .api .document .DocumentDeleteQuery ;
2721import org .jnosql .diana .api .document .DocumentEntity ;
28- import org .jnosql .diana .api .document .DocumentQuery ;
2922
30- import javax .json .bind .Jsonb ;
31- import javax .json .bind .JsonbBuilder ;
32- import java .time .Duration ;
3323import java .util .List ;
34- import java .util .Map ;
3524import java .util .function .Consumer ;
3625
37- import static java .nio .charset .StandardCharsets .UTF_8 ;
38- import static java .util .Objects .requireNonNull ;
39- import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
40- import static org .jnosql .diana .elasticsearch .document .EntityConverter .ID_FIELD ;
41- import static org .jnosql .diana .elasticsearch .document .EntityConverter .getMap ;
42-
43- public class ElasticsearchDocumentCollectionManagerAsync implements DocumentCollectionManagerAsync {
44-
45- protected static final Jsonb JSONB = JsonbBuilder .create ();
46-
47- private static final Consumer <DocumentEntity > NOOP = e -> {
48- };
49-
50- private final Client client ;
51- private final String index ;
52-
53- ElasticsearchDocumentCollectionManagerAsync (Client client , String index ) {
54-
55- this .client = client ;
56- this .index = index ;
57- }
58-
59- @ Override
60- public void insert (DocumentEntity entity ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
61- insert (entity , NOOP );
62- }
63-
64- @ Override
65- public void insert (DocumentEntity entity , Duration ttl ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
66- insert (entity , ttl , e -> {
67- });
68- }
69-
70- @ Override
71- public void insert (DocumentEntity entity , Consumer <DocumentEntity > callBack ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
72- requireNonNull (entity , "entity is required" );
73- requireNonNull (callBack , "callBack is required" );
74- Document id = entity .find (ID_FIELD )
75- .orElseThrow (() -> new ElasticsearchKeyFoundException (entity .toString ()));
76- Map <String , Object > jsonObject = getMap (entity );
77- byte [] bytes = JSONB .toJson (jsonObject ).getBytes (UTF_8 );
78- client .prepareIndex (index , entity .getName (), id .get (String .class )).setSource (bytes ).execute ()
79- .addListener (new SaveActionListener (callBack , entity ));
80-
81-
82- }
83-
84- @ Override
85- public void insert (DocumentEntity entity , Duration ttl , Consumer <DocumentEntity > callBack ) throws ExecuteAsyncQueryException ,
86- UnsupportedOperationException , NullPointerException {
87- requireNonNull (entity , "entity is required" );
88- requireNonNull (ttl , "ttl is required" );
89- requireNonNull (callBack , "callBack is required" );
90- Document id = entity .find (ID_FIELD )
91- .orElseThrow (() -> new ElasticsearchKeyFoundException (entity .toString ()));
92- Map <String , Object > jsonObject = getMap (entity );
93- byte [] bytes = JSONB .toJson (jsonObject ).getBytes (UTF_8 );
94- client .prepareIndex (index , entity .getName (), id .get (String .class )).setSource (bytes ).
95- setTTL (timeValueMillis (ttl .toMillis ())).execute ()
96- .addListener (new SaveActionListener (callBack , entity ));
97-
98- }
99-
100- @ Override
101- public void update (DocumentEntity entity ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
102- insert (entity );
103- }
104-
105- @ Override
106- public void update (DocumentEntity entity , Consumer <DocumentEntity > callBack ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
107- insert (entity , callBack );
108- }
109-
110- @ Override
111- public void delete (DocumentDeleteQuery query ) throws ExecuteAsyncQueryException , UnsupportedOperationException {
112- delete (query , d -> {
113- });
114-
115- }
116-
117- @ Override
118- public void delete (DocumentDeleteQuery query , Consumer <Void > callBack ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
119- requireNonNull (query , "query is required" );
120- requireNonNull (callBack , "callBack is required" );
121-
122- List <DocumentEntity > entities = EntityConverter .query (DocumentQuery .of (query .getCollection ())
123- .and (query .getCondition ()
124- .orElseThrow (()-> new IllegalArgumentException ("condition is required" ))), client , index );
125-
126- BulkRequestBuilder bulkRequest = client .prepareBulk ();
127- entities .stream ()
128- .map (entity -> entity .find (ID_FIELD ).get ().get (String .class ))
129- .map (id -> client .prepareDelete (index , query .getCollection (), id ))
130- .forEach (bulkRequest ::add );
131-
132- ActionListener <BulkResponse > s = new ActionListener <BulkResponse >() {
133- @ Override
134- public void onResponse (BulkResponse bulkItemResponses ) {
135- callBack .accept (null );
136- }
137-
138- @ Override
139- public void onFailure (Exception e ) {
140- throw new ExecuteAsyncQueryException ("An error when delete on elasticsearch" , e );
141- }
142- };
143- bulkRequest .execute ().addListener (s );
144- }
145-
146- @ Override
147- public void select (DocumentQuery query , Consumer <List <DocumentEntity >> callBack ) throws ExecuteAsyncQueryException , UnsupportedOperationException , NullPointerException {
148- requireNonNull (query , "query is required" );
149- requireNonNull (callBack , "callBack is required" );
150- EntityConverter .queryAsync (query , client , index , callBack );
151- }
26+ public interface ElasticsearchDocumentCollectionManagerAsync extends DocumentCollectionManagerAsync {
15227
15328
15429 /**
@@ -159,18 +34,6 @@ public void select(DocumentQuery query, Consumer<List<DocumentEntity>> callBack)
15934 * @param callBack the callback
16035 * @throws NullPointerException when query is null
16136 */
162- public void find (QueryBuilder query , Consumer <List <DocumentEntity >> callBack , String ... types ) throws NullPointerException , ExecuteAsyncQueryException {
163- requireNonNull (query , "query is required" );
164- requireNonNull (callBack , "callBack is required" );
165-
166- client .prepareSearch (index )
167- .setTypes (types )
168- .setQuery (query )
169- .execute ().addListener (new FindQueryBuilderListener (callBack ));
170- }
171-
172- @ Override
173- public void close () {
174-
175- }
37+ void find (QueryBuilder query , Consumer <List <DocumentEntity >> callBack , String ... types ) throws
38+ NullPointerException , ExecuteAsyncQueryException ;
17639}
0 commit comments