Skip to content

Commit d2e3204

Browse files
committed
adds counter
1 parent e88d73a commit d2e3204

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

cassandra-driver/src/main/java/org/jnosql/diana/cassandra/column/DefaultCassandraColumnFamilyManagerAsync.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package org.jnosql.diana.cassandra.column;
1616

1717
import com.datastax.driver.core.ConsistencyLevel;
18+
import com.datastax.driver.core.ResultSet;
1819
import com.datastax.driver.core.ResultSetFuture;
1920
import com.datastax.driver.core.Session;
2021
import com.datastax.driver.core.Statement;
@@ -29,6 +30,7 @@
2930
import java.time.Duration;
3031
import java.util.List;
3132
import java.util.Map;
33+
import java.util.concurrent.ExecutionException;
3234
import java.util.concurrent.Executor;
3335
import java.util.function.Consumer;
3436
import java.util.stream.StreamSupport;
@@ -223,6 +225,16 @@ public void select(ColumnQuery query, Consumer<List<ColumnEntity>> consumer)
223225
public void count(String columnFamily, Consumer<Long> callback) {
224226
requireNonNull(columnFamily, "columnFamily is required");
225227
requireNonNull(callback, "callback is required");
228+
ResultSetFuture resultSet = session.executeAsync(QueryUtils.count(columnFamily, keyspace));
229+
Runnable counter = () -> {
230+
try {
231+
Object object = resultSet.get().one().getObject(0);
232+
callback.accept(Number.class.cast(object).longValue());
233+
} catch (InterruptedException | ExecutionException e) {
234+
throw new ExecuteAsyncQueryException(e);
235+
}
236+
};
237+
resultSet.addListener(counter, executor);
226238
}
227239

228240
@Override

cassandra-driver/src/test/java/org/jnosql/diana/cassandra/column/CassandraColumnFamilyManagerAsyncTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicLong;
3637
import java.util.concurrent.atomic.AtomicReference;
3738
import java.util.function.Consumer;
3839

@@ -307,6 +308,23 @@ public void shouldInsertColumnsAsyncWithConsistenceLevel() {
307308
columnEntityManager.save(columnEntity, CONSISTENCY_LEVEL);
308309
}
309310

311+
@Test
312+
public void shouldCount() {
313+
ColumnEntity entity = getColumnFamily();
314+
columnEntityManager.insert(entity);
315+
AtomicLong counter = new AtomicLong(0);
316+
AtomicBoolean condition = new AtomicBoolean(false);
317+
318+
Consumer<Long> callback = (l) ->{
319+
condition.set(true);
320+
counter.set(l);
321+
};
322+
columnEntityManager.count(COLUMN_FAMILY, callback);
323+
await().untilTrue(condition);
324+
assertTrue(counter.get() > 0);
325+
}
326+
327+
310328
private ColumnEntity getColumnFamily() {
311329
Map<String, Object> fields = new HashMap<>();
312330
fields.put("name", "Cassandra");

0 commit comments

Comments
 (0)