Skip to content

Commit 7c5f62a

Browse files
authored
Enable row count validation for SqlInserts and YCQL workloads (#94)
* Perform row count validation for SqlInserts and YCQL workloads extending from CassandraKeyValue.
1 parent 889aa99 commit 7c5f62a

File tree

6 files changed

+110
-13
lines changed

6 files changed

+110
-13
lines changed

src/main/java/com/yugabyte/sample/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public void run() throws Exception {
186186
IOType.Read, app.appConfig.printAllExceptions));
187187
}
188188

189+
app.recordExistingRowCount();
189190
// Start the reader and writer threads.
190191
for (IOPSThread iopsThread : iopsThreads) {
191192
iopsThread.start();
@@ -201,6 +202,7 @@ public void run() throws Exception {
201202
LOG.error("Error waiting for thread join()", e);
202203
}
203204
}
205+
app.verifyTotalRowsWritten();
204206
} finally {
205207
terminate();
206208
}

src/main/java/com/yugabyte/sample/apps/AppBase.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
import java.security.KeyStore;
2121
import java.security.cert.CertificateFactory;
2222
import java.security.cert.X509Certificate;
23-
import java.sql.Connection;
24-
import java.sql.DriverManager;
25-
import java.sql.PreparedStatement;
23+
import java.sql.*;
2624
import java.util.ArrayList;
2725
import java.util.Arrays;
2826
import java.util.Collections;
@@ -812,6 +810,18 @@ public void performRead() {
812810
}
813811
}
814812

