
Commit 6cf7ea0

workload_generator: add support for external DDL
Currently, the DDL generator relies on the presence of DDL CREATE statements within the debug zip. However, this dependency breaks when the debug zip is captured for a time range that does not include the creation of those objects. To address this limitation, this PR introduces support for supplying a schema dump as an alternative input source. Users can generate this dump by running:

```
cockroach sql --url='postgresql://<url>/<db name>' --execute="SHOW CREATE ALL TABLES;" > ddl_file.sql
```

The resulting file can then be provided to the generator via the `--ddl-file` flag.

Epic: None

Release note: None
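For orientation, the parser added below expects the dump in the shape this command produces: a `create_statement` header row, with each multi-line statement wrapped in double quotes (both are skipped/trimmed by the new code). An illustrative, hand-written excerpt, inferred from the parser rather than captured from a real cluster:

```
create_statement
"CREATE TABLE public.users (
    id INT8 NOT NULL,
    CONSTRAINT users_pkey PRIMARY KEY (id)
);"
```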
1 parent 755c186 commit 6cf7ea0

File tree

3 files changed: +214 −8 lines changed

- pkg/workload/workload_generator/schema_generator.go
- pkg/workload/workload_generator/schema_generator_test.go
- pkg/workload/workload_generator/workload.go

pkg/workload/workload_generator/schema_generator.go

Lines changed: 161 additions & 4 deletions
```diff
@@ -98,9 +98,22 @@ var (
 // TODO: The "anonymize" parameter is unused for now.
 func generateDDLs(
 	zipDir,
-	dbName string, anonymize bool,
+	dbName, ddlFile string, anonymize bool,
 ) (allSchemas map[string]*TableSchema, createStmts map[string]string, retErr error) {
 
+	if ddlFile != "" {
+		// DDL file location is present. We will use this instead of the debug zip.
+		f, err := os.Open(ddlFile)
+		if err != nil {
+			return nil, nil, errors.Wrap(err, "failed to open DDL file")
+		}
+		defer func() {
+			if cerr := f.Close(); cerr != nil && retErr == nil {
+				retErr = errors.Wrap(cerr, "failed to close input DDL file")
+			}
+		}()
+		return generateDDLFromDDLFile(bufio.NewReader(f), dbName, anonymize)
+	}
 	f, err := openCreateStatementsTSV(zipDir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to open TSV file")
```
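The deferred close in this hunk threads any Close error back through the named return value retErr, so a close failure is surfaced only when the function would otherwise succeed. A minimal, self-contained sketch of the same pattern (the readAll helper is illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// readAll mirrors the named-return pattern in generateDDLs: the deferred
// Close may set retErr, but never clobbers an earlier error.
func readAll(path string) (data []byte, retErr error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	defer func() {
		if cerr := f.Close(); cerr != nil && retErr == nil {
			retErr = fmt.Errorf("failed to close file: %w", cerr)
		}
	}()
	return io.ReadAll(f)
}

func main() {
	if _, err := readAll("/etc/hosts"); err != nil {
		fmt.Println("error:", err)
	}
}
```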
```diff
@@ -111,14 +124,158 @@ func generateDDLs(
 		}
 	}()
 
-	return generateDDLFromReader(bufio.NewReader(f), dbName, anonymize)
+	return generateDDLFromCSV(bufio.NewReader(f), dbName, anonymize)
+}
+
+// generateDDLFromDDLFile reads DDL statements from a SQL dump file
+// and returns a map of table names to their schemas and a map of
+// short table names to their CREATE TABLE statements.
+// The file can be generated by running the following:
+//
+//	cockroach sql --url='postgresql://<url>/<db name>' --execute="SHOW CREATE ALL TABLES;" > ddl_file.sql
+func generateDDLFromDDLFile(
+	reader *bufio.Reader, dbName string, anonymize bool,
+) (map[string]*TableSchema, map[string]string, error) {
+	// The results are stored in these maps.
+	tableStatements := make(map[string]string)
+	order := make([]string, 0)
+	seen := make(map[string]struct{})
+
+	// Buffer accumulates the SQL statements.
+	var currentStmt strings.Builder
+	// inStatement helps with handling multi-line statements.
+	inStatement := false
+
+	// The file is read line by line.
+	for {
+		line, err := reader.ReadString('\n')
+		if err != nil {
+			if err.Error() == "EOF" {
+				break
+			}
+			return nil, nil, errors.Wrap(err, "failed while reading SQL file")
+		}
+
+		// Empty lines and comments are skipped.
+		trimmedLine := strings.TrimSpace(line)
+		if !inStatement {
+			// The generated statement has a quote at the start of the statement. This is trimmed.
+			trimmedLine = strings.TrimLeft(trimmedLine, "\"")
+		}
+		if trimmedLine == "" || strings.HasPrefix(trimmedLine, "--") ||
+			strings.HasPrefix(trimmedLine, "create_statement") {
+			continue
+		}
+
+		// A new statement is expected to start with CREATE TABLE.
+		if strings.HasPrefix(strings.ToUpper(trimmedLine), "CREATE TABLE") {
+			// If we were already in a statement, the previous statement is processed.
+			if inStatement {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+			}
+
+			// A new statement is started.
+			currentStmt.Reset()
+			currentStmt.WriteString(trimmedLine)
+			inStatement = true
+		} else if strings.HasPrefix(strings.ToUpper(trimmedLine), "ALTER TABLE") {
+			// If we were in a CREATE TABLE statement, the statement is processed.
+			if inStatement {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+			}
+
+			// A new ALTER TABLE statement is started.
+			currentStmt.Reset()
+			currentStmt.WriteString(trimmedLine)
+			inStatement = true
+		} else if inStatement {
+			if strings.HasSuffix(trimmedLine, ";\"") {
+				// The generated statement has a quote at the end of the statement. This needs to be trimmed.
+				trimmedLine = strings.TrimRight(trimmedLine, "\"")
+			}
+			// The current statement is accumulated.
+			currentStmt.WriteString(trimmedLine)
+
+			// If the statement is complete (ends with a semicolon, or has a closing parenthesis followed by options), it is processed.
+			if strings.HasSuffix(trimmedLine, ";") ||
+				(strings.Contains(trimmedLine, ");") && !strings.HasPrefix(trimmedLine, "--")) {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+				inStatement = false
+			}
+		}
+	}
+
+	// Any remaining statement is processed.
+	if inStatement {
+		tableStatements, order, _ = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+	}
+
+	return buildSchemas(order, tableStatements), buildCreateStmts(tableStatements), nil
+}
```
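One note on the loop above: it detects end-of-input by comparing the error string against "EOF". The conventional Go form is errors.Is(err, io.EOF), and since bufio.Reader.ReadString can return a final partial line together with io.EOF, that line is usually processed before breaking. A small standalone sketch of the convention (illustrative only, not the patch's code):

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	// Input deliberately lacks a trailing newline on the last line.
	r := bufio.NewReader(strings.NewReader("CREATE TABLE t (\nid INT8\n);"))
	for {
		line, err := r.ReadString('\n')
		// ReadString can hand back a partial final line together with
		// io.EOF, so the data is consumed before the loop exits.
		if line != "" {
			fmt.Println(strings.TrimSpace(line))
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			panic(err) // real code would wrap and return the error
		}
	}
}
```

The hunk continues with processStatement: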
```diff
+
+// processStatement processes a single SQL statement and adds it to the
+// tableStatements map if it is a CREATE TABLE statement.
+// It returns the updated tableStatements, order, and seen maps.
+func processStatement(
+	stmt string,
+	tableStatements map[string]string,
+	order []string,
+	seen map[string]struct{},
+	dbName string,
+) (map[string]string, []string, map[string]struct{}) {
+	// Only process CREATE TABLE statements.
+	if !strings.HasPrefix(strings.ToUpper(strings.TrimSpace(stmt)), "CREATE TABLE") {
+		return tableStatements, order, seen
+	}
+
+	// Extract the table name using the tablePattern regex.
+	tableMatch := tablePattern.FindStringSubmatch(stmt)
+	if tableMatch == nil {
+		return tableStatements, order, seen
+	}
+
+	// Extract and normalize the table name.
+	tableName := tableMatch[1]
+	parts := strings.Split(tableName, ".")
+	for i := range parts {
+		parts[i] = strings.Trim(parts[i], `"`) // Remove quotes from parts.
+	}
+
+	// If the table name doesn't have a schema, assume it's "public".
+	var schemaName string
+	var simpleTableName string
+
+	if len(parts) == 1 {
+		schemaName = "public"
+		simpleTableName = parts[0]
+	} else if len(parts) == 2 {
+		schemaName = parts[0]
+		simpleTableName = parts[1]
+	} else {
+		// Skip tables with more complex names.
+		return tableStatements, order, seen
+	}
+
+	// Create a regex for the schema name.
+	schemaPattern := regexp.MustCompile(`\b` + regexp.QuoteMeta(schemaName) + `\.`)
+
+	// Process the DDL record.
+	fullTableName, statement := processDDLRecord(dbName, schemaName, simpleTableName, stmt, schemaPattern)
+
+	// Add to the maps if not seen before.
+	if _, ok := seen[fullTableName]; !ok && fullTableName != "" {
+		tableStatements[fullTableName] = statement
+		order = append(order, fullTableName)
+		seen[fullTableName] = struct{}{}
+	}
+
+	return tableStatements, order, seen
 }
 
-// generateDDLFromReader takes a reader for a TSV file containing DDL statements,
+// generateDDLFromCSV takes a reader for a TSV file containing DDL statements,
 // parses the statements, and returns a map of table names to their schemas
 // and a map of short table names to their CREATE TABLE statements.
 // It has been designed this way to make it unit-testable.
-func generateDDLFromReader(
+func generateDDLFromCSV(
 	r io.Reader, dbName string, anonymize bool,
 ) (map[string]*TableSchema, map[string]string, error) {
 	reader := csv.NewReader(r)
```
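The name handling in processStatement reduces to a small, testable rule: strip double quotes from each dot-separated part, assume the public schema for bare names, and skip anything more deeply qualified. A standalone sketch of that normalization (splitQualifiedName is a hypothetical helper, not in the patch):

```go
package main

import (
	"fmt"
	"strings"
)

// splitQualifiedName mirrors the normalization in processStatement:
// quotes are stripped from each dot-separated part, a bare table name
// is assumed to live in the "public" schema, and anything more deeply
// qualified is rejected.
func splitQualifiedName(name string) (schema, table string, ok bool) {
	parts := strings.Split(name, ".")
	for i := range parts {
		parts[i] = strings.Trim(parts[i], `"`)
	}
	switch len(parts) {
	case 1:
		return "public", parts[0], true
	case 2:
		return parts[0], parts[1], true
	default:
		return "", "", false // e.g. db.schema.table is skipped
	}
}

func main() {
	for _, n := range []string{`users`, `public.orders`, `"app"."events"`, `db.public.t`} {
		s, t, ok := splitQualifiedName(n)
		fmt.Printf("%-16s -> schema=%q table=%q ok=%v\n", n, s, t, ok)
	}
}
```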

pkg/workload/workload_generator/schema_generator_test.go

Lines changed: 47 additions & 2 deletions
```diff
@@ -8,6 +8,7 @@
 package workload_generator
 
 import (
+	"bufio"
 	_ "embed"
 	"strings"
 	"testing"
```
```diff
@@ -115,13 +116,57 @@ var data string
 
 func TestGenerateDDLsIntegration(t *testing.T) {
 	t.Run("expect success", func(t *testing.T) {
-		schemas, stmts, err := generateDDLFromReader(strings.NewReader(data), testDBName, false)
+		schemas, stmts, err := generateDDLFromCSV(strings.NewReader(data), testDBName, false)
 		assert.NoError(t, err)
 		assert.NotEmpty(t, schemas)
 		assert.NotEmpty(t, stmts)
 	})
 	t.Run("expect failure due to invalid file location", func(t *testing.T) {
-		_, _, err := generateDDLs("wrong_file_location", testDBName, false)
+		_, _, err := generateDDLs("wrong_file_location", testDBName, "", false)
 		assert.NotNil(t, err)
 	})
 }
+
+func TestGenerateDDLFromDDLFile(t *testing.T) {
+	// Sample DDL statements for testing.
+	sampleDDL := `"CREATE TABLE public.users (
+	id INT8 NOT NULL,
+	name VARCHAR(50) NOT NULL,
+	email VARCHAR(100) UNIQUE,
+	CONSTRAINT users_pkey PRIMARY KEY (id)
+);"
+
+CREATE TABLE public.orders (
+	order_id INT8 NOT NULL,
+	user_id INT8 NOT NULL,
+	amount DECIMAL(10,2) NOT NULL,
+	CONSTRAINT orders_pkey PRIMARY KEY (order_id)
+);
+
+ALTER TABLE public.orders ADD CONSTRAINT orders_user_id_fkey FOREIGN KEY (user_id) REFERENCES public.users(id);
+`
+
+	t.Run("expect success with DDL file", func(t *testing.T) {
+		reader := strings.NewReader(sampleDDL)
+		schemas, stmts, err := generateDDLFromDDLFile(bufio.NewReader(reader), testDBName, false)
+		assert.NoError(t, err)
+		assert.NotEmpty(t, schemas)
+		assert.NotEmpty(t, stmts)
+
+		// Verify that both tables were parsed correctly.
+		assert.Contains(t, schemas, "users")
+		assert.Contains(t, schemas, "orders")
+
+		// Verify that the users table has the correct columns.
+		usersSchema := schemas["users"]
+		assert.Contains(t, usersSchema.Columns, "id")
+		assert.Contains(t, usersSchema.Columns, "name")
+		assert.Contains(t, usersSchema.Columns, "email")
+
+		// Verify that the orders table has the correct columns.
+		ordersSchema := schemas["orders"]
+		assert.Contains(t, ordersSchema.Columns, "order_id")
+		assert.Contains(t, ordersSchema.Columns, "user_id")
+		assert.Contains(t, ordersSchema.Columns, "amount")
+	})
+}
```

pkg/workload/workload_generator/workload.go

Lines changed: 6 additions & 2 deletions
```diff
@@ -104,6 +104,10 @@ var workloadGeneratorMeta = workload.Meta{
 	g.flags.FlagSet = pflag.NewFlagSet("db_workload", pflag.ContinueOnError)
 	g.flags.StringVar(&g.debugLogsLocation, "debug-logs", "",
 		"Path to unzipped debug logs directory.")
+	g.flags.StringVar(&g.ddlFile, "ddl-file", "",
+		"The file containing the DDL. If this is provided, the DDL is read from this file instead of the debug logs."+
+			" To generate the file, execute: \n"+
+			"cockroach sql --url='postgresql://<url>/<db name>' --execute=\"SHOW CREATE ALL TABLES;\" > ddl_file.sql")
 	g.flags.IntVar(&g.rowCount, "rows", 1000,
 		"Base row count for tables without foreign keys; other tables scale by foreign key depth/fanout.")
 	g.flags.StringVar(&g.inputYAML, "input-yaml", "",
@@ -126,6 +130,7 @@ type workloadGenerator struct {
 	flags             workload.Flags
 	connFlags         *workload.ConnFlags
 	debugLogsLocation string // path to the unzipped debug zip file
+	ddlFile           string // path to the DDL file
 	dbName            string // database name to use for the workload
 	rowCount          int    // base number of rows per table before FK-depth scaling
@@ -200,7 +205,7 @@ func (w *workloadGenerator) initializeGenerator() error {
 	w.dbName = dbName
 
 	// 2) Parsing DDLs out of the debug logs.
-	schemas, stmts, err := generateDDLs(w.debugLogsLocation, w.dbName, false)
+	schemas, stmts, err := generateDDLs(w.debugLogsLocation, w.dbName, w.ddlFile, false)
 	if err != nil {
 		return errors.Wrap(err, "failed to generate DDLs from debug logs")
 	}
@@ -592,7 +597,6 @@ func (w *workloadGenerator) initGenerators(db *gosql.DB) error {
 			maxRows = tblBlock.Count
 		}
 	}
-	// TODO: make the globalBatchNumber dynamic
 	globalNumBatches := (maxRows + baseBatchSize - 1) / baseBatchSize
 
 	// 1) The generator + empty cache for every table.col is built.
```
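The globalNumBatches line retained above is the standard integer ceiling-division idiom: it rounds maxRows/baseBatchSize up so that a trailing partial batch still gets counted. A quick check with illustrative values:

```go
package main

import "fmt"

func main() {
	maxRows, baseBatchSize := 1000, 300
	// (1000 + 299) / 300 = 4: three full batches plus one partial batch.
	globalNumBatches := (maxRows + baseBatchSize - 1) / baseBatchSize
	fmt.Println(globalNumBatches) // 4
}
```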
