Skip to content

Commit 28d8542

Browse files
committed
Defines async document in ES
1 parent 3eb2bac commit 28d8542

File tree

1 file changed

+167
-0
lines changed

1 file changed

+167
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (c) 2017 Otávio Santana and others
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Eclipse Public License v1.0
5+
* and Apache License v2.0 which accompanies this distribution.
6+
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
7+
* and the Apache License v2.0 is available at http://www.opensource.org/licenses/apache2.0.php.
8+
*
9+
* You may elect to redistribute this code under either of these licenses.
10+
*
11+
* Contributors:
12+
*
13+
* Otavio Santana
14+
*/
15+
package org.jnosql.diana.elasticsearch.document;
16+
17+
18+
import org.elasticsearch.action.ActionListener;
19+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
20+
import org.elasticsearch.action.bulk.BulkResponse;
21+
import org.elasticsearch.client.Client;
22+
import org.elasticsearch.index.query.QueryBuilder;
23+
import org.jnosql.diana.api.ExecuteAsyncQueryException;
24+
import org.jnosql.diana.api.document.Document;
25+
import org.jnosql.diana.api.document.DocumentDeleteQuery;
26+
import org.jnosql.diana.api.document.DocumentEntity;
27+
import org.jnosql.diana.api.document.DocumentQuery;
28+
29+
import javax.json.bind.Jsonb;
30+
import javax.json.bind.JsonbBuilder;
31+
import java.time.Duration;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.function.Consumer;
35+
36+
import static java.nio.charset.StandardCharsets.UTF_8;
37+
import static java.util.Objects.requireNonNull;
38+
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
39+
import static org.jnosql.diana.elasticsearch.document.EntityConverter.ID_FIELD;
40+
import static org.jnosql.diana.elasticsearch.document.EntityConverter.getMap;
41+
42+
class DefaultElasticsearchDocumentCollectionManagerAsync implements ElasticsearchDocumentCollectionManagerAsync {
43+
44+
protected static final Jsonb JSONB = JsonbBuilder.create();
45+
46+
private static final Consumer<DocumentEntity> NOOP = e -> {
47+
};
48+
49+
private final Client client;
50+
private final String index;
51+
52+
DefaultElasticsearchDocumentCollectionManagerAsync(Client client, String index) {
53+
this.client = client;
54+
this.index = index;
55+
}
56+
57+
@Override
58+
public void insert(DocumentEntity entity) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
59+
insert(entity, NOOP);
60+
}
61+
62+
@Override
63+
public void insert(DocumentEntity entity, Duration ttl) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
64+
insert(entity, ttl, e -> {
65+
});
66+
}
67+
68+
@Override
69+
public void insert(DocumentEntity entity, Consumer<DocumentEntity> callBack) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
70+
requireNonNull(entity, "entity is required");
71+
requireNonNull(callBack, "callBack is required");
72+
Document id = entity.find(ID_FIELD)
73+
.orElseThrow(() -> new ElasticsearchKeyFoundException(entity.toString()));
74+
Map<String, Object> jsonObject = getMap(entity);
75+
byte[] bytes = JSONB.toJson(jsonObject).getBytes(UTF_8);
76+
client.prepareIndex(index, entity.getName(), id.get(String.class)).setSource(bytes).execute()
77+
.addListener(new SaveActionListener(callBack, entity));
78+
79+
80+
}
81+
82+
@Override
83+
public void insert(DocumentEntity entity, Duration ttl, Consumer<DocumentEntity> callBack) throws ExecuteAsyncQueryException,
84+
UnsupportedOperationException, NullPointerException {
85+
requireNonNull(entity, "entity is required");
86+
requireNonNull(ttl, "ttl is required");
87+
requireNonNull(callBack, "callBack is required");
88+
Document id = entity.find(ID_FIELD)
89+
.orElseThrow(() -> new ElasticsearchKeyFoundException(entity.toString()));
90+
Map<String, Object> jsonObject = getMap(entity);
91+
byte[] bytes = JSONB.toJson(jsonObject).getBytes(UTF_8);
92+
client.prepareIndex(index, entity.getName(), id.get(String.class)).setSource(bytes).
93+
setTTL(timeValueMillis(ttl.toMillis())).execute()
94+
.addListener(new SaveActionListener(callBack, entity));
95+
96+
}
97+
98+
@Override
99+
public void update(DocumentEntity entity) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
100+
insert(entity);
101+
}
102+
103+
@Override
104+
public void update(DocumentEntity entity, Consumer<DocumentEntity> callBack) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
105+
insert(entity, callBack);
106+
}
107+
108+
@Override
109+
public void delete(DocumentDeleteQuery query) throws ExecuteAsyncQueryException, UnsupportedOperationException {
110+
delete(query, d -> {
111+
});
112+
113+
}
114+
115+
@Override
116+
public void delete(DocumentDeleteQuery query, Consumer<Void> callBack) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
117+
requireNonNull(query, "query is required");
118+
requireNonNull(callBack, "callBack is required");
119+
120+
List<DocumentEntity> entities = EntityConverter.query(DocumentQuery.of(query.getCollection())
121+
.and(query.getCondition()
122+
.orElseThrow(() -> new IllegalArgumentException("condition is required"))), client, index);
123+
124+
BulkRequestBuilder bulkRequest = client.prepareBulk();
125+
entities.stream()
126+
.map(entity -> entity.find(ID_FIELD).get().get(String.class))
127+
.map(id -> client.prepareDelete(index, query.getCollection(), id))
128+
.forEach(bulkRequest::add);
129+
130+
ActionListener<BulkResponse> s = new ActionListener<BulkResponse>() {
131+
@Override
132+
public void onResponse(BulkResponse bulkItemResponses) {
133+
callBack.accept(null);
134+
}
135+
136+
@Override
137+
public void onFailure(Exception e) {
138+
throw new ExecuteAsyncQueryException("An error when delete on elasticsearch", e);
139+
}
140+
};
141+
bulkRequest.execute().addListener(s);
142+
}
143+
144+
@Override
145+
public void select(DocumentQuery query, Consumer<List<DocumentEntity>> callBack) throws ExecuteAsyncQueryException, UnsupportedOperationException, NullPointerException {
146+
requireNonNull(query, "query is required");
147+
requireNonNull(callBack, "callBack is required");
148+
EntityConverter.queryAsync(query, client, index, callBack);
149+
}
150+
151+
152+
@Override
153+
public void find(QueryBuilder query, Consumer<List<DocumentEntity>> callBack, String... types) throws NullPointerException, ExecuteAsyncQueryException {
154+
requireNonNull(query, "query is required");
155+
requireNonNull(callBack, "callBack is required");
156+
157+
client.prepareSearch(index)
158+
.setTypes(types)
159+
.setQuery(query)
160+
.execute().addListener(new FindQueryBuilderListener(callBack));
161+
}
162+
163+
@Override
164+
public void close() {
165+
166+
}
167+
}

0 commit comments

Comments
 (0)