tableSizes = new Histogram<>(true);
-
- public Loader(T benchmark) {
- this.benchmark = benchmark;
- this.workConf = benchmark.getWorkloadConfiguration();
- this.scaleFactor = workConf.getScaleFactor();
- }
-
- /**
- * Each Loader will generate a list of Runnable objects that
- * will perform the loading operation for the benchmark.
- * The number of threads that will be launched at the same time
- * depends on the number of cores that are available. But they are
- * guaranteed to execute in the order specified in the list.
- * You will have to use your own protections if there are dependencies between
- * threads (i.e., if one table needs to be loaded before another).
- *
- * Each LoaderThread will be given a Connection handle to the DBMS when
- * it is invoked.
- *
- * If the benchmark does not support multi-threaded loading yet,
- * then this method should return null.
- *
- * @return The list of LoaderThreads the framework will launch.
- */
- public abstract List createLoaderThreads() throws SQLException;
-
- public void addToTableCount(String tableName, int delta) {
- this.tableSizes.put(tableName, delta);
+ protected static final Logger LOG = LoggerFactory.getLogger(Loader.class);
+
+ protected final T benchmark;
+
+ protected final WorkloadConfiguration workConf;
+ protected final double scaleFactor;
+ private final Histogram tableSizes = new Histogram<>(true);
+
+ public Loader(T benchmark) {
+ this.benchmark = benchmark;
+ this.workConf = benchmark.getWorkloadConfiguration();
+ this.scaleFactor = workConf.getScaleFactor();
+ }
+
+ /**
+ * Each Loader will generate a list of Runnable objects that will perform the loading operation
+ * for the benchmark. The number of threads that will be launched at the same time depends on the
+ * number of cores that are available. But they are guaranteed to execute in the order specified
+ * in the list. You will have to use your own protections if there are dependencies between
+ * threads (i.e., if one table needs to be loaded before another).
+ *
+ * Each LoaderThread will be given a Connection handle to the DBMS when it is invoked.
+ *
+ *
If the benchmark does not support multi-threaded loading yet, then this method should
+ * return null.
+ *
+ * @return The list of LoaderThreads the framework will launch.
+ */
+ public abstract List createLoaderThreads() throws SQLException;
+
+ public void addToTableCount(String tableName, int delta) {
+ this.tableSizes.put(tableName, delta);
+ }
+
+ public Histogram getTableCounts() {
+ return (this.tableSizes);
+ }
+
+ public DatabaseType getDatabaseType() {
+ return (this.workConf.getDatabaseType());
+ }
+
+ /**
+ * Get the pre-seeded Random generator for this Loader invocation
+ *
+ * @return
+ */
+ public Random rng() {
+ return (this.benchmark.rng());
+ }
+
+ /**
+ * Method that can be overriden to specifically unload the tables of the database. In the default
+ * implementation it checks for tables from the catalog to delete them using SQL. Any subclass can
+ * inject custom behavior here.
+ *
+ * @param catalog The catalog containing all loaded tables
+ * @throws SQLException
+ */
+ public void unload(Connection conn, AbstractCatalog catalog) throws SQLException {
+
+ boolean shouldEscapeNames = this.getDatabaseType().shouldEscapeNames();
+
+ try (Statement st = conn.createStatement()) {
+ for (Table catalog_tbl : catalog.getTables()) {
+ String tableName = shouldEscapeNames ? catalog_tbl.getEscapedName() : catalog_tbl.getName();
+ LOG.debug(String.format("Deleting data from table [%s]", tableName));
+
+ String sql = "DELETE FROM " + tableName;
+ st.execute(sql);
+ }
}
-
- public Histogram getTableCounts() {
- return (this.tableSizes);
+ }
+
+ protected void updateAutoIncrement(Connection conn, Column catalog_col, int value)
+ throws SQLException {
+ String sql = null;
+ String seqName = SQLUtil.getSequenceName(conn, getDatabaseType(), catalog_col);
+ DatabaseType dbType = getDatabaseType();
+ if (seqName != null) {
+ if (dbType == DatabaseType.POSTGRES) {
+ sql = String.format("SELECT setval('%s', %d)", seqName.toLowerCase(), value);
+ } else if (dbType == DatabaseType.SQLSERVER || dbType == DatabaseType.SQLAZURE) {
+ sql = String.format("ALTER SEQUENCE [%s] RESTART WITH %d", seqName, value);
+ }
}
- public DatabaseType getDatabaseType() {
- return (this.workConf.getDatabaseType());
- }
-
- /**
- * Get the pre-seeded Random generator for this Loader invocation
- *
- * @return
- */
- public Random rng() {
- return (this.benchmark.rng());
- }
-
-
- /**
- * Method that can be overriden to specifically unload the tables of the
- * database. In the default implementation it checks for tables from the
- * catalog to delete them using SQL. Any subclass can inject custom behavior
- * here.
- *
- * @param catalog The catalog containing all loaded tables
- * @throws SQLException
- */
- public void unload(Connection conn, AbstractCatalog catalog) throws SQLException {
-
- boolean shouldEscapeNames = this.getDatabaseType().shouldEscapeNames();
-
- try (Statement st = conn.createStatement()) {
- for (Table catalog_tbl : catalog.getTables()) {
- String tableName = shouldEscapeNames ? catalog_tbl.getEscapedName() : catalog_tbl.getName();
- LOG.debug(String.format("Deleting data from table [%s]", tableName));
-
- String sql = "DELETE FROM " + tableName;
- st.execute(sql);
- }
- }
- }
-
- protected void updateAutoIncrement(Connection conn, Column catalog_col, int value) throws SQLException {
- String sql = null;
- String seqName = SQLUtil.getSequenceName(conn, getDatabaseType(), catalog_col);
- DatabaseType dbType = getDatabaseType();
- if (seqName != null) {
- if (dbType == DatabaseType.POSTGRES) {
- sql = String.format("SELECT setval('%s', %d)", seqName.toLowerCase(), value);
- }
- else if (dbType == DatabaseType.SQLSERVER || dbType == DatabaseType.SQLAZURE) {
- sql = String.format("ALTER SEQUENCE [%s] RESTART WITH %d", seqName, value);
- }
- }
-
- if (sql != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Updating %s auto-increment counter %s with value '%d'", catalog_col.getName(), seqName, value));
- }
- try (Statement stmt = conn.createStatement()) {
- boolean result = stmt.execute(sql);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s => [%s]", sql, result));
- }
- }
+ if (sql != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "Updating %s auto-increment counter %s with value '%d'",
+ catalog_col.getName(), seqName, value));
+ }
+ try (Statement stmt = conn.createStatement()) {
+ boolean result = stmt.execute(sql);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s => [%s]", sql, result));
}
+ }
}
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/LoaderThread.java b/src/main/java/com/oltpbenchmark/api/LoaderThread.java
index 50f4ba9c3..f60d857eb 100644
--- a/src/main/java/com/oltpbenchmark/api/LoaderThread.java
+++ b/src/main/java/com/oltpbenchmark/api/LoaderThread.java
@@ -17,57 +17,57 @@
package com.oltpbenchmark.api;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.sql.Connection;
import java.sql.SQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A LoaderThread is responsible for loading some portion of a
- * benchmark's database.
- * Note that each LoaderThread has its own database Connection handle.
+ * A LoaderThread is responsible for loading some portion of a benchmark's database. Note that each
+ * LoaderThread has its own database Connection handle.
*/
public abstract class LoaderThread implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(LoaderThread.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LoaderThread.class);
- private final BenchmarkModule benchmarkModule;
+ private final BenchmarkModule benchmarkModule;
- public LoaderThread(BenchmarkModule benchmarkModule) {
- this.benchmarkModule = benchmarkModule;
- }
+ public LoaderThread(BenchmarkModule benchmarkModule) {
+ this.benchmarkModule = benchmarkModule;
+ }
- @Override
- public final void run() {
- beforeLoad();
- try (Connection conn = benchmarkModule.makeConnection()) {
- load(conn);
- } catch (SQLException ex) {
- SQLException next_ex = ex.getNextException();
- String msg = String.format("Unexpected error when loading %s database", benchmarkModule.getBenchmarkName().toUpperCase());
- LOG.error(msg, next_ex);
- throw new RuntimeException(ex);
- } finally {
- afterLoad();
- }
+ @Override
+ public final void run() {
+ beforeLoad();
+ try (Connection conn = benchmarkModule.makeConnection()) {
+ load(conn);
+ } catch (SQLException ex) {
+ SQLException next_ex = ex.getNextException();
+ String msg =
+ String.format(
+ "Unexpected error when loading %s database",
+ benchmarkModule.getBenchmarkName().toUpperCase());
+ LOG.error(msg, next_ex);
+ throw new RuntimeException(ex);
+ } finally {
+ afterLoad();
}
+ }
- /**
- * This is the method that each LoaderThread has to implement
- *
- * @param conn
- * @throws SQLException
- */
- public abstract void load(Connection conn) throws SQLException;
-
- public void beforeLoad() {
- // useful for implementing waits for countdown latches, this ensures we open the connection right before its used to avoid stale connections
- }
-
- public void afterLoad() {
- // useful for counting down latches
- }
+ /**
+ * This is the method that each LoaderThread has to implement
+ *
+ * @param conn
+ * @throws SQLException
+ */
+ public abstract void load(Connection conn) throws SQLException;
+ public void beforeLoad() {
+ // useful for implementing waits for countdown latches, this ensures we open the connection
+ // right before its used to avoid stale connections
+ }
+ public void afterLoad() {
+ // useful for counting down latches
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/Operation.java b/src/main/java/com/oltpbenchmark/api/Operation.java
index e56713dd4..cc2311aaf 100644
--- a/src/main/java/com/oltpbenchmark/api/Operation.java
+++ b/src/main/java/com/oltpbenchmark/api/Operation.java
@@ -17,6 +17,4 @@
package com.oltpbenchmark.api;
-public abstract class Operation {
-
-}
+public abstract class Operation {}
diff --git a/src/main/java/com/oltpbenchmark/api/Procedure.java b/src/main/java/com/oltpbenchmark/api/Procedure.java
index fa9c711a8..bc55a958e 100644
--- a/src/main/java/com/oltpbenchmark/api/Procedure.java
+++ b/src/main/java/com/oltpbenchmark/api/Procedure.java
@@ -19,9 +19,6 @@
import com.oltpbenchmark.jdbc.AutoIncrementPreparedStatement;
import com.oltpbenchmark.types.DatabaseType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.Connection;
@@ -31,194 +28,193 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class Procedure {
- private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
-
- private final String procName;
- private DatabaseType dbType;
-
- public DatabaseType getDbType() {
- return dbType;
+ private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
+
+ private final String procName;
+ private DatabaseType dbType;
+
+ public DatabaseType getDbType() {
+ return dbType;
+ }
+
+ private Map name_stmt_xref;
+
+ /** Constructor */
+ protected Procedure() {
+ this.procName = this.getClass().getSimpleName();
+ }
+
+ /**
+ * Initialize all of the SQLStmt handles. This must be called separately from the constructor,
+ * otherwise we can't get access to all of our SQLStmts.
+ *
+ * @param
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ protected final T initialize(DatabaseType dbType) {
+ this.dbType = dbType;
+ this.name_stmt_xref = Procedure.getStatements(this);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "Initialized %s with %d SQLStmts: %s",
+ this, this.name_stmt_xref.size(), this.name_stmt_xref.keySet()));
}
-
- private Map name_stmt_xref;
-
- /**
- * Constructor
- */
- protected Procedure() {
- this.procName = this.getClass().getSimpleName();
+ return ((T) this);
+ }
+
+ /** Return the name of this Procedure */
+ protected final String getProcedureName() {
+ return (this.procName);
+ }
+
+ /**
+ * Return a PreparedStatement for the given SQLStmt handle The underlying Procedure API will make
+ * sure that the proper SQL for the target DBMS is used for this SQLStmt. This will automatically
+ * call setObject for all the parameters you pass in
+ *
+ * @param conn
+ * @param stmt
+ * @param params
+ * @return
+ * @throws SQLException
+ */
+ public final PreparedStatement getPreparedStatement(
+ Connection conn, SQLStmt stmt, Object... params) throws SQLException {
+ PreparedStatement pStmt = this.getPreparedStatementReturnKeys(conn, stmt, null);
+ for (int i = 0; i < params.length; i++) {
+ pStmt.setObject(i + 1, params[i]);
}
-
- /**
- * Initialize all of the SQLStmt handles. This must be called separately from
- * the constructor, otherwise we can't get access to all of our SQLStmts.
- *
- * @param
- * @return
- */
- @SuppressWarnings("unchecked")
- protected final T initialize(DatabaseType dbType) {
- this.dbType = dbType;
- this.name_stmt_xref = Procedure.getStatements(this);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Initialized %s with %d SQLStmts: %s",
- this, this.name_stmt_xref.size(), this.name_stmt_xref.keySet()));
- }
- return ((T) this);
+ return (pStmt);
+ }
+
+ /**
+ * Return a PreparedStatement for the given SQLStmt handle The underlying Procedure API will make
+ * sure that the proper SQL for the target DBMS is used for this SQLStmt.
+ *
+ * @param conn
+ * @param stmt
+ * @param is
+ * @return
+ * @throws SQLException
+ */
+ public final PreparedStatement getPreparedStatementReturnKeys(
+ Connection conn, SQLStmt stmt, int[] is) throws SQLException {
+
+ PreparedStatement pStmt = null;
+
+ // HACK: If the target system is Postgres, wrap the PreparedStatement in a special
+ // one that fakes the getGeneratedKeys().
+ if (is != null
+ && (this.dbType == DatabaseType.POSTGRES
+ || this.dbType == DatabaseType.COCKROACHDB
+ || this.dbType == DatabaseType.SQLSERVER
+ || this.dbType == DatabaseType.SQLAZURE)) {
+ pStmt = new AutoIncrementPreparedStatement(this.dbType, conn.prepareStatement(stmt.getSQL()));
}
-
- /**
- * Return the name of this Procedure
- */
- protected final String getProcedureName() {
- return (this.procName);
+ // Everyone else can use the regular getGeneratedKeys() method
+ else if (is != null) {
+ pStmt = conn.prepareStatement(stmt.getSQL(), is);
}
-
- /**
- * Return a PreparedStatement for the given SQLStmt handle
- * The underlying Procedure API will make sure that the proper SQL
- * for the target DBMS is used for this SQLStmt.
- * This will automatically call setObject for all the parameters you pass in
- *
- * @param conn
- * @param stmt
- * @param params
- * @return
- * @throws SQLException
- */
- public final PreparedStatement getPreparedStatement(Connection conn, SQLStmt stmt, Object... params) throws SQLException {
- PreparedStatement pStmt = this.getPreparedStatementReturnKeys(conn, stmt, null);
- for (int i = 0; i < params.length; i++) {
- pStmt.setObject(i + 1, params[i]);
- }
- return (pStmt);
+ // They don't care about keys
+ else {
+ pStmt = conn.prepareStatement(stmt.getSQL());
}
- /**
- * Return a PreparedStatement for the given SQLStmt handle
- * The underlying Procedure API will make sure that the proper SQL
- * for the target DBMS is used for this SQLStmt.
- *
- * @param conn
- * @param stmt
- * @param is
- * @return
- * @throws SQLException
- */
- public final PreparedStatement getPreparedStatementReturnKeys(Connection conn, SQLStmt stmt, int[] is) throws SQLException {
-
- PreparedStatement pStmt = null;
-
- // HACK: If the target system is Postgres, wrap the PreparedStatement in a special
- // one that fakes the getGeneratedKeys().
- if (is != null && (
- this.dbType == DatabaseType.POSTGRES
- || this.dbType == DatabaseType.COCKROACHDB
- || this.dbType == DatabaseType.SQLSERVER
- || this.dbType == DatabaseType.SQLAZURE
- )
- ) {
- pStmt = new AutoIncrementPreparedStatement(this.dbType, conn.prepareStatement(stmt.getSQL()));
- }
- // Everyone else can use the regular getGeneratedKeys() method
- else if (is != null) {
- pStmt = conn.prepareStatement(stmt.getSQL(), is);
- }
- // They don't care about keys
- else {
- pStmt = conn.prepareStatement(stmt.getSQL());
- }
-
- return (pStmt);
- }
-
- /**
- * Fetch the SQL from the dialect map
- *
- * @param dialects
- */
- protected final void loadSQLDialect(StatementDialects dialects) {
- Collection stmtNames = dialects.getStatementNames(this.procName);
- if (stmtNames == null) {
- return;
- }
- for (String stmtName : stmtNames) {
- String sql = dialects.getSQL(this.procName, stmtName);
-
-
- SQLStmt stmt = this.name_stmt_xref.get(stmtName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Setting %s SQL dialect for %s.%s",
- dialects.getDatabaseType(), this.procName, stmtName));
- }
- if (stmt == null) {
- throw new RuntimeException(String.format("Dialect file contains an unknown statement: Procedure %s, Statement %s", this.procName, stmtName));
- }
- stmt.setSQL(sql);
- }
+ return (pStmt);
+ }
+
+ /**
+ * Fetch the SQL from the dialect map
+ *
+ * @param dialects
+ */
+ protected final void loadSQLDialect(StatementDialects dialects) {
+ Collection stmtNames = dialects.getStatementNames(this.procName);
+ if (stmtNames == null) {
+ return;
}
-
- /**
- * Hook for testing
- *
- * @return
- */
- protected final Map getStatements() {
- return (Collections.unmodifiableMap(this.name_stmt_xref));
+ for (String stmtName : stmtNames) {
+ String sql = dialects.getSQL(this.procName, stmtName);
+
+ SQLStmt stmt = this.name_stmt_xref.get(stmtName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "Setting %s SQL dialect for %s.%s",
+ dialects.getDatabaseType(), this.procName, stmtName));
+ }
+ if (stmt == null) {
+ throw new RuntimeException(
+ String.format(
+ "Dialect file contains an unknown statement: Procedure %s, Statement %s",
+ this.procName, stmtName));
+ }
+ stmt.setSQL(sql);
}
-
- protected static Map getStatements(Procedure proc) {
- Class extends Procedure> c = proc.getClass();
- Map stmts = new HashMap<>();
- for (Field f : c.getDeclaredFields()) {
- int modifiers = f.getModifiers();
- if (!Modifier.isTransient(modifiers) &&
- Modifier.isPublic(modifiers) &&
- !Modifier.isStatic(modifiers)) {
- try {
- Object o = f.get(proc);
- if (o instanceof SQLStmt) {
- stmts.put(f.getName(), (SQLStmt) o);
- }
- } catch (Exception ex) {
- throw new RuntimeException("Failed to retrieve " + f + " from " + c.getSimpleName(), ex);
- }
- }
+ }
+
+ /**
+ * Hook for testing
+ *
+ * @return
+ */
+ protected final Map getStatements() {
+ return (Collections.unmodifiableMap(this.name_stmt_xref));
+ }
+
+ protected static Map getStatements(Procedure proc) {
+ Class extends Procedure> c = proc.getClass();
+ Map stmts = new HashMap<>();
+ for (Field f : c.getDeclaredFields()) {
+ int modifiers = f.getModifiers();
+ if (!Modifier.isTransient(modifiers)
+ && Modifier.isPublic(modifiers)
+ && !Modifier.isStatic(modifiers)) {
+ try {
+ Object o = f.get(proc);
+ if (o instanceof SQLStmt) {
+ stmts.put(f.getName(), (SQLStmt) o);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to retrieve " + f + " from " + c.getSimpleName(), ex);
}
- return (stmts);
+ }
}
+ return (stmts);
+ }
- @Override
- public String toString() {
- return (this.procName);
- }
+ @Override
+ public String toString() {
+ return (this.procName);
+ }
+
+ /**
+ * Thrown from a Procedure to indicate to the Worker that the procedure should be aborted and
+ * rolled back.
+ */
+ public static class UserAbortException extends RuntimeException {
+ private static final long serialVersionUID = -1L;
/**
- * Thrown from a Procedure to indicate to the Worker
- * that the procedure should be aborted and rolled back.
+ * Default Constructor
+ *
+ * @param msg
+ * @param ex
*/
- public static class UserAbortException extends RuntimeException {
- private static final long serialVersionUID = -1L;
-
- /**
- * Default Constructor
- *
- * @param msg
- * @param ex
- */
- public UserAbortException(String msg, Throwable ex) {
- super(msg, ex);
- }
+ public UserAbortException(String msg, Throwable ex) {
+ super(msg, ex);
+ }
- /**
- * Constructs a new UserAbortException
- * with the specified detail message.
- */
- public UserAbortException(String msg) {
- this(msg, null);
- }
+ /** Constructs a new UserAbortException with the specified detail message. */
+ public UserAbortException(String msg) {
+ this(msg, null);
}
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/SQLStmt.java b/src/main/java/com/oltpbenchmark/api/SQLStmt.java
index f363d4e64..bdf2cdce2 100644
--- a/src/main/java/com/oltpbenchmark/api/SQLStmt.java
+++ b/src/main/java/com/oltpbenchmark/api/SQLStmt.java
@@ -15,14 +15,12 @@
*
*/
-
package com.oltpbenchmark.api;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Wrapper Class for SQL Statements
@@ -30,65 +28,63 @@
* @author pavlo
*/
public final class SQLStmt {
- private static final Logger LOG = LoggerFactory.getLogger(SQLStmt.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SQLStmt.class);
- private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\?\\?");
+ private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\?\\?");
- private String orig_sql;
- private String sql;
+ private String orig_sql;
+ private String sql;
- /**
- * For each unique '??' that we encounter in the SQL for this Statement,
- * we will substitute it with the number of '?' specified in this array.
- */
- private final int[] substitutions;
+ /**
+ * For each unique '??' that we encounter in the SQL for this Statement, we will substitute it
+ * with the number of '?' specified in this array.
+ */
+ private final int[] substitutions;
- /**
- * Constructor
- *
- * @param sql
- * @param substitutions
- */
- public SQLStmt(String sql, int... substitutions) {
- this.substitutions = substitutions;
- this.setSQL(sql);
- }
+ /**
+ * Constructor
+ *
+ * @param sql
+ * @param substitutions
+ */
+ public SQLStmt(String sql, int... substitutions) {
+ this.substitutions = substitutions;
+ this.setSQL(sql);
+ }
- /**
- * Magic SQL setter!
- * Each occurrence of the pattern "??" will be replaced by a string
- * of repeated ?'s
- *
- * @param sql
- */
- public final void setSQL(String sql) {
- this.orig_sql = sql.trim();
- for (int ctr : this.substitutions) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < ctr; i++) {
- sb.append(i > 0 ? ", " : "").append("?");
- }
- Matcher m = SUBSTITUTION_PATTERN.matcher(sql);
- String replace = sb.toString();
- sql = m.replaceFirst(replace);
- }
- this.sql = sql;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized SQL:\n{}", this.sql);
- }
+ /**
+ * Magic SQL setter! Each occurrence of the pattern "??" will be replaced by a string of repeated
+ * ?'s
+ *
+ * @param sql
+ */
+ public final void setSQL(String sql) {
+ this.orig_sql = sql.trim();
+ for (int ctr : this.substitutions) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < ctr; i++) {
+ sb.append(i > 0 ? ", " : "").append("?");
+ }
+ Matcher m = SUBSTITUTION_PATTERN.matcher(sql);
+ String replace = sb.toString();
+ sql = m.replaceFirst(replace);
}
-
- public final String getSQL() {
- return (this.sql);
+ this.sql = sql;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized SQL:\n{}", this.sql);
}
+ }
- protected final String getOriginalSQL() {
- return (this.orig_sql);
- }
+ public final String getSQL() {
+ return (this.sql);
+ }
- @Override
- public String toString() {
- return "SQLStmt{" + this.sql + "}";
- }
+ protected final String getOriginalSQL() {
+ return (this.orig_sql);
+ }
+ @Override
+ public String toString() {
+ return "SQLStmt{" + this.sql + "}";
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/StatementDialects.java b/src/main/java/com/oltpbenchmark/api/StatementDialects.java
index aed1cfbcb..6c1146223 100644
--- a/src/main/java/com/oltpbenchmark/api/StatementDialects.java
+++ b/src/main/java/com/oltpbenchmark/api/StatementDialects.java
@@ -20,265 +20,258 @@
import com.oltpbenchmark.WorkloadConfiguration;
import com.oltpbenchmark.api.dialects.*;
import com.oltpbenchmark.types.DatabaseType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-
import jakarta.xml.bind.*;
-
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.*;
import java.util.Map.Entry;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
/**
* @author pavlo
*/
public final class StatementDialects {
- private static final Logger LOG = LoggerFactory.getLogger(StatementDialects.class);
-
- private static final DatabaseType DEFAULT_DB_TYPE = DatabaseType.MYSQL;
+ private static final Logger LOG = LoggerFactory.getLogger(StatementDialects.class);
- private final WorkloadConfiguration workloadConfiguration;
+ private static final DatabaseType DEFAULT_DB_TYPE = DatabaseType.MYSQL;
+ private final WorkloadConfiguration workloadConfiguration;
- /**
- * ProcName -> StmtName -> SQL
- */
- private final Map> dialectsMap = new HashMap<>();
+ /** ProcName -> StmtName -> SQL */
+ private final Map> dialectsMap = new HashMap<>();
- /**
- * Constructor
- *
- * @param workloadConfiguration
- */
- public StatementDialects(WorkloadConfiguration workloadConfiguration) {
- this.workloadConfiguration = workloadConfiguration;
+ /**
+ * Constructor
+ *
+ * @param workloadConfiguration
+ */
+ public StatementDialects(WorkloadConfiguration workloadConfiguration) {
+ this.workloadConfiguration = workloadConfiguration;
- try {
- this.load();
- } catch (JAXBException | SAXException e) {
- throw new RuntimeException(String.format("Error loading dialect: %s", e.getMessage()), e);
- }
+ try {
+ this.load();
+ } catch (JAXBException | SAXException e) {
+ throw new RuntimeException(String.format("Error loading dialect: %s", e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Return the File handle to the SQL Dialect XML file used for this benchmark
+ *
+ * @return
+ */
+ public String getSQLDialectPath(DatabaseType databaseType) {
+ String fileName = null;
+
+ if (databaseType != null) {
+ fileName = "dialect-" + databaseType.name().toLowerCase() + ".xml";
}
+ if (fileName != null) {
- /**
- * Return the File handle to the SQL Dialect XML file
- * used for this benchmark
- *
- * @return
- */
- public String getSQLDialectPath(DatabaseType databaseType) {
- String fileName = null;
-
- if (databaseType != null) {
- fileName = "dialect-" + databaseType.name().toLowerCase() + ".xml";
- }
-
-
- if (fileName != null) {
-
- final String path = "/benchmarks/" + workloadConfiguration.getBenchmarkName() + "/" + fileName;
-
- try (InputStream stream = this.getClass().getResourceAsStream(path)) {
-
- if (stream != null) {
- return path;
- }
+ final String path =
+ "/benchmarks/" + workloadConfiguration.getBenchmarkName() + "/" + fileName;
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
+ try (InputStream stream = this.getClass().getResourceAsStream(path)) {
- LOG.debug("No dialect file in {}", path);
+ if (stream != null) {
+ return path;
}
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
- return (null);
+ LOG.debug("No dialect file in {}", path);
}
- /**
- * Load in the assigned XML file and populate the internal dialects map
- *
- * @return
- */
- protected boolean load() throws JAXBException, SAXException {
- final DatabaseType dbType = workloadConfiguration.getDatabaseType();
-
- final String sqlDialectPath = getSQLDialectPath(dbType);
+ return (null);
+ }
- if (sqlDialectPath == null) {
- LOG.debug("SKIP - No SQL dialect file was given.");
- return (false);
- }
-
- final String xmlContext = this.getClass().getPackage().getName() + ".dialects";
+ /**
+ * Load in the assigned XML file and populate the internal dialects map
+ *
+ * @return
+ */
+ protected boolean load() throws JAXBException, SAXException {
+ final DatabaseType dbType = workloadConfiguration.getDatabaseType();
- // COPIED FROM VoltDB's VoltCompiler.java
- JAXBContext jc = JAXBContext.newInstance(xmlContext);
- // This schema shot the sheriff.
- SchemaFactory sf = SchemaFactory.newInstance(javax.xml.XMLConstants.W3C_XML_SCHEMA_NS_URI);
- Schema schema = sf.newSchema(new StreamSource(this.getClass().getResourceAsStream("/dialect.xsd")));
- Unmarshaller unmarshaller = jc.createUnmarshaller();
- // But did not shoot unmarshaller!
- unmarshaller.setSchema(schema);
-
- StreamSource streamSource = new StreamSource(this.getClass().getResourceAsStream(sqlDialectPath));
- JAXBElement result = unmarshaller.unmarshal(streamSource, DialectsType.class);
- DialectsType dialects = result.getValue();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading the SQL dialect file for path {}", sqlDialectPath);
- }
+ final String sqlDialectPath = getSQLDialectPath(dbType);
+ if (sqlDialectPath == null) {
+ LOG.debug("SKIP - No SQL dialect file was given.");
+ return (false);
+ }
- for (DialectType dialect : dialects.getDialect()) {
+ final String xmlContext = this.getClass().getPackage().getName() + ".dialects";
+
+ // COPIED FROM VoltDB's VoltCompiler.java
+ JAXBContext jc = JAXBContext.newInstance(xmlContext);
+ // This schema shot the sheriff.
+ SchemaFactory sf = SchemaFactory.newInstance(javax.xml.XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ Schema schema =
+ sf.newSchema(new StreamSource(this.getClass().getResourceAsStream("/dialect.xsd")));
+ Unmarshaller unmarshaller = jc.createUnmarshaller();
+ // But did not shoot unmarshaller!
+ unmarshaller.setSchema(schema);
+
+ StreamSource streamSource =
+ new StreamSource(this.getClass().getResourceAsStream(sqlDialectPath));
+ JAXBElement result = unmarshaller.unmarshal(streamSource, DialectsType.class);
+ DialectsType dialects = result.getValue();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading the SQL dialect file for path {}", sqlDialectPath);
+ }
+ for (DialectType dialect : dialects.getDialect()) {
- if (!dialect.getType().equalsIgnoreCase(dbType.name())) {
- continue;
- }
+ if (!dialect.getType().equalsIgnoreCase(dbType.name())) {
+ continue;
+ }
- // For each Procedure in the XML file, go through its list of Statements
- // and populate our dialects map with the mapped SQL
- for (ProcedureType procedure : dialect.getProcedure()) {
- String procName = procedure.getName();
+ // For each Procedure in the XML file, go through its list of Statements
+ // and populate our dialects map with the mapped SQL
+ for (ProcedureType procedure : dialect.getProcedure()) {
+ String procName = procedure.getName();
- // Loop through all of the Statements listed for this Procedure
- Map procDialects = this.dialectsMap.get(procName);
- for (StatementType statement : procedure.getStatement()) {
- String stmtName = statement.getName();
- String stmtSQL = statement.getValue().trim();
- if (procDialects == null) {
- procDialects = new HashMap<>();
- this.dialectsMap.put(procName, procDialects);
- }
- procDialects.put(stmtName, stmtSQL);
- LOG.debug(String.format("%s.%s.%s\n%s\n", dbType, procName, stmtName, stmtSQL));
- }
- }
- }
- if (this.dialectsMap.isEmpty()) {
- LOG.warn(String.format("No SQL dialect provided for %s. Using default %s",
- dbType, DEFAULT_DB_TYPE));
- return (false);
+ // Loop through all of the Statements listed for this Procedure
+ Map procDialects = this.dialectsMap.get(procName);
+ for (StatementType statement : procedure.getStatement()) {
+ String stmtName = statement.getName();
+ String stmtSQL = statement.getValue().trim();
+ if (procDialects == null) {
+ procDialects = new HashMap<>();
+ this.dialectsMap.put(procName, procDialects);
+ }
+ procDialects.put(stmtName, stmtSQL);
+ LOG.debug(String.format("%s.%s.%s\n%s\n", dbType, procName, stmtName, stmtSQL));
}
-
- return (true);
+ }
}
-
- /**
- * Export the original SQL for all of the SQLStmt in the given list of Procedures
- *
- * @param dbType
- * @param procedures
- * @return A well-formed XML export of the SQL for the given Procedures
- */
- public String export(DatabaseType dbType, Collection procedures) {
-
- Marshaller marshaller = null;
- JAXBContext jc = null;
-
- final String xmlContext = this.getClass().getPackage().getName() + ".dialects";
-
- try {
- jc = JAXBContext.newInstance(xmlContext);
- marshaller = jc.createMarshaller();
-
- SchemaFactory sf = SchemaFactory.newInstance(javax.xml.XMLConstants.W3C_XML_SCHEMA_NS_URI);
- Schema schema = sf.newSchema(new StreamSource(this.getClass().getResourceAsStream("/dialect.xsd")));
- marshaller.setSchema(schema);
- } catch (Exception ex) {
- throw new RuntimeException("Unable to initialize serializer", ex);
- }
-
- List sorted = new ArrayList<>(procedures);
- sorted.sort(new Comparator() {
- @Override
- public int compare(Procedure o1, Procedure o2) {
- return (o1.getProcedureName().compareTo(o2.getProcedureName()));
- }
- });
-
- ObjectFactory factory = new ObjectFactory();
- DialectType dType = factory.createDialectType();
- dType.setType(dbType.name());
- for (Procedure proc : sorted) {
- if (proc.getStatements().isEmpty()) {
- continue;
- }
-
- ProcedureType pType = factory.createProcedureType();
- pType.setName(proc.getProcedureName());
- for (Entry e : proc.getStatements().entrySet()) {
- StatementType sType = factory.createStatementType();
- sType.setName(e.getKey());
- sType.setValue(e.getValue().getOriginalSQL());
- pType.getStatement().add(sType);
- }
- dType.getProcedure().add(pType);
- }
- DialectsType dialects = factory.createDialectsType();
- dialects.getDialect().add(dType);
-
- StringWriter st = new StringWriter();
- try {
- marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
- marshaller.marshal(factory.createDialects(dialects), st);
- } catch (JAXBException ex) {
- throw new RuntimeException("Failed to generate XML", ex);
- }
-
- return (st.toString());
+ if (this.dialectsMap.isEmpty()) {
+ LOG.warn(
+ String.format(
+ "No SQL dialect provided for %s. Using default %s", dbType, DEFAULT_DB_TYPE));
+ return (false);
}
- /**
- * Return the DatabaseType loaded from the XML file
- *
- * @return
- */
- public DatabaseType getDatabaseType() {
- return workloadConfiguration.getDatabaseType();
+ return (true);
+ }
+
+ /**
+ * Export the original SQL for all of the SQLStmt in the given list of Procedures
+ *
+ * @param dbType
+ * @param procedures
+ * @return A well-formed XML export of the SQL for the given Procedures
+ */
+ public String export(DatabaseType dbType, Collection procedures) {
+
+ Marshaller marshaller = null;
+ JAXBContext jc = null;
+
+ final String xmlContext = this.getClass().getPackage().getName() + ".dialects";
+
+ try {
+ jc = JAXBContext.newInstance(xmlContext);
+ marshaller = jc.createMarshaller();
+
+ SchemaFactory sf = SchemaFactory.newInstance(javax.xml.XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ Schema schema =
+ sf.newSchema(new StreamSource(this.getClass().getResourceAsStream("/dialect.xsd")));
+ marshaller.setSchema(schema);
+ } catch (Exception ex) {
+ throw new RuntimeException("Unable to initialize serializer", ex);
}
- /**
- * Return the list of Statement names that we have dialect information
- * for the given Procedure name. If there are SQL dialects for the given
- * Procedure, then the result will be null.
- *
- * @param procName
- * @return
- */
- protected Collection getStatementNames(String procName) {
- Map procDialects = this.dialectsMap.get(procName);
- return (procDialects != null ? procDialects.keySet() : null);
- }
+ List sorted = new ArrayList<>(procedures);
+ sorted.sort(
+ new Comparator() {
+ @Override
+ public int compare(Procedure o1, Procedure o2) {
+ return (o1.getProcedureName().compareTo(o2.getProcedureName()));
+ }
+ });
- /**
- * Return the SQL dialect for the given Statement in the Procedure
- *
- * @param procName
- * @param stmtName
- * @return
- */
- public String getSQL(String procName, String stmtName) {
- Map procDialects = this.dialectsMap.get(procName);
- if (procDialects != null) {
- return (procDialects.get(stmtName));
- }
- return (null);
+ ObjectFactory factory = new ObjectFactory();
+ DialectType dType = factory.createDialectType();
+ dType.setType(dbType.name());
+ for (Procedure proc : sorted) {
+ if (proc.getStatements().isEmpty()) {
+ continue;
+ }
+
+ ProcedureType pType = factory.createProcedureType();
+ pType.setName(proc.getProcedureName());
+ for (Entry e : proc.getStatements().entrySet()) {
+ StatementType sType = factory.createStatementType();
+ sType.setName(e.getKey());
+ sType.setValue(e.getValue().getOriginalSQL());
+ pType.getStatement().add(sType);
+ }
+ dType.getProcedure().add(pType);
}
-
- /**
- * @return The list of Procedure names that we have dialect information for.
- */
- protected Collection getProcedureNames() {
- return (this.dialectsMap.keySet());
+ DialectsType dialects = factory.createDialectsType();
+ dialects.getDialect().add(dType);
+
+ StringWriter st = new StringWriter();
+ try {
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ marshaller.marshal(factory.createDialects(dialects), st);
+ } catch (JAXBException ex) {
+ throw new RuntimeException("Failed to generate XML", ex);
}
+ return (st.toString());
+ }
+
+ /**
+ * Return the DatabaseType loaded from the XML file
+ *
+ * @return
+ */
+ public DatabaseType getDatabaseType() {
+ return workloadConfiguration.getDatabaseType();
+ }
+
+ /**
+ * Return the list of Statement names that we have dialect information for the given Procedure
+ * name. If there are SQL dialects for the given Procedure, then the result will be null.
+ *
+ * @param procName
+ * @return
+ */
+ protected Collection getStatementNames(String procName) {
+ Map procDialects = this.dialectsMap.get(procName);
+ return (procDialects != null ? procDialects.keySet() : null);
+ }
+
+ /**
+ * Return the SQL dialect for the given Statement in the Procedure
+ *
+ * @param procName
+ * @param stmtName
+ * @return
+ */
+ public String getSQL(String procName, String stmtName) {
+ Map procDialects = this.dialectsMap.get(procName);
+ if (procDialects != null) {
+ return (procDialects.get(stmtName));
+ }
+ return (null);
+ }
+
+ /**
+ * @return The list of Procedure names that we have dialect information for.
+ */
+ protected Collection getProcedureNames() {
+ return (this.dialectsMap.keySet());
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/TransactionGenerator.java b/src/main/java/com/oltpbenchmark/api/TransactionGenerator.java
index ab84dd350..5217a0f2b 100644
--- a/src/main/java/com/oltpbenchmark/api/TransactionGenerator.java
+++ b/src/main/java/com/oltpbenchmark/api/TransactionGenerator.java
@@ -17,10 +17,7 @@
package com.oltpbenchmark.api;
-
public interface TransactionGenerator {
- /**
- * Implementations *must* be thread-safe.
- */
- T nextTransaction();
+ /** Implementations *must* be thread-safe. */
+ T nextTransaction();
}
diff --git a/src/main/java/com/oltpbenchmark/api/TransactionType.java b/src/main/java/com/oltpbenchmark/api/TransactionType.java
index 4086cd5b8..c2607336f 100644
--- a/src/main/java/com/oltpbenchmark/api/TransactionType.java
+++ b/src/main/java/com/oltpbenchmark/api/TransactionType.java
@@ -17,80 +17,87 @@
package com.oltpbenchmark.api;
-
import java.util.Objects;
public class TransactionType implements Comparable {
- public static class Invalid extends Procedure {
- }
-
- public static final int INVALID_ID = 0;
- public static final TransactionType INVALID = new TransactionType(Invalid.class, INVALID_ID, false, 0, 0);
-
- private final Class extends Procedure> procedureClass;
- private final int id;
- private final boolean supplemental;
- private final long preExecutionWait;
- private final long postExecutionWait;
-
- protected TransactionType(Class extends Procedure> procedureClass, int id, boolean supplemental, long preExecutionWait, long postExecutionWait) {
- this.procedureClass = procedureClass;
- this.id = id;
- this.supplemental = supplemental;
- this.preExecutionWait = preExecutionWait;
- this.postExecutionWait = postExecutionWait;
- }
-
- public Class extends Procedure> getProcedureClass() {
- return (this.procedureClass);
- }
-
- public String getName() {
- return this.procedureClass.getSimpleName();
- }
-
- public int getId() {
- return this.id;
+ public static class Invalid extends Procedure {}
+
+ public static final int INVALID_ID = 0;
+ public static final TransactionType INVALID =
+ new TransactionType(Invalid.class, INVALID_ID, false, 0, 0);
+
+ private final Class extends Procedure> procedureClass;
+ private final int id;
+ private final boolean supplemental;
+ private final long preExecutionWait;
+ private final long postExecutionWait;
+
+ protected TransactionType(
+ Class extends Procedure> procedureClass,
+ int id,
+ boolean supplemental,
+ long preExecutionWait,
+ long postExecutionWait) {
+ this.procedureClass = procedureClass;
+ this.id = id;
+ this.supplemental = supplemental;
+ this.preExecutionWait = preExecutionWait;
+ this.postExecutionWait = postExecutionWait;
+ }
+
+ public Class extends Procedure> getProcedureClass() {
+ return (this.procedureClass);
+ }
+
+ public String getName() {
+ return this.procedureClass.getSimpleName();
+ }
+
+ public int getId() {
+ return this.id;
+ }
+
+ public boolean isSupplemental() {
+ return this.supplemental;
+ }
+
+ public long getPreExecutionWait() {
+ return preExecutionWait;
+ }
+
+ public long getPostExecutionWait() {
+ return postExecutionWait;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- public boolean isSupplemental() {
- return this.supplemental;
- }
-
- public long getPreExecutionWait() {
- return preExecutionWait;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
-
- public long getPostExecutionWait() {
- return postExecutionWait;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TransactionType that = (TransactionType) o;
- return id == that.id && supplemental == that.supplemental && preExecutionWait == that.preExecutionWait && postExecutionWait == that.postExecutionWait && procedureClass.equals(that.procedureClass);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(procedureClass, id, supplemental, preExecutionWait, postExecutionWait);
- }
-
- @Override
- public int compareTo(TransactionType o) {
- return (this.id - o.id);
- }
-
- @Override
- public String toString() {
- return String.format("%s/%02d", this.procedureClass.getName(), this.id);
- }
-
+ TransactionType that = (TransactionType) o;
+ return id == that.id
+ && supplemental == that.supplemental
+ && preExecutionWait == that.preExecutionWait
+ && postExecutionWait == that.postExecutionWait
+ && procedureClass.equals(that.procedureClass);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(procedureClass, id, supplemental, preExecutionWait, postExecutionWait);
+ }
+
+ @Override
+ public int compareTo(TransactionType o) {
+ return (this.id - o.id);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s/%02d", this.procedureClass.getName(), this.id);
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/TransactionTypes.java b/src/main/java/com/oltpbenchmark/api/TransactionTypes.java
index 0059b08e7..39c8dc8f2 100644
--- a/src/main/java/com/oltpbenchmark/api/TransactionTypes.java
+++ b/src/main/java/com/oltpbenchmark/api/TransactionTypes.java
@@ -17,106 +17,104 @@
package com.oltpbenchmark.api;
-import org.apache.commons.collections4.map.ListOrderedMap;
-
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import org.apache.commons.collections4.map.ListOrderedMap;
public class TransactionTypes implements Collection {
- private final ListOrderedMap types = new ListOrderedMap<>();
-
- public TransactionTypes(List transactiontypes) {
- transactiontypes.sort(TransactionType::compareTo);
- for (TransactionType tt : transactiontypes) {
- String key = tt.getName().toUpperCase();
- this.types.put(key, tt);
- }
- }
-
- public TransactionType getType(String procName) {
- return (this.types.get(procName.toUpperCase()));
- }
-
- public TransactionType getType(Class extends Procedure> procClass) {
- return (this.getType(procClass.getSimpleName()));
- }
-
- public TransactionType getType(int id) {
- return (this.types.getValue(id));
- }
-
- @Override
- public String toString() {
- return this.types.values().toString();
- }
-
- @Override
- public boolean add(TransactionType tt) {
- String key = tt.getName().toUpperCase();
- this.types.put(key, tt);
- return (true);
- }
-
- @Override
- public boolean addAll(Collection extends TransactionType> c) {
- return false;
- }
-
- @Override
- public void clear() {
- this.types.clear();
- }
-
- @Override
- public boolean contains(Object o) {
- return false;
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- return (this.types.values().containsAll(c));
- }
-
- @Override
- public boolean isEmpty() {
- return (this.types.isEmpty());
- }
-
- @Override
- public Iterator iterator() {
- return (this.types.values().iterator());
- }
-
- @Override
- public boolean remove(Object o) {
- return false;
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- return false;
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- return false;
- }
-
- @Override
- public int size() {
- return (this.types.size());
- }
-
- @Override
- public Object[] toArray() {
- return (this.types.values().toArray());
- }
-
- @Override
- public T[] toArray(T[] a) {
- return (this.types.values().toArray(a));
- }
-
+ private final ListOrderedMap types = new ListOrderedMap<>();
+
+ public TransactionTypes(List transactiontypes) {
+ transactiontypes.sort(TransactionType::compareTo);
+ for (TransactionType tt : transactiontypes) {
+ String key = tt.getName().toUpperCase();
+ this.types.put(key, tt);
+ }
+ }
+
+ public TransactionType getType(String procName) {
+ return (this.types.get(procName.toUpperCase()));
+ }
+
+ public TransactionType getType(Class extends Procedure> procClass) {
+ return (this.getType(procClass.getSimpleName()));
+ }
+
+ public TransactionType getType(int id) {
+ return (this.types.getValue(id));
+ }
+
+ @Override
+ public String toString() {
+ return this.types.values().toString();
+ }
+
+ @Override
+ public boolean add(TransactionType tt) {
+ String key = tt.getName().toUpperCase();
+ this.types.put(key, tt);
+ return (true);
+ }
+
+ @Override
+ public boolean addAll(Collection extends TransactionType> c) {
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ this.types.clear();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ return (this.types.values().containsAll(c));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return (this.types.isEmpty());
+ }
+
+ @Override
+ public Iterator iterator() {
+ return (this.types.values().iterator());
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ return false;
+ }
+
+ @Override
+ public int size() {
+ return (this.types.size());
+ }
+
+ @Override
+ public Object[] toArray() {
+ return (this.types.values().toArray());
+ }
+
+ @Override
+ public T[] toArray(T[] a) {
+ return (this.types.values().toArray(a));
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/Worker.java b/src/main/java/com/oltpbenchmark/api/Worker.java
index f1cdf68ee..314622026 100644
--- a/src/main/java/com/oltpbenchmark/api/Worker.java
+++ b/src/main/java/com/oltpbenchmark/api/Worker.java
@@ -17,15 +17,14 @@
package com.oltpbenchmark.api;
+import static com.oltpbenchmark.types.State.MEASURE;
+
import com.oltpbenchmark.*;
import com.oltpbenchmark.api.Procedure.UserAbortException;
import com.oltpbenchmark.types.DatabaseType;
import com.oltpbenchmark.types.State;
import com.oltpbenchmark.types.TransactionStatus;
import com.oltpbenchmark.util.Histogram;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -34,526 +33,542 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-
-import static com.oltpbenchmark.types.State.MEASURE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class Worker implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
- private static final Logger ABORT_LOG = LoggerFactory.getLogger("com.oltpbenchmark.api.ABORT_LOG");
-
- private WorkloadState workloadState;
- private LatencyRecord latencies;
- private final Statement currStatement;
-
- // Interval requests used by the monitor
- private final AtomicInteger intervalRequests = new AtomicInteger(0);
-
- private final int id;
- private final T benchmark;
- protected Connection conn = null;
- protected final WorkloadConfiguration configuration;
- protected final TransactionTypes transactionTypes;
- protected final Map procedures = new HashMap<>();
- protected final Map name_procedures = new HashMap<>();
- protected final Map, Procedure> class_procedures = new HashMap<>();
-
- private final Histogram txnUnknown = new Histogram<>();
- private final Histogram txnSuccess = new Histogram<>();
- private final Histogram txnAbort = new Histogram<>();
- private final Histogram txnRetry = new Histogram<>();
- private final Histogram txnErrors = new Histogram<>();
- private final Histogram txtRetryDifferent = new Histogram<>();
-
- private boolean seenDone = false;
-
- public Worker(T benchmark, int id) {
- this.id = id;
- this.benchmark = benchmark;
- this.configuration = this.benchmark.getWorkloadConfiguration();
- this.workloadState = this.configuration.getWorkloadState();
- this.currStatement = null;
- this.transactionTypes = this.configuration.getTransTypes();
-
- if (!this.configuration.getNewConnectionPerTxn()) {
- try {
- this.conn = this.benchmark.makeConnection();
- this.conn.setAutoCommit(false);
- this.conn.setTransactionIsolation(this.configuration.getIsolationMode());
- } catch (SQLException ex) {
- throw new RuntimeException("Failed to connect to database", ex);
- }
- }
-
- // Generate all the Procedures that we're going to need
- this.procedures.putAll(this.benchmark.getProcedures());
- for (Entry e : this.procedures.entrySet()) {
- Procedure proc = e.getValue();
- this.name_procedures.put(e.getKey().getName(), proc);
- this.class_procedures.put(proc.getClass(), proc);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+ private static final Logger ABORT_LOG =
+ LoggerFactory.getLogger("com.oltpbenchmark.api.ABORT_LOG");
+
+ private WorkloadState workloadState;
+ private LatencyRecord latencies;
+ private final Statement currStatement;
+
+ // Interval requests used by the monitor
+ private final AtomicInteger intervalRequests = new AtomicInteger(0);
+
+ private final int id;
+ private final T benchmark;
+ protected Connection conn = null;
+ protected final WorkloadConfiguration configuration;
+ protected final TransactionTypes transactionTypes;
+ protected final Map procedures = new HashMap<>();
+ protected final Map name_procedures = new HashMap<>();
+ protected final Map, Procedure> class_procedures = new HashMap<>();
+
+ private final Histogram txnUnknown = new Histogram<>();
+ private final Histogram txnSuccess = new Histogram<>();
+ private final Histogram txnAbort = new Histogram<>();
+ private final Histogram txnRetry = new Histogram<>();
+ private final Histogram txnErrors = new Histogram<>();
+ private final Histogram txtRetryDifferent = new Histogram<>();
+
+ private boolean seenDone = false;
+
+ public Worker(T benchmark, int id) {
+ this.id = id;
+ this.benchmark = benchmark;
+ this.configuration = this.benchmark.getWorkloadConfiguration();
+ this.workloadState = this.configuration.getWorkloadState();
+ this.currStatement = null;
+ this.transactionTypes = this.configuration.getTransTypes();
+
+ if (!this.configuration.getNewConnectionPerTxn()) {
+ try {
+ this.conn = this.benchmark.makeConnection();
+ this.conn.setAutoCommit(false);
+ this.conn.setTransactionIsolation(this.configuration.getIsolationMode());
+ } catch (SQLException ex) {
+ throw new RuntimeException("Failed to connect to database", ex);
+ }
}
- /**
- * Get the BenchmarkModule managing this Worker
- */
- public final T getBenchmark() {
- return (this.benchmark);
+ // Generate all the Procedures that we're going to need
+ this.procedures.putAll(this.benchmark.getProcedures());
+ for (Entry e : this.procedures.entrySet()) {
+ Procedure proc = e.getValue();
+ this.name_procedures.put(e.getKey().getName(), proc);
+ this.class_procedures.put(proc.getClass(), proc);
}
-
- /**
- * Get the unique thread id for this worker
- */
- public final int getId() {
- return this.id;
+ }
+
+ /** Get the BenchmarkModule managing this Worker */
+ public final T getBenchmark() {
+ return (this.benchmark);
+ }
+
+ /** Get the unique thread id for this worker */
+ public final int getId() {
+ return this.id;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s<%03d>", this.getClass().getSimpleName(), this.getId());
+ }
+
+ public final WorkloadConfiguration getWorkloadConfiguration() {
+ return (this.benchmark.getWorkloadConfiguration());
+ }
+
+ public final Random rng() {
+ return (this.benchmark.rng());
+ }
+
+ public final int getRequests() {
+ return latencies.size();
+ }
+
+ public final int getAndResetIntervalRequests() {
+ return intervalRequests.getAndSet(0);
+ }
+
+ public final Iterable getLatencyRecords() {
+ return latencies;
+ }
+
+ public final Procedure getProcedure(TransactionType type) {
+ return (this.procedures.get(type));
+ }
+
+ @Deprecated
+ public final Procedure getProcedure(String name) {
+ return (this.name_procedures.get(name));
+ }
+
+ @SuppressWarnings("unchecked")
+ public final P getProcedure(Class
procClass) {
+ return (P) (this.class_procedures.get(procClass));
+ }
+
+ public final Histogram getTransactionSuccessHistogram() {
+ return (this.txnSuccess);
+ }
+
+ public final Histogram getTransactionUnknownHistogram() {
+ return (this.txnUnknown);
+ }
+
+ public final Histogram getTransactionRetryHistogram() {
+ return (this.txnRetry);
+ }
+
+ public final Histogram getTransactionAbortHistogram() {
+ return (this.txnAbort);
+ }
+
+ public final Histogram getTransactionErrorHistogram() {
+ return (this.txnErrors);
+ }
+
+ public final Histogram getTransactionRetryDifferentHistogram() {
+ return (this.txtRetryDifferent);
+ }
+
+ /** Stop executing the current statement. */
+ public synchronized void cancelStatement() {
+ try {
+ if (this.currStatement != null) {
+ this.currStatement.cancel();
+ }
+ } catch (SQLException e) {
+ LOG.error("Failed to cancel statement: {}", e.getMessage());
}
+ }
- @Override
- public String toString() {
- return String.format("%s<%03d>", this.getClass().getSimpleName(), this.getId());
- }
+ @Override
+ public final void run() {
+ Thread t = Thread.currentThread();
+ t.setName(this.toString());
- public final WorkloadConfiguration getWorkloadConfiguration() {
- return (this.benchmark.getWorkloadConfiguration());
- }
+ // In case of reuse reset the measurements
+ latencies = new LatencyRecord(workloadState.getTestStartNs());
- public final Random rng() {
- return (this.benchmark.rng());
+ // Invoke initialize callback
+ try {
+ this.initialize();
+ } catch (Throwable ex) {
+ throw new RuntimeException("Unexpected error when initializing " + this, ex);
}
- public final int getRequests() {
- return latencies.size();
- }
+ // wait for start
+ workloadState.blockForStart();
- public final int getAndResetIntervalRequests() {
- return intervalRequests.getAndSet(0);
- }
+ while (true) {
- public final Iterable getLatencyRecords() {
- return latencies;
- }
+ // PART 1: Init and check if done
- public final Procedure getProcedure(TransactionType type) {
- return (this.procedures.get(type));
- }
+ State preState = workloadState.getGlobalState();
- @Deprecated
- public final Procedure getProcedure(String name) {
- return (this.name_procedures.get(name));
- }
+ // Do nothing
+ if (preState == State.DONE) {
+ if (!seenDone) {
+ // This is the first time we have observed that the
+ // test is done notify the global test state, then
+ // continue applying load
+ seenDone = true;
+ workloadState.signalDone();
+ break;
+ }
+ }
- @SuppressWarnings("unchecked")
- public final P getProcedure(Class
procClass) {
- return (P) (this.class_procedures.get(procClass));
- }
+ // PART 2: Wait for work
- public final Histogram getTransactionSuccessHistogram() {
- return (this.txnSuccess);
- }
+ // Sleep if there's nothing to do.
+ workloadState.stayAwake();
- public final Histogram getTransactionUnknownHistogram() {
- return (this.txnUnknown);
- }
+ Phase prePhase = workloadState.getCurrentPhase();
+ if (prePhase == null) {
+ continue;
+ }
- public final Histogram getTransactionRetryHistogram() {
- return (this.txnRetry);
- }
+ // Grab some work and update the state, in case it changed while we
+ // waited.
- public final Histogram getTransactionAbortHistogram() {
- return (this.txnAbort);
- }
+ SubmittedProcedure pieceOfWork = workloadState.fetchWork();
- public final Histogram getTransactionErrorHistogram() {
- return (this.txnErrors);
- }
+ prePhase = workloadState.getCurrentPhase();
+ if (prePhase == null) {
+ continue;
+ }
- public final Histogram getTransactionRetryDifferentHistogram() {
- return (this.txtRetryDifferent);
- }
+ preState = workloadState.getGlobalState();
- /**
- * Stop executing the current statement.
- */
- synchronized public void cancelStatement() {
- try {
- if (this.currStatement != null) {
- this.currStatement.cancel();
- }
- } catch (SQLException e) {
- LOG.error("Failed to cancel statement: {}", e.getMessage());
+ switch (preState) {
+ case DONE, EXIT, LATENCY_COMPLETE -> {
+ // Once a latency run is complete, we wait until the next
+ // phase or until DONE.
+ LOG.warn("preState is {}? will continue...", preState);
+ continue;
}
- }
+ default -> {}
+ // Do nothing
+ }
- @Override
- public final void run() {
- Thread t = Thread.currentThread();
- t.setName(this.toString());
+ // PART 3: Execute work
- // In case of reuse reset the measurements
- latencies = new LatencyRecord(workloadState.getTestStartNs());
+ TransactionType transactionType =
+ getTransactionType(pieceOfWork, prePhase, preState, workloadState);
- // Invoke initialize callback
- try {
- this.initialize();
- } catch (Throwable ex) {
- throw new RuntimeException("Unexpected error when initializing " + this, ex);
- }
+ if (!transactionType.equals(TransactionType.INVALID)) {
- // wait for start
- workloadState.blockForStart();
+ // TODO: Measuring latency when not rate limited is ... a little
+ // weird because if you add more simultaneous clients, you will
+ // increase latency (queue delay) but we do this anyway since it is
+ // useful sometimes
- while (true) {
+ // Wait before transaction if specified
+ long preExecutionWaitInMillis = getPreExecutionWaitInMillis(transactionType);
- // PART 1: Init and check if done
+ if (preExecutionWaitInMillis > 0) {
+ try {
+ LOG.debug(
+ "{} will sleep for {} ms before executing",
+ transactionType.getName(),
+ preExecutionWaitInMillis);
- State preState = workloadState.getGlobalState();
+ Thread.sleep(preExecutionWaitInMillis);
+ } catch (InterruptedException e) {
+ LOG.error("Pre-execution sleep interrupted", e);
+ }
+ }
- // Do nothing
- if (preState == State.DONE) {
- if (!seenDone) {
- // This is the first time we have observed that the
- // test is done notify the global test state, then
- // continue applying load
- seenDone = true;
- workloadState.signalDone();
- break;
- }
- }
+ long start = System.nanoTime();
- // PART 2: Wait for work
+ doWork(configuration.getDatabaseType(), transactionType);
- // Sleep if there's nothing to do.
- workloadState.stayAwake();
+ long end = System.nanoTime();
- Phase prePhase = workloadState.getCurrentPhase();
- if (prePhase == null) {
- continue;
- }
+ // PART 4: Record results
- // Grab some work and update the state, in case it changed while we
- // waited.
+ State postState = workloadState.getGlobalState();
- SubmittedProcedure pieceOfWork = workloadState.fetchWork();
+ switch (postState) {
+ case MEASURE:
+ // Non-serial measurement. Only measure if the state both
+ // before and after was MEASURE, and the phase hasn't
+ // changed, otherwise we're recording results for a query
+ // that either started during the warmup phase or ended
+ // after the timer went off.
+ Phase postPhase = workloadState.getCurrentPhase();
- prePhase = workloadState.getCurrentPhase();
- if (prePhase == null) {
- continue;
+ if (postPhase == null) {
+ // Need a null check on postPhase since current phase being null is used in
+ // WorkloadState
+ // and ThreadBench as the indication that the benchmark is over. However, there's a
+ // race
+ // condition with postState not being changed from MEASURE to DONE yet, so we entered
+ // the
+ // switch. In this scenario, just break from the switch.
+ break;
}
-
- preState = workloadState.getGlobalState();
-
- switch (preState) {
- case DONE, EXIT, LATENCY_COMPLETE -> {
- // Once a latency run is complete, we wait until the next
- // phase or until DONE.
- LOG.warn("preState is {}? will continue...", preState);
- continue;
- }
- default -> {
- }
- // Do nothing
+ if (preState == MEASURE && postPhase.getId() == prePhase.getId()) {
+ latencies.addLatency(transactionType.getId(), start, end, this.id, prePhase.getId());
+ intervalRequests.incrementAndGet();
}
-
- // PART 3: Execute work
-
- TransactionType transactionType = getTransactionType(pieceOfWork, prePhase, preState, workloadState);
-
- if (!transactionType.equals(TransactionType.INVALID)) {
-
- // TODO: Measuring latency when not rate limited is ... a little
- // weird because if you add more simultaneous clients, you will
- // increase latency (queue delay) but we do this anyway since it is
- // useful sometimes
-
- // Wait before transaction if specified
- long preExecutionWaitInMillis = getPreExecutionWaitInMillis(transactionType);
-
- if (preExecutionWaitInMillis > 0) {
- try {
- LOG.debug("{} will sleep for {} ms before executing", transactionType.getName(), preExecutionWaitInMillis);
-
- Thread.sleep(preExecutionWaitInMillis);
- } catch (InterruptedException e) {
- LOG.error("Pre-execution sleep interrupted", e);
- }
- }
-
- long start = System.nanoTime();
-
- doWork(configuration.getDatabaseType(), transactionType);
-
- long end = System.nanoTime();
-
- // PART 4: Record results
-
- State postState = workloadState.getGlobalState();
-
- switch (postState) {
- case MEASURE:
- // Non-serial measurement. Only measure if the state both
- // before and after was MEASURE, and the phase hasn't
- // changed, otherwise we're recording results for a query
- // that either started during the warmup phase or ended
- // after the timer went off.
- Phase postPhase = workloadState.getCurrentPhase();
-
- if (postPhase == null) {
- // Need a null check on postPhase since current phase being null is used in WorkloadState
- // and ThreadBench as the indication that the benchmark is over. However, there's a race
- // condition with postState not being changed from MEASURE to DONE yet, so we entered the
- // switch. In this scenario, just break from the switch.
- break;
- }
- if (preState == MEASURE && postPhase.getId() == prePhase.getId()) {
- latencies.addLatency(transactionType.getId(), start, end, this.id, prePhase.getId());
- intervalRequests.incrementAndGet();
- }
- if (prePhase.isLatencyRun()) {
- workloadState.startColdQuery();
- }
- break;
- case COLD_QUERY:
- // No recording for cold runs, but next time we will since
- // it'll be a hot run.
- if (preState == State.COLD_QUERY) {
- workloadState.startHotQuery();
- }
- break;
- default:
- // Do nothing
- }
-
-
- // wait after transaction if specified
- long postExecutionWaitInMillis = getPostExecutionWaitInMillis(transactionType);
-
- if (postExecutionWaitInMillis > 0) {
- try {
- LOG.debug("{} will sleep for {} ms after executing", transactionType.getName(), postExecutionWaitInMillis);
-
- Thread.sleep(postExecutionWaitInMillis);
- } catch (InterruptedException e) {
- LOG.error("Post-execution sleep interrupted", e);
- }
- }
+ if (prePhase.isLatencyRun()) {
+ workloadState.startColdQuery();
}
-
- workloadState.finishedWork();
+ break;
+ case COLD_QUERY:
+ // No recording for cold runs, but next time we will since
+ // it'll be a hot run.
+ if (preState == State.COLD_QUERY) {
+ workloadState.startHotQuery();
+ }
+ break;
+ default:
+ // Do nothing
}
- LOG.debug("worker calling teardown");
-
- tearDown();
- }
+ // wait after transaction if specified
+ long postExecutionWaitInMillis = getPostExecutionWaitInMillis(transactionType);
- private TransactionType getTransactionType(SubmittedProcedure pieceOfWork, Phase phase, State state, WorkloadState workloadState) {
- TransactionType type = TransactionType.INVALID;
+ if (postExecutionWaitInMillis > 0) {
+ try {
+ LOG.debug(
+ "{} will sleep for {} ms after executing",
+ transactionType.getName(),
+ postExecutionWaitInMillis);
- try {
- type = transactionTypes.getType(pieceOfWork.getType());
- } catch (IndexOutOfBoundsException e) {
- if (phase.isThroughputRun()) {
- LOG.error("Thread tried executing disabled phase!");
- throw e;
- }
- if (phase.getId() == workloadState.getCurrentPhase().getId()) {
- switch (state) {
- case WARMUP -> {
- // Don't quit yet: we haven't even begun!
- LOG.info("[Serial] Resetting serial for phase.");
- phase.resetSerial();
- }
- case COLD_QUERY, MEASURE -> {
- // The serial phase is over. Finish the run early.
- LOG.info("[Serial] Updating workload state to {}.", State.LATENCY_COMPLETE);
- workloadState.signalLatencyComplete();
- }
- default -> throw e;
- }
- }
+ Thread.sleep(postExecutionWaitInMillis);
+ } catch (InterruptedException e) {
+ LOG.error("Post-execution sleep interrupted", e);
+ }
}
+ }
- return type;
+ workloadState.finishedWork();
}
- /**
- * Called in a loop in the thread to exercise the system under test. Each
- * implementing worker should return the TransactionType handle that was
- * executed.
- *
- * @param databaseType TODO
- * @param transactionType TODO
- */
- protected final void doWork(DatabaseType databaseType, TransactionType transactionType) {
-
- try {
- int retryCount = 0;
- int maxRetryCount = configuration.getMaxRetries();
-
- while (retryCount < maxRetryCount && this.workloadState.getGlobalState() != State.DONE) {
-
- TransactionStatus status = TransactionStatus.UNKNOWN;
-
- if (this.conn == null) {
- try {
- this.conn = this.benchmark.makeConnection();
- this.conn.setAutoCommit(false);
- this.conn.setTransactionIsolation(this.configuration.getIsolationMode());
- } catch (SQLException ex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s failed to open a connection...", this));
- }
- retryCount++;
- continue;
- }
- }
-
- try {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s %s attempting...", this, transactionType));
- }
-
- status = this.executeWork(conn, transactionType);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s %s completed with status [%s]...", this, transactionType, status.name()));
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s %s committing...", this, transactionType));
- }
-
- conn.commit();
-
- break;
-
- } catch (UserAbortException ex) {
- conn.rollback();
+ LOG.debug("worker calling teardown");
+
+ tearDown();
+ }
+
+ private TransactionType getTransactionType(
+ SubmittedProcedure pieceOfWork, Phase phase, State state, WorkloadState workloadState) {
+ TransactionType type = TransactionType.INVALID;
+
+ try {
+ type = transactionTypes.getType(pieceOfWork.getType());
+ } catch (IndexOutOfBoundsException e) {
+ if (phase.isThroughputRun()) {
+ LOG.error("Thread tried executing disabled phase!");
+ throw e;
+ }
+ if (phase.getId() == workloadState.getCurrentPhase().getId()) {
+ switch (state) {
+ case WARMUP -> {
+ // Don't quit yet: we haven't even begun!
+ LOG.info("[Serial] Resetting serial for phase.");
+ phase.resetSerial();
+ }
+ case COLD_QUERY, MEASURE -> {
+ // The serial phase is over. Finish the run early.
+ LOG.info("[Serial] Updating workload state to {}.", State.LATENCY_COMPLETE);
+ workloadState.signalLatencyComplete();
+ }
+ default -> throw e;
+ }
+ }
+ }
- ABORT_LOG.debug(String.format("%s Aborted", transactionType), ex);
+ return type;
+ }
+
+ /**
+ * Called in a loop in the thread to exercise the system under test. Each implementing worker
+ * should return the TransactionType handle that was executed.
+ *
+ * @param databaseType TODO
+ * @param transactionType TODO
+ */
+ protected final void doWork(DatabaseType databaseType, TransactionType transactionType) {
+
+ try {
+ int retryCount = 0;
+ int maxRetryCount = configuration.getMaxRetries();
+
+ while (retryCount < maxRetryCount && this.workloadState.getGlobalState() != State.DONE) {
+
+ TransactionStatus status = TransactionStatus.UNKNOWN;
+
+ if (this.conn == null) {
+ try {
+ this.conn = this.benchmark.makeConnection();
+ this.conn.setAutoCommit(false);
+ this.conn.setTransactionIsolation(this.configuration.getIsolationMode());
+ } catch (SQLException ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s failed to open a connection...", this));
+ }
+ retryCount++;
+ continue;
+ }
+ }
- status = TransactionStatus.USER_ABORTED;
+ try {
- break;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s %s attempting...", this, transactionType));
+ }
- } catch (SQLException ex) {
- conn.rollback();
+ status = this.executeWork(conn, transactionType);
- if (isRetryable(ex)) {
- LOG.debug(String.format("Retryable SQLException occurred during [%s]... current retry attempt [%d], max retry attempts [%d], sql state [%s], error code [%d].", transactionType, retryCount, maxRetryCount, ex.getSQLState(), ex.getErrorCode()), ex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "%s %s completed with status [%s]...", this, transactionType, status.name()));
+ }
- status = TransactionStatus.RETRY;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s %s committing...", this, transactionType));
+ }
- retryCount++;
- } else {
- LOG.warn(String.format("SQLException occurred during [%s] and will not be retried... sql state [%s], error code [%d].", transactionType, ex.getSQLState(), ex.getErrorCode()), ex);
+ conn.commit();
- status = TransactionStatus.ERROR;
+ break;
- break;
- }
+ } catch (UserAbortException ex) {
+ conn.rollback();
- } finally {
- if (this.configuration.getNewConnectionPerTxn() && this.conn != null) {
- try {
- this.conn.close();
- this.conn = null;
- } catch (SQLException e) {
- LOG.error("Connection couldn't be closed.", e);
- }
- }
+ ABORT_LOG.debug(String.format("%s Aborted", transactionType), ex);
- switch (status) {
- case UNKNOWN -> this.txnUnknown.put(transactionType);
- case SUCCESS -> this.txnSuccess.put(transactionType);
- case USER_ABORTED -> this.txnAbort.put(transactionType);
- case RETRY -> this.txnRetry.put(transactionType);
- case RETRY_DIFFERENT -> this.txtRetryDifferent.put(transactionType);
- case ERROR -> this.txnErrors.put(transactionType);
- }
+ status = TransactionStatus.USER_ABORTED;
- }
+ break;
- }
} catch (SQLException ex) {
- String msg = String.format("Unexpected SQLException in '%s' when executing '%s' on [%s]", this, transactionType, databaseType.name());
-
- throw new RuntimeException(msg, ex);
+ conn.rollback();
+
+ if (isRetryable(ex)) {
+ LOG.debug(
+ String.format(
+ "Retryable SQLException occurred during [%s]... current retry attempt [%d], max retry attempts [%d], sql state [%s], error code [%d].",
+ transactionType,
+ retryCount,
+ maxRetryCount,
+ ex.getSQLState(),
+ ex.getErrorCode()),
+ ex);
+
+ status = TransactionStatus.RETRY;
+
+ retryCount++;
+ } else {
+ LOG.warn(
+ String.format(
+ "SQLException occurred during [%s] and will not be retried... sql state [%s], error code [%d].",
+ transactionType, ex.getSQLState(), ex.getErrorCode()),
+ ex);
+
+ status = TransactionStatus.ERROR;
+
+ break;
+ }
+
+ } finally {
+ if (this.configuration.getNewConnectionPerTxn() && this.conn != null) {
+ try {
+ this.conn.close();
+ this.conn = null;
+ } catch (SQLException e) {
+ LOG.error("Connection couldn't be closed.", e);
+ }
+ }
+
+ switch (status) {
+ case UNKNOWN -> this.txnUnknown.put(transactionType);
+ case SUCCESS -> this.txnSuccess.put(transactionType);
+ case USER_ABORTED -> this.txnAbort.put(transactionType);
+ case RETRY -> this.txnRetry.put(transactionType);
+ case RETRY_DIFFERENT -> this.txtRetryDifferent.put(transactionType);
+ case ERROR -> this.txnErrors.put(transactionType);
+ }
}
-
+ }
+ } catch (SQLException ex) {
+ String msg =
+ String.format(
+ "Unexpected SQLException in '%s' when executing '%s' on [%s]",
+ this, transactionType, databaseType.name());
+
+ throw new RuntimeException(msg, ex);
}
+ }
- private boolean isRetryable(SQLException ex) {
+ private boolean isRetryable(SQLException ex) {
- String sqlState = ex.getSQLState();
- int errorCode = ex.getErrorCode();
+ String sqlState = ex.getSQLState();
+ int errorCode = ex.getErrorCode();
- LOG.debug("sql state [{}] and error code [{}]", sqlState, errorCode);
-
- if (sqlState == null) {
- return false;
- }
+ LOG.debug("sql state [{}] and error code [{}]", sqlState, errorCode);
- // ------------------
- // MYSQL: https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html
- // ------------------
- if (errorCode == 1213 && sqlState.equals("40001")) {
- // MySQL ER_LOCK_DEADLOCK
- return true;
- } else if (errorCode == 1205 && sqlState.equals("40001")) {
- // MySQL ER_LOCK_WAIT_TIMEOUT
- return true;
- }
-
- // ------------------
- // POSTGRES: https://www.postgresql.org/docs/current/errcodes-appendix.html
- // ------------------
- // Postgres serialization_failure
- return errorCode == 0 && sqlState.equals("40001");
+ if (sqlState == null) {
+ return false;
}
- /**
- * Optional callback that can be used to initialize the Worker right before
- * the benchmark execution begins
- */
- protected void initialize() {
- // The default is to do nothing
- }
-
- /**
- * Invoke a single transaction for the given TransactionType
- *
- * @param conn TODO
- * @param txnType TODO
- * @return TODO
- * @throws UserAbortException TODO
- * @throws SQLException TODO
- */
- protected abstract TransactionStatus executeWork(Connection conn, TransactionType txnType) throws UserAbortException, SQLException;
-
- /**
- * Called at the end of the test to do any clean up that may be required.
- */
- public void tearDown() {
- if (!this.configuration.getNewConnectionPerTxn() && this.conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- LOG.error("Connection couldn't be closed.", e);
- }
- }
+ // ------------------
+ // MYSQL:
+ // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html
+ // ------------------
+ if (errorCode == 1213 && sqlState.equals("40001")) {
+ // MySQL ER_LOCK_DEADLOCK
+ return true;
+ } else if (errorCode == 1205 && sqlState.equals("40001")) {
+ // MySQL ER_LOCK_WAIT_TIMEOUT
+ return true;
}
- public void initializeState() {
- this.workloadState = this.configuration.getWorkloadState();
+ // ------------------
+ // POSTGRES: https://www.postgresql.org/docs/current/errcodes-appendix.html
+ // ------------------
+ // Postgres serialization_failure
+ return errorCode == 0 && sqlState.equals("40001");
+ }
+
+ /**
+ * Optional callback that can be used to initialize the Worker right before the benchmark
+ * execution begins
+ */
+ protected void initialize() {
+ // The default is to do nothing
+ }
+
+ /**
+ * Invoke a single transaction for the given TransactionType
+ *
+ * @param conn TODO
+ * @param txnType TODO
+ * @return TODO
+ * @throws UserAbortException TODO
+ * @throws SQLException TODO
+ */
+ protected abstract TransactionStatus executeWork(Connection conn, TransactionType txnType)
+ throws UserAbortException, SQLException;
+
+ /** Called at the end of the test to do any clean up that may be required. */
+ public void tearDown() {
+ if (!this.configuration.getNewConnectionPerTxn() && this.conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error("Connection couldn't be closed.", e);
+ }
}
+ }
- protected long getPreExecutionWaitInMillis(TransactionType type) {
- return 0;
- }
+ public void initializeState() {
+ this.workloadState = this.configuration.getWorkloadState();
+ }
- protected long getPostExecutionWaitInMillis(TransactionType type) {
- return 0;
- }
+ protected long getPreExecutionWaitInMillis(TransactionType type) {
+ return 0;
+ }
+ protected long getPostExecutionWaitInMillis(TransactionType type) {
+ return 0;
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/CockroachCollector.java b/src/main/java/com/oltpbenchmark/api/collectors/CockroachCollector.java
index 6267a595b..7ca21c238 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/CockroachCollector.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/CockroachCollector.java
@@ -21,31 +21,30 @@
public class CockroachCollector extends DBCollector {
- private static final String VERSION_SQL = "SELECT version();";
-
- private static final String PARAMETERS_SQL = "SHOW ALL;";
-
- public CockroachCollector(String oriDBUrl, String username, String password) {
- try (Connection conn = DriverManager.getConnection(oriDBUrl, username, password)) {
- try (Statement s = conn.createStatement()) {
-
- // Collect DBMS version
- try (ResultSet out = s.executeQuery(VERSION_SQL)) {
- if (out.next()) {
- this.version = out.getString(1);
- }
- }
-
- // Collect DBMS parameters
- try (ResultSet out = s.executeQuery(PARAMETERS_SQL)) {
- while (out.next()) {
- dbParameters.put(out.getString("variable"), out.getString("value"));
- }
- }
-
- }
- } catch (SQLException e) {
- LOG.error("Error while collecting DB parameters: {}", e.getMessage());
+ private static final String VERSION_SQL = "SELECT version();";
+
+ private static final String PARAMETERS_SQL = "SHOW ALL;";
+
+ public CockroachCollector(String oriDBUrl, String username, String password) {
+ try (Connection conn = DriverManager.getConnection(oriDBUrl, username, password)) {
+ try (Statement s = conn.createStatement()) {
+
+ // Collect DBMS version
+ try (ResultSet out = s.executeQuery(VERSION_SQL)) {
+ if (out.next()) {
+ this.version = out.getString(1);
+ }
+ }
+
+ // Collect DBMS parameters
+ try (ResultSet out = s.executeQuery(PARAMETERS_SQL)) {
+ while (out.next()) {
+ dbParameters.put(out.getString("variable"), out.getString("value"));
+ }
}
+ }
+ } catch (SQLException e) {
+ LOG.error("Error while collecting DB parameters: {}", e.getMessage());
}
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/DBCollector.java b/src/main/java/com/oltpbenchmark/api/collectors/DBCollector.java
index 0a50b2aab..12973e97f 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/DBCollector.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/DBCollector.java
@@ -18,46 +18,43 @@
package com.oltpbenchmark.api.collectors;
import com.oltpbenchmark.util.JSONUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Map;
import java.util.TreeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DBCollector implements DBParameterCollector {
- protected static final Logger LOG = LoggerFactory.getLogger(DBCollector.class);
-
- protected final Map dbParameters = new TreeMap<>();
-
- protected final Map dbMetrics = new TreeMap<>();
+ protected static final Logger LOG = LoggerFactory.getLogger(DBCollector.class);
- protected String version = null;
+ protected final Map dbParameters = new TreeMap<>();
- @Override
- public boolean hasParameters() {
- return (!dbParameters.isEmpty());
- }
+ protected final Map dbMetrics = new TreeMap<>();
- @Override
- public boolean hasMetrics() {
- return (!dbMetrics.isEmpty());
- }
+ protected String version = null;
- @Override
- public String collectParameters() {
- return JSONUtil.format(JSONUtil.toJSONString(dbParameters));
- }
+ @Override
+ public boolean hasParameters() {
+ return (!dbParameters.isEmpty());
+ }
- @Override
- public String collectMetrics() {
- return JSONUtil.format(JSONUtil.toJSONString(dbMetrics));
- }
+ @Override
+ public boolean hasMetrics() {
+ return (!dbMetrics.isEmpty());
+ }
- @Override
- public String collectVersion() {
- return version;
- }
+ @Override
+ public String collectParameters() {
+ return JSONUtil.format(JSONUtil.toJSONString(dbParameters));
+ }
+ @Override
+ public String collectMetrics() {
+ return JSONUtil.format(JSONUtil.toJSONString(dbMetrics));
+ }
+ @Override
+ public String collectVersion() {
+ return version;
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollector.java b/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollector.java
index 406dd739f..69a75294b 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollector.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollector.java
@@ -18,13 +18,13 @@
package com.oltpbenchmark.api.collectors;
public interface DBParameterCollector {
- boolean hasParameters();
+ boolean hasParameters();
- boolean hasMetrics();
+ boolean hasMetrics();
- String collectParameters();
+ String collectParameters();
- String collectMetrics();
+ String collectMetrics();
- String collectVersion();
+ String collectVersion();
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java b/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java
index e5be29d05..5734e4306 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java
@@ -20,18 +20,18 @@
import com.oltpbenchmark.types.DatabaseType;
public class DBParameterCollectorGen {
- public static DBParameterCollector getCollector(DatabaseType dbType, String dbUrl, String username, String password) {
- switch (dbType) {
-
- case MYSQL:
- case MARIADB:
- return new MySQLCollector(dbUrl, username, password);
- case POSTGRES:
- return new PostgresCollector(dbUrl, username, password);
- case COCKROACHDB:
- return new CockroachCollector(dbUrl, username, password);
- default:
- return new DBCollector();
- }
+ public static DBParameterCollector getCollector(
+ DatabaseType dbType, String dbUrl, String username, String password) {
+ switch (dbType) {
+ case MYSQL:
+ case MARIADB:
+ return new MySQLCollector(dbUrl, username, password);
+ case POSTGRES:
+ return new PostgresCollector(dbUrl, username, password);
+ case COCKROACHDB:
+ return new CockroachCollector(dbUrl, username, password);
+ default:
+ return new DBCollector();
}
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/MySQLCollector.java b/src/main/java/com/oltpbenchmark/api/collectors/MySQLCollector.java
index d377a062d..82c514ead 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/MySQLCollector.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/MySQLCollector.java
@@ -21,39 +21,39 @@
public class MySQLCollector extends DBCollector {
- private static final String VERSION_SQL = "SELECT @@GLOBAL.version;";
-
- private static final String PARAMETERS_SQL = "SHOW VARIABLES;";
-
- private static final String METRICS_SQL = "SHOW STATUS";
-
- public MySQLCollector(String oriDBUrl, String username, String password) {
- try (Connection conn = DriverManager.getConnection(oriDBUrl, username, password)) {
- try (Statement s = conn.createStatement()) {
-
- // Collect DBMS version
- try (ResultSet out = s.executeQuery(VERSION_SQL)) {
- if (out.next()) {
- this.version = out.getString(1);
- }
- }
-
- // Collect DBMS parameters
- try (ResultSet out = s.executeQuery(PARAMETERS_SQL)) {
- while (out.next()) {
- dbParameters.put(out.getString(1).toLowerCase(), out.getString(2));
- }
- }
-
- // Collect DBMS internal metrics
- try (ResultSet out = s.executeQuery(METRICS_SQL)) {
- while (out.next()) {
- dbMetrics.put(out.getString(1).toLowerCase(), out.getString(2));
- }
- }
- }
- } catch (SQLException e) {
- LOG.error("Error while collecting DB parameters: {}", e.getMessage());
+ private static final String VERSION_SQL = "SELECT @@GLOBAL.version;";
+
+ private static final String PARAMETERS_SQL = "SHOW VARIABLES;";
+
+ private static final String METRICS_SQL = "SHOW STATUS";
+
+ public MySQLCollector(String oriDBUrl, String username, String password) {
+ try (Connection conn = DriverManager.getConnection(oriDBUrl, username, password)) {
+ try (Statement s = conn.createStatement()) {
+
+ // Collect DBMS version
+ try (ResultSet out = s.executeQuery(VERSION_SQL)) {
+ if (out.next()) {
+ this.version = out.getString(1);
+ }
+ }
+
+ // Collect DBMS parameters
+ try (ResultSet out = s.executeQuery(PARAMETERS_SQL)) {
+ while (out.next()) {
+ dbParameters.put(out.getString(1).toLowerCase(), out.getString(2));
+ }
+ }
+
+ // Collect DBMS internal metrics
+ try (ResultSet out = s.executeQuery(METRICS_SQL)) {
+ while (out.next()) {
+ dbMetrics.put(out.getString(1).toLowerCase(), out.getString(2));
+ }
}
+ }
+ } catch (SQLException e) {
+ LOG.error("Error while collecting DB parameters: {}", e.getMessage());
}
+ }
}
diff --git a/src/main/java/com/oltpbenchmark/api/collectors/PostgresCollector.java b/src/main/java/com/oltpbenchmark/api/collectors/PostgresCollector.java
index add037d08..ae22dfecc 100644
--- a/src/main/java/com/oltpbenchmark/api/collectors/PostgresCollector.java
+++ b/src/main/java/com/oltpbenchmark/api/collectors/PostgresCollector.java
@@ -18,83 +18,82 @@
package com.oltpbenchmark.api.collectors;
import com.oltpbenchmark.util.JSONUtil;
-
import java.sql.*;
import java.util.*;
public class PostgresCollector extends DBCollector {
- private static final String VERSION_SQL = "SELECT version();";
-
- private static final String PARAMETERS_SQL = "SHOW ALL;";
-
- private static final String[] PG_STAT_VIEWS = {
- "pg_stat_archiver", "pg_stat_bgwriter", "pg_stat_database",
- "pg_stat_database_conflicts", "pg_stat_user_tables", "pg_statio_user_tables",
- "pg_stat_user_indexes", "pg_statio_user_indexes"
- };
-
- private final Map>> pgMetrics = new HashMap<>();
-
- public PostgresCollector(String oriDBUrl, String username, String password) {
-
- try (Connection conn = DriverManager.getConnection(oriDBUrl, username, password)) {
- try (Statement s = conn.createStatement()) {
-
- // Collect DBMS version
- try (ResultSet out = s.executeQuery(VERSION_SQL)) {
- if (out.next()) {
- this.version = out.getString(1);
- }
- }
-
- // Collect DBMS parameters
- try (ResultSet out = s.executeQuery(PARAMETERS_SQL)) {
- while (out.next()) {
- dbParameters.put(out.getString("name"), out.getString("setting"));
- }
- }
-
- // Collect DBMS internal metrics
- for (String viewName : PG_STAT_VIEWS) {
- try (ResultSet out = s.executeQuery("SELECT * FROM " + viewName)) {
- pgMetrics.put(viewName, getMetrics(out));
- } catch (SQLException ex) {
- LOG.error("Error while collecting DB metric view: {}", ex.getMessage());
- }
- }
- }
- } catch (SQLException e) {
- LOG.error("Error while collecting DB parameters: {}", e.getMessage());
- }
- }
+ private static final String VERSION_SQL = "SELECT version();";
- @Override
- public boolean hasMetrics() {
- return (!pgMetrics.isEmpty());
- }
+ private static final String PARAMETERS_SQL = "SHOW ALL;";
- @Override
- public String collectMetrics() {
- return JSONUtil.format(JSONUtil.toJSONString(pgMetrics));
- }
+ private static final String[] PG_STAT_VIEWS = {
+ "pg_stat_archiver", "pg_stat_bgwriter", "pg_stat_database",
+ "pg_stat_database_conflicts", "pg_stat_user_tables", "pg_statio_user_tables",
+ "pg_stat_user_indexes", "pg_statio_user_indexes"
+ };
+
+ private final Map>> pgMetrics = new HashMap<>();
+
+ public PostgresCollector(String oriDBUrl, String username, String password) {
- private List