813+
public void verifyTotalRowsWritten() throws Exception {
814+
throw new UnsupportedOperationException("Row count check is not supported for this workload");
815+
}
816+
817+
public void recordExistingRowCount() throws Exception {
818+
throw new UnsupportedOperationException("Row count check is not supported for this workload");
819+
}
820+
821+
public String getTableName() {
822+
return "AppBaseTable";
823+
}
824+
815825
@Override
816826
public String appenderName() {
817827
return this.getClass().getSimpleName();

src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import java.util.Arrays;
1818
import java.util.List;
1919

20-
import org.apache.log4j.Logger;
21-
2220
import com.datastax.driver.core.BoundStatement;
23-
import com.datastax.driver.core.ConsistencyLevel;
2421
import com.datastax.driver.core.PreparedStatement;
2522
import com.datastax.driver.core.ResultSet;
2623
import com.datastax.driver.core.Row;
24+
import org.apache.log4j.Logger;
25+
2726
import com.yugabyte.sample.common.SimpleLoadGenerator.Key;
2827

2928
/**
@@ -32,6 +31,7 @@
3231
*/
3332
public class CassandraKeyValue extends CassandraKeyValueBase {
3433

34+
private static final Logger LOG = Logger.getLogger(CassandraKeyValueBase.class);
3535
// The default table name to create and use for CRUD ops.
3636
private static final String DEFAULT_TABLE_NAME = CassandraKeyValue.class.getSimpleName();
3737
static {
@@ -43,6 +43,8 @@ public class CassandraKeyValue extends CassandraKeyValueBase {
4343
// updates).
4444
appConfig.numUniqueKeysToWrite = NUM_UNIQUE_KEYS_FOR_YSQL_AND_YCQL;
4545
}
46+
private long initialRowCount = 0;
47+
4648
@Override
4749
public List<String> getCreateTableStatements() {
4850
String create_stmt = String.format(
@@ -74,7 +76,26 @@ protected BoundStatement bindSelect(String key) {
7476
@Override
7577
protected BoundStatement bindInsert(String key, ByteBuffer value) {
7678
return getPreparedInsert().bind(key, value);
77-
}
79+
}
80+
81+
@Override
82+
public void verifyTotalRowsWritten() throws Exception {
83+
if (appConfig.numKeysToWrite >= 0) {
84+
long actual = getRowCount();
85+
LOG.info("Total rows count in table " + getTableName() + ": " + actual);
86+
long expected = numKeysWritten.get();
87+
if (actual != (expected + initialRowCount)) {
88+
LOG.fatal("New rows count does not match! Expected: " + (expected + initialRowCount) + ", actual: " + actual);
89+
} else {
90+
LOG.info("Table row count verified successfully");
91+
}
92+
}
93+
}
94+
95+
@Override
96+
public void recordExistingRowCount() throws Exception {
97+
initialRowCount = getRowCount();
98+
}
7899

79100
@Override
80101
public List<String> getWorkloadDescription() {

src/main/java/com/yugabyte/sample/apps/CassandraKeyValueBase.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public long doRead() {
110110
ResultSet rs = getCassandraClient().execute(select);
111111
List<Row> rows = rs.all();
112112
if (rows.size() != 1) {
113-
// If TTL is enabled, turn off correctness validation.
113+
// If TTL is disabled, turn on correctness validation.
114114
if (appConfig.tableTTLSeconds <= 0) {
115115
LOG.fatal("Read key: " + key.asString() + " expected 1 row in result, got " + rows.size());
116116
}
@@ -173,6 +173,14 @@ public long doWrite(int threadIdx) {
173173
}
174174
}
175175

176+
protected long getRowCount() {
177+
ResultSet rs = getCassandraClient().execute("SELECT COUNT(*) FROM " + getTableName());
178+
List<Row> rows = rs.all();
179+
long actual = rows.get(0).getLong(0);
180+
LOG.info("Found " + actual + " rows in " + getTableName());
181+
return actual;
182+
}
183+
176184
@Override
177185
public void appendMessage(StringBuilder sb) {
178186
super.appendMessage(sb);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.yugabyte.sample.apps;
2+
3+
import org.apache.log4j.Logger;
4+
5+
import java.sql.Connection;
6+
import java.sql.ResultSet;
7+
import java.sql.Statement;
8+
9+
public class SQLAppBase extends AppBase {
10+
11+
private Logger LOG = Logger.getLogger(SQLAppBase.class);
12+
13+
private long initialRowCount = 0;
14+
15+
protected long getRowCount() throws Exception {
16+
long count = 0;
17+
String table = getTableName();
18+
Connection connection = getPostgresConnection();
19+
try {
20+
Statement statement = connection.createStatement();
21+
String query = "SELECT COUNT(*) FROM " + table;
22+
ResultSet rs = statement.executeQuery(query);
23+
if (rs.next()) {
24+
count = rs.getLong(1);
25+
LOG.info("Row count in table " + table + ": " + count);
26+
} else {
27+
LOG.error("No result received!");
28+
}
29+
} finally {
30+
if (connection != null) {
31+
connection.close();
32+
}
33+
}
34+
return count;
35+
}
36+
37+
@Override
38+
public void verifyTotalRowsWritten() throws Exception {
39+
if (appConfig.numKeysToWrite >= 0) {
40+
String table = getTableName();
41+
LOG.info("Verifying the inserts on table " + table + " (" + initialRowCount + " rows initially) ...");
42+
LOG.info("appConfig.numKeysToWrite: " + appConfig.numKeysToWrite + ", numKeysWritten: " + numKeysWritten);
43+
long actual = getRowCount();
44+
long expected = numKeysWritten.get();
45+
if (actual != (expected + initialRowCount)) {
46+
LOG.fatal("New rows count does not match! Expected: " + (expected + initialRowCount) + ", actual: " + actual);
47+
} else {
48+
LOG.info("Table row count verified successfully");
49+
}
50+
}
51+
}
52+
53+
@Override
54+
public void recordExistingRowCount() throws Exception {
55+
LOG.info("Recording the row count in table " + getTableName() + " if it exists ...");
56+
initialRowCount = getRowCount();
57+
}
58+
59+
}

src/main/java/com/yugabyte/sample/apps/SqlInserts.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,18 @@
1212
//
1313
package com.yugabyte.sample.apps;
1414

15-
import java.sql.Connection;
16-
import java.sql.PreparedStatement;
17-
import java.sql.ResultSet;
15+
import java.sql.*;
1816
import java.util.Arrays;
1917
import java.util.List;
2018

2119
import org.apache.log4j.Logger;
2220

23-
import com.yugabyte.sample.apps.AppBase.TableOp;
2421
import com.yugabyte.sample.common.SimpleLoadGenerator.Key;
2522

2623
/**
2724
* This workload writes and reads some random string keys from a postgresql table.
2825
*/
29-
public class SqlInserts extends AppBase {
26+
public class SqlInserts extends SQLAppBase {
3027
private static final Logger LOG = Logger.getLogger(SqlInserts.class);
3128

3229
// Static initialization of this workload's config. These are good defaults for getting a decent

0 commit comments

Comments
 (0)