Skip to content

Commit 7b7ce95

Browse files
akscjoAmit Chauhan
andauthored
Added enhancements to Cassandra Workload (#13)
Co-authored-by: Amit Chauhan <[email protected]>
1 parent 5dcaf1f commit 7b7ce95

File tree

4 files changed

+150
-24
lines changed

4 files changed

+150
-24
lines changed

src/main/java/com/yugabyte/simulation/service/GenericCassandraWorkload.java

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,8 @@ public String getName() {
8787
private enum WorkloadType {
8888
CREATE_TABLES,
8989
SEED_DATA,
90-
RUN_SIMULATION,
91-
RUN_LIKE_QUERY_ON_GENERIC2,
92-
RUN_LIKE_QUERY_ON_GENERIC3
90+
RUN_SIMULATION_FIXED_WORKLOAD,
91+
RUN_SIMULATION
9392
}
9493

9594
private final FixedStepsWorkloadType createTablesWorkloadType;
@@ -120,25 +119,36 @@ public GenericCassandraWorkload() {
120119
private WorkloadDesc seedingWorkload = new WorkloadDesc(
121120
GenericCassandraWorkload.WorkloadType.SEED_DATA.toString(),
122121
"Seed Data",
123-
"Load data into the 3 tables (Latency on charts will show cumulative value for 3 inserts)",
122+
"Populate new data into the table",
124123
new WorkloadParamDesc("Items to generate:", 1, Integer.MAX_VALUE, 1000),
125124
new WorkloadParamDesc("Threads", 1, 500, 32)
126125
);
127126

128127
private WorkloadDesc runningWorkload = new WorkloadDesc(
129128
GenericCassandraWorkload.WorkloadType.RUN_SIMULATION.toString(),
130-
"Simulation",
131-
"Run a simulation of a reads from 3 tables (Latency on charts will show cumulative value for 3 selects and 3 inserts)",
129+
"Simulation - TPS",
130+
"Run a simulation of point reads and inserts",
132131
new WorkloadParamDesc("Throughput (tps)", 1, 1000000, 500),
133132
new WorkloadParamDesc("Max Threads", 1, 500, 64),
134-
new WorkloadParamDesc("Include new Inserts (to 3 tables)", false)
133+
new WorkloadParamDesc("Include new Inserts", false)
135134
);
136135

136+
private WorkloadDesc simulationFixedWorkload = new WorkloadDesc(
137+
GenericCassandraWorkload.WorkloadType.RUN_SIMULATION_FIXED_WORKLOAD.toString(),
138+
"Simulation",
139+
"Run a simulation of point reads and inserts",
140+
new WorkloadParamDesc("Invocations", 1, 10000000, 1000000),
141+
new WorkloadParamDesc("Max Threads", 1, 500, 64),
142+
new WorkloadParamDesc("Include new inserts", false)
143+
);
144+
145+
137146
@Override
138147
public List<WorkloadDesc> getWorkloads() {
139148
return Arrays.asList(
140149
createTablesWorkload
141150
, seedingWorkload
151+
, simulationFixedWorkload
142152
, runningWorkload
143153
);
144154
}
@@ -160,6 +170,9 @@ public InvocationResult invokeWorkload(String workloadId, ParamValue[] values) {
160170
case RUN_SIMULATION:
161171
this.runSimulation(values[0].getIntValue(), values[1].getIntValue(), values[2].getBoolValue());
162172
return new InvocationResult("Ok");
173+
case RUN_SIMULATION_FIXED_WORKLOAD:
174+
this.runSimulationFixedWorkload(values);
175+
return new InvocationResult("Ok");
163176
}
164177
throw new IllegalArgumentException("Unknown workload "+ workloadId);
165178
}
@@ -177,21 +190,7 @@ private void seedData(int numberToGenerate, int threads) {
177190
.createInstance(serviceManager)
178191
.execute(threads, numberToGenerate, (customData, threadData) -> {
179192
runInserts();
180-
UUID uuid = LoadGeneratorUtils.getUUID();
181-
CqlSession session = this.getCassandraClient();
182-
PreparedStatement ps = session.prepare(INSERT_RECORD_GENERIC1);
183-
//ps.bind(uuid,LoadGeneratorUtils.getName());
184-
session.execute(ps.bind(uuid,LoadGeneratorUtils.getName()));
185-
186-
// jdbcTemplate.update(INSERT_RECORD_GENERIC2,
187-
// uuid,
188-
// LoadGeneratorUtils.getAlphaString(LoadGeneratorUtils.getInt(1,30))
189-
// );
190-
// jdbcTemplate.update(INSERT_RECORD_GENERIC3,
191-
// uuid,
192-
// LoadGeneratorUtils.getAlphaString(LoadGeneratorUtils.getInt(1,255)),
193-
// LoadGeneratorUtils.getAlphaString(LoadGeneratorUtils.getInt(1,30))
194-
// );
193+
195194
return threadData;
196195
});
197196
}
@@ -207,6 +206,27 @@ private List<UUID> getQueryList() {
207206
return results;
208207
}
209208

209+
private void runSimulationFixedWorkload(ParamValue[] values) {
210+
int numOfInvocations = values[0].getIntValue();
211+
int maxThreads = values[1].getIntValue();
212+
boolean runInserts = values[2].getBoolValue();
213+
System.out.println("**** Preloading data...");
214+
final List<UUID> uuids = getQueryList();
215+
System.out.println("**** Preloading complete...");
216+
Random random = ThreadLocalRandom.current();
217+
seedingWorkloadType
218+
.createInstance(serviceManager)
219+
.execute(maxThreads, numOfInvocations, (customData, threadData) -> {
220+
UUID id = uuids.get(random.nextInt(uuids.size()));
221+
runPointReadgeneric1(id);
222+
runPointReadgeneric2(id);
223+
runPointReadgeneric3(id);
224+
if(runInserts){
225+
runInserts();
226+
}
227+
return threadData;
228+
});
229+
}
210230

211231
private void runSimulation(int tps, int maxThreads, boolean runInserts) {
212232
System.out.println("**** Preloading data...");
@@ -240,7 +260,7 @@ private void runPointReadgeneric1(UUID id){
240260
for (Row row : rs) {
241261
// process the row
242262
int count = 0;
243-
System.out.println(row.getUuid("pkid")+","+row.getString("col1"));
263+
// System.out.println(row.getUuid("pkid")+","+row.getString("col1"));
244264
}
245265
}
246266

@@ -256,7 +276,10 @@ private void runPointReadgeneric3(UUID id){
256276

257277
private void runInserts(){
258278
UUID uuid = LoadGeneratorUtils.getUUID();
259-
// TODO
279+
CqlSession session = this.getCassandraClient();
280+
PreparedStatement ps = session.prepare(INSERT_RECORD_GENERIC1);
281+
//ps.bind(uuid,LoadGeneratorUtils.getName());
282+
session.execute(ps.bind(uuid,LoadGeneratorUtils.getName()));
260283
}
261284

262285

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.yugabyte.simulation.util;
2+
3+
import com.datastax.oss.driver.api.core.CqlSession;
4+
import com.datastax.oss.driver.api.core.cql.ResultSet;
5+
import com.datastax.oss.driver.api.core.cql.Row;
6+
7+
import javax.net.ssl.SSLContext;
8+
import javax.net.ssl.TrustManagerFactory;
9+
import java.io.FileInputStream;
10+
import java.net.InetSocketAddress;
11+
import java.security.KeyStore;
12+
import java.security.cert.CertificateFactory;
13+
import java.security.cert.X509Certificate;
14+
import java.util.List;
15+
16+
public class SSLContextUtility {
17+
18+
// Load the cluster root certificate
19+
public static SSLContext createSSLHandler(String certfile) {
20+
try {
21+
CertificateFactory cf = CertificateFactory.getInstance("X.509");
22+
FileInputStream fis = new FileInputStream(certfile);
23+
X509Certificate ca;
24+
try {
25+
ca = (X509Certificate) cf.generateCertificate(fis);
26+
} catch (Exception e) {
27+
System.err.println("Exception generating certificate from input file: " + e);
28+
return null;
29+
} finally {
30+
fis.close();
31+
}
32+
33+
// Create a KeyStore containing our trusted CAs
34+
String keyStoreType = KeyStore.getDefaultType();
35+
KeyStore keyStore = KeyStore.getInstance(keyStoreType);
36+
keyStore.load(null, null);
37+
keyStore.setCertificateEntry("ca", ca);
38+
39+
// Create a TrustManager that trusts the CAs in our KeyStore
40+
String tmfAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
41+
TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
42+
tmf.init(keyStore);
43+
44+
SSLContext sslContext = SSLContext.getInstance("TLS");
45+
sslContext.init(null, tmf.getTrustManagers(), null);
46+
return sslContext;
47+
} catch (Exception e) {
48+
System.err.println("Exception creating sslContext: " + e);
49+
return null;
50+
}
51+
}
52+
53+
public static void main(String args[]) {
54+
try {
55+
// Create a YCQL client.
56+
CqlSession session = CqlSession
57+
.builder()
58+
.addContactPoint(new InetSocketAddress("XXXXXXXX", 9042))
59+
.withSslContext(createSSLHandler("/Users/amitchauhan/Downloads/root.crt"))
60+
.withAuthCredentials("admin","XXXXXXX")
61+
.withLocalDatacenter("us-east-1")
62+
.build();
63+
// Create keyspace 'ybdemo' if it does not exist.
64+
String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS ybdemo;";
65+
session.execute(createKeyspace);
66+
System.out.println("Created keyspace ybdemo");
67+
// Create table 'employee', if it does not exist.
68+
String createTable = "CREATE TABLE IF NOT EXISTS ybdemo.employee (id int PRIMARY KEY, " +
69+
"name varchar, " + "age int, " + "language varchar);";
70+
session.execute(createTable);
71+
System.out.println("Created table employee");
72+
// Insert a row.
73+
String insert = "INSERT INTO ybdemo.employee (id, name, age, language)" +
74+
" VALUES (1, 'John', 35, 'Java');";
75+
session.execute(insert);
76+
System.out.println("Inserted data: " + insert);
77+
// Query the row and print out the result.
78+
String select = "SELECT name, age, language FROM ybdemo.employee WHERE id = 1;";
79+
ResultSet selectResult = session.execute(select);
80+
List<Row> rows = selectResult.all();
81+
String name = rows.get(0).getString(0);
82+
int age = rows.get(0).getInt(1);
83+
String language = rows.get(0).getString(2);
84+
System.out.println("Query returned " + rows.size() + " row: " + "name=" + name +
85+
", age=" + age + ", language: " + language);
86+
// Close the client.
87+
session.close();
88+
} catch (Exception e) {
89+
System.err.println("Error: " + e.getMessage());
90+
}
91+
}
92+
93+
94+
95+
}

src/main/java/com/yugabyte/simulation/workload/WorkloadSimulationBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.datastax.oss.driver.api.core.CqlSession;
55
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
6+
import com.yugabyte.simulation.util.SSLContextUtility;
67
import org.springframework.beans.factory.annotation.Autowired;
78
import org.springframework.core.env.Environment;
89

@@ -51,6 +52,7 @@ protected synchronized CqlSession getCassandraClient() {
5152
int port = Integer.parseInt(env.getProperty("spring.data.cassandra.port"));
5253
String datacenter = env.getProperty("spring.data.cassandra.local-datacenter");
5354
String contactPoints = env.getProperty("spring.data.cassandra.contact-points");
55+
String sslRootCertPath = env.getProperty("spring.data.cassandra.sslcertpath");
5456

5557
CqlSessionBuilder builder = CqlSession.builder();
5658
builder.addContactPoints(Arrays.stream(contactPoints.split(","))
@@ -61,6 +63,11 @@ protected synchronized CqlSession getCassandraClient() {
6163
builder.withAuthCredentials(userId,password);
6264
}
6365

66+
if(sslRootCertPath != null && !sslRootCertPath.equals("NA")){
67+
builder.withSslContext(SSLContextUtility.createSSLHandler(sslRootCertPath));
68+
}
69+
70+
6471
cassandra_session = builder.build();
6572
}
6673
return cassandra_session;

src/main/resources/application.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ spring:
2828
contact-points: ${node_c:127.0.0.1}
2929
userid: ${userid_c:cassandra}
3030
password: ${password_c:yugabyte}
31+
sslcertpath: ${sslcertpath_c:NA}
3132

3233
logging.level:
3334
root: ERROR

0 commit comments

Comments
 (0)