Skip to content

Commit 460ae31

Browse files
committed
fix: verify bucket readiness on each bucket operation
Signed-off-by: Maximillian Arruda <[email protected]>
1 parent 55b19de commit 460ae31

File tree

2 files changed

+106
-72
lines changed

2 files changed

+106
-72
lines changed

jnosql-couchbase/src/main/java/org/eclipse/jnosql/databases/couchbase/communication/CouchbaseBucketManager.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.time.Duration;
2929
import java.util.Optional;
30+
import java.util.function.Supplier;
3031

3132
import static java.util.Objects.requireNonNull;
3233
import static java.util.stream.Collectors.toList;
@@ -67,8 +68,7 @@ public String name() {
6768
public <K, V> void put(K key, V value) {
6869
requireNonNull(key, "key is required");
6970
requireNonNull(value, "value is required");
70-
collection.upsert(key.toString(), value);
71-
71+
waitBucketBeReadyAndGet(() -> collection.upsert(key.toString(), value));
7272
}
7373

7474
@Override
@@ -78,13 +78,14 @@ public void put(KeyValueEntity entity) throws NullPointerException {
7878
}
7979

8080
@Override
81-
public void put(KeyValueEntity entity, Duration ttl) {
81+
public void put(final KeyValueEntity entity, final Duration ttl) {
8282
requireNonNull(entity, "entity is required");
8383
requireNonNull(ttl, "ttl is required");
84-
String key = entity.key(String.class);
85-
Object value = convert(Value.of(entity.value()));
86-
collection.upsert(key, value, UpsertOptions.upsertOptions().expiry(ttl));
87-
84+
waitBucketBeReadyAndDo(() -> {
85+
String key = entity.key(String.class);
86+
Object value = convert(Value.of(entity.value()));
87+
collection.upsert(key, value, UpsertOptions.upsertOptions().expiry(ttl));
88+
});
8889
}
8990

9091
@Override
@@ -104,8 +105,10 @@ public void put(Iterable<KeyValueEntity> keyValueEntities, Duration ttl) {
104105
public <K> Optional<Value> get(K key) throws NullPointerException {
105106
requireNonNull(key, "key is required");
106107
try {
107-
GetResult result = this.collection.get(key.toString());
108-
return Optional.of(new CouchbaseValue(result));
108+
return waitBucketBeReadyAndGet(() -> {
109+
GetResult result = this.collection.get(key.toString());
110+
return Optional.of(new CouchbaseValue(result));
111+
});
109112
} catch (DocumentNotFoundException exp) {
110113
return Optional.empty();
111114
}
@@ -124,9 +127,21 @@ public <K> Iterable<Value> get(Iterable<K> keys) {
124127
@Override
125128
public <K> void delete(K key) {
126129
requireNonNull(key, "key is required");
127-
collection.remove(key.toString());
130+
waitBucketBeReadyAndDo(() -> collection.remove(key.toString()));
131+
}
132+
133+
private void waitBucketBeReadyAndDo(Runnable runnable) {
134+
bucket.waitUntilReady(bucket.environment().timeoutConfig().kvDurableTimeout());
135+
runnable.run();
128136
}
129137

138+
139+
private <T> T waitBucketBeReadyAndGet(Supplier<T> supplier) {
140+
bucket.waitUntilReady(bucket.environment().timeoutConfig().kvDurableTimeout());
141+
return supplier.get();
142+
}
143+
144+
130145
@Override
131146
public <K> void delete(Iterable<K> keys) {
132147
requireNonNull(keys, "keys is required");

jnosql-couchbase/src/main/java/org/eclipse/jnosql/databases/couchbase/communication/DefaultCouchbaseDocumentManager.java

Lines changed: 81 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.ArrayList;
3434
import java.util.List;
3535
import java.util.Objects;
36+
import java.util.function.Supplier;
3637
import java.util.logging.Level;
3738
import java.util.logging.Logger;
3839
import java.util.stream.Collectors;
@@ -67,27 +68,29 @@ public String name() {
6768
@Override
6869
public DocumentEntity insert(DocumentEntity entity) throws NullPointerException {
6970
requireNonNull(entity, "entity is required");
70-
entity.add(EntityConverter.COLLECTION_FIELD, entity.name());
71-
JsonObject json = EntityConverter.convert(entity);
72-
Document id = entity.find(EntityConverter.ID_FIELD)
73-
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
74-
75-
Collection collection = bucket.collection(entity.name());
76-
collection.insert(id.get(String.class), json);
77-
return entity;
71+
return waitBucketBeReadyAndGet(() -> {
72+
entity.add(EntityConverter.COLLECTION_FIELD, entity.name());
73+
JsonObject json = EntityConverter.convert(entity);
74+
Document id = entity.find(EntityConverter.ID_FIELD)
75+
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
76+
Collection collection = bucket.collection(entity.name());
77+
collection.insert(id.get(String.class), json);
78+
return entity;
79+
});
7880
}
7981

8082
@Override
8183
public DocumentEntity insert(DocumentEntity entity, Duration ttl) {
8284
requireNonNull(entity, "entity is required");
8385
requireNonNull(ttl, "ttl is required");
84-
JsonObject json = EntityConverter.convert(entity);
85-
Document id = entity.find(EntityConverter.ID_FIELD)
86-
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
87-
88-
Collection collection = bucket.collection(entity.name());
89-
collection.insert(id.get(String.class), json, InsertOptions.insertOptions().expiry(ttl));
90-
return entity;
86+
return waitBucketBeReadyAndGet(() -> {
87+
JsonObject json = EntityConverter.convert(entity);
88+
Document id = entity.find(EntityConverter.ID_FIELD)
89+
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
90+
Collection collection = bucket.collection(entity.name());
91+
collection.insert(id.get(String.class), json, InsertOptions.insertOptions().expiry(ttl));
92+
return entity;
93+
});
9194
}
9295

9396
@Override
@@ -108,14 +111,15 @@ public Iterable<DocumentEntity> insert(Iterable<DocumentEntity> entities, Durati
108111
@Override
109112
public DocumentEntity update(DocumentEntity entity) {
110113
requireNonNull(entity, "entity is required");
111-
entity.add(EntityConverter.COLLECTION_FIELD, entity.name());
112-
JsonObject json = EntityConverter.convert(entity);
113-
Document id = entity.find(EntityConverter.ID_FIELD)
114-
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
115-
116-
Collection collection = bucket.collection(entity.name());
117-
collection.upsert(id.get(String.class), json);
118-
return entity;
114+
return waitBucketBeReadyAndGet(() -> {
115+
entity.add(EntityConverter.COLLECTION_FIELD, entity.name());
116+
JsonObject json = EntityConverter.convert(entity);
117+
Document id = entity.find(EntityConverter.ID_FIELD)
118+
.orElseThrow(() -> new CouchbaseNoKeyFoundException(entity.toString()));
119+
Collection collection = bucket.collection(entity.name());
120+
collection.upsert(id.get(String.class), json);
121+
return entity;
122+
});
119123
}
120124

121125
@Override
@@ -127,47 +131,59 @@ public Iterable<DocumentEntity> update(Iterable<DocumentEntity> entities) {
127131

128132
@Override
129133
public void delete(DocumentDeleteQuery query) {
130-
Objects.requireNonNull(query, "query is required");
134+
waitBucketBeReadyAndDo(() -> {
135+
Objects.requireNonNull(query, "query is required");
136+
Collection collection = bucket.collection(query.name());
137+
DocumentQuery delete = DeleteQueryWrapper.of(query);
138+
Stream<DocumentEntity> entities = select(delete);
139+
entities.flatMap(d -> d.find(EntityConverter.ID_FIELD).stream())
140+
.filter(Objects::nonNull)
141+
.map(d -> d.get(String.class))
142+
.forEach(collection::remove);
143+
});
144+
}
131145

132-
Collection collection = bucket.collection(query.name());
133-
DocumentQuery delete = DeleteQueryWrapper.of(query);
134-
Stream<DocumentEntity> entities = select(delete);
135-
entities.flatMap(d -> d.find(EntityConverter.ID_FIELD).stream())
136-
.filter(Objects::nonNull)
137-
.map(d -> d.get(String.class))
138-
.forEach(collection::remove);
146+
private void waitBucketBeReadyAndDo(Runnable runnable) {
147+
bucket.waitUntilReady(bucket.environment().timeoutConfig().kvDurableTimeout());
148+
runnable.run();
149+
}
139150

151+
152+
private <T> T waitBucketBeReadyAndGet(Supplier<T> supplier) {
153+
bucket.waitUntilReady(bucket.environment().timeoutConfig().kvDurableTimeout());
154+
return supplier.get();
140155
}
141156

142157
@Override
143-
public Stream<DocumentEntity> select(DocumentQuery query) throws NullPointerException {
158+
public Stream<DocumentEntity> select(final DocumentQuery query) throws NullPointerException {
144159
Objects.requireNonNull(query, "query is required");
145-
N1QLQuery n1QLQuery = N1QLBuilder.of(query, database, bucket.defaultScope().name()).get();
146-
List<JsonObject> jsons = new ArrayList<>();
147-
148-
if (n1QLQuery.hasIds()) {
149-
Collection collection = bucket.collection(query.name());
150-
for (String id : n1QLQuery.getIds()) {
151-
try {
152-
GetResult result = collection.get(id);
153-
jsons.add(result.contentAsObject());
154-
} catch (DocumentNotFoundException exp) {
155-
LOGGER.log(Level.FINEST, "The id was not found: " + id);
160+
return waitBucketBeReadyAndGet(() -> {
161+
N1QLQuery n1QLQuery = N1QLBuilder.of(query, database, bucket.defaultScope().name()).get();
162+
List<JsonObject> jsons = new ArrayList<>();
163+
if (n1QLQuery.hasIds()) {
164+
Collection collection = bucket.collection(query.name());
165+
for (String id : n1QLQuery.getIds()) {
166+
try {
167+
GetResult result = collection.get(id);
168+
jsons.add(result.contentAsObject());
169+
} catch (DocumentNotFoundException exp) {
170+
LOGGER.log(Level.FINEST, "The id was not found: " + id);
171+
}
156172
}
157173
}
158-
}
159-
160-
if (!n1QLQuery.hasOnlyIds()) {
161-
QueryResult result;
162-
if (n1QLQuery.hasParameter()) {
163-
result = cluster.query(n1QLQuery.getQuery());
164-
} else {
165-
result = cluster.query(n1QLQuery.getQuery(), QueryOptions
166-
.queryOptions().parameters(n1QLQuery.getParams()));
174+
175+
if (!n1QLQuery.hasOnlyIds()) {
176+
QueryResult result;
177+
if (n1QLQuery.hasParameter()) {
178+
result = cluster.query(n1QLQuery.getQuery());
179+
} else {
180+
result = cluster.query(n1QLQuery.getQuery(), QueryOptions
181+
.queryOptions().parameters(n1QLQuery.getParams()));
182+
}
183+
jsons.addAll(result.rowsAsObject());
167184
}
168-
jsons.addAll(result.rowsAsObject());
169-
}
170-
return EntityConverter.convert(jsons, database);
185+
return EntityConverter.convert(jsons, database);
186+
});
171187
}
172188

173189
@Override
@@ -177,21 +193,24 @@ public long count(String documentCollection) {
177193

178194

179195
@Override
180-
public Stream<DocumentEntity> n1qlQuery(String n1ql, JsonObject params) throws NullPointerException {
196+
public Stream<DocumentEntity> n1qlQuery(final String n1ql, final JsonObject params) throws NullPointerException {
181197
requireNonNull(n1ql, "n1qlQuery is required");
182198
requireNonNull(params, "params is required");
183-
184-
QueryResult query = cluster.query(n1ql, QueryOptions
185-
.queryOptions().parameters(params));
186-
return EntityConverter.convert(query.rowsAsObject(), database);
199+
return waitBucketBeReadyAndGet(() -> {
200+
QueryResult query = cluster.query(n1ql, QueryOptions
201+
.queryOptions().parameters(params));
202+
return EntityConverter.convert(query.rowsAsObject(), database);
203+
});
187204
}
188205

189206

190207
@Override
191208
public Stream<DocumentEntity> n1qlQuery(String n1ql) throws NullPointerException {
192209
requireNonNull(n1ql, "n1qlQuery is required");
193-
QueryResult query = cluster.query(n1ql);
194-
return EntityConverter.convert(query.rowsAsObject(), database);
210+
return waitBucketBeReadyAndGet(() -> {
211+
QueryResult query = cluster.query(n1ql);
212+
return EntityConverter.convert(query.rowsAsObject(), database);
213+
});
195214
}
196215

197216

0 commit comments

Comments
 (0)