Commit 5d15bbf

craig[bot] and nameisbhaskar committed
Merge #153521
153521: workload_generator: add support for external DDL r=shailendra-patel a=nameisbhaskar

Currently, the DDL generator relies on the presence of DDL CREATE statements within the debug zip. However, this dependency breaks when the debug zip is captured for a time range that does not include the creation of those objects. To address this limitation, this PR introduces support for supplying a schema dump as an alternative input source. Users can generate this dump by running:

```
cockroach sql --url='postgresql://<url>/<db name>' --execute="SHOW CREATE ALL TABLES;" > ddl_file.sql
```

The resulting file can then be provided to the generator via the `--ddl-file` flag.

Epic: None

Release note: None

Co-authored-by: Bhaskarjyoti Bora <[email protected]>
2 parents d5d6132 + 6cf7ea0 · commit 5d15bbf
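For reference, redirecting that command's output produces a CSV-style dump: a create_statement header row, with each multi-line CREATE statement wrapped in double quotes; the parser added in this commit strips both. A sketch of the expected shape (the exact quoting is an assumption and may vary with cockroach sql's output formatting):

```
create_statement
"CREATE TABLE public.users (
	id INT8 NOT NULL,
	CONSTRAINT users_pkey PRIMARY KEY (id)
);"
```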

3 files changed (+214 −8 lines)

pkg/workload/workload_generator/schema_generator.go

Lines changed: 161 additions & 4 deletions
```
@@ -98,9 +98,22 @@ var (
 // TODO: The "anonymize" parameter is unused for now.
 func generateDDLs(
 	zipDir,
-	dbName string, anonymize bool,
+	dbName, ddlFile string, anonymize bool,
 ) (allSchemas map[string]*TableSchema, createStmts map[string]string, retErr error) {
 
+	if ddlFile != "" {
+		// DDL file location is present. We will use this instead of the debug zip.
+		f, err := os.Open(ddlFile)
+		if err != nil {
+			return nil, nil, errors.Wrap(err, "failed to open DDL file")
+		}
+		defer func() {
+			if cerr := f.Close(); cerr != nil && retErr == nil {
+				retErr = errors.Wrap(cerr, "failed to close input DDL file")
+			}
+		}()
+		return generateDDLFromDDLFile(bufio.NewReader(f), dbName, anonymize)
+	}
 	f, err := openCreateStatementsTSV(zipDir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to open TSV file")
@@ -111,14 +124,158 @@ func generateDDLs(
 		}
 	}()
 
-	return generateDDLFromReader(bufio.NewReader(f), dbName, anonymize)
+	return generateDDLFromCSV(bufio.NewReader(f), dbName, anonymize)
+}
+
+// generateDDLFromDDLFile reads DDL statements from a SQL dump file
+// and returns a map of table names to their schemas and a map of
+// short table names to their CREATE TABLE statements.
+// The file can be generated by running the following:
+//
+//	cockroach sql --url='postgresql://<url>/<db name>' --execute="SHOW CREATE ALL TABLES;" > ddl_file.sql
+func generateDDLFromDDLFile(
+	reader *bufio.Reader, dbName string, anonymize bool,
+) (map[string]*TableSchema, map[string]string, error) {
+	// The results are stored in these maps.
+	tableStatements := make(map[string]string)
+	order := make([]string, 0)
+	seen := make(map[string]struct{})
+
+	// The buffer accumulates the SQL statements.
+	var currentStmt strings.Builder
+	// inStatement helps with handling multi-line statements.
+	inStatement := false
+
+	// The file is read line by line.
+	for {
+		line, err := reader.ReadString('\n')
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, nil, errors.Wrap(err, "failed while reading SQL file")
+		}
+
+		// Empty lines, comments, and the header row are skipped.
+		trimmedLine := strings.TrimSpace(line)
+		if !inStatement {
+			// The generated statement has a quote at the start of the statement. This is trimmed.
+			trimmedLine = strings.TrimLeft(trimmedLine, "\"")
+		}
+		if trimmedLine == "" || strings.HasPrefix(trimmedLine, "--") ||
+			strings.HasPrefix(trimmedLine, "create_statement") {
+			continue
+		}
+
+		// A new statement is expected to start with CREATE TABLE.
+		if strings.HasPrefix(strings.ToUpper(trimmedLine), "CREATE TABLE") {
+			// If we were already in a statement, the previous statement is processed.
+			if inStatement {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+			}
+
+			// A new statement is started.
+			currentStmt.Reset()
+			currentStmt.WriteString(trimmedLine)
+			inStatement = true
+		} else if strings.HasPrefix(strings.ToUpper(trimmedLine), "ALTER TABLE") {
+			// If we were in a CREATE TABLE statement, the statement is processed.
+			if inStatement {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+			}
+
+			// A new ALTER TABLE statement is started.
+			currentStmt.Reset()
+			currentStmt.WriteString(trimmedLine)
+			inStatement = true
+		} else if inStatement {
+			if strings.HasSuffix(trimmedLine, ";\"") {
+				// The generated statement has a quote at the end of the statement. This needs to be trimmed.
+				trimmedLine = strings.TrimRight(trimmedLine, "\"")
+			}
+			// The current statement is accumulated.
+			currentStmt.WriteString(trimmedLine)
+
+			// If the statement is complete (ends with a semicolon, or has a closing parenthesis followed by options), it is processed.
+			if strings.HasSuffix(trimmedLine, ";") ||
+				(strings.Contains(trimmedLine, ");") && !strings.HasPrefix(trimmedLine, "--")) {
+				tableStatements, order, seen = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+				inStatement = false
+			}
+		}
+	}
+
+	// Any remaining statement is processed.
+	if inStatement {
+		tableStatements, order, _ = processStatement(currentStmt.String(), tableStatements, order, seen, dbName)
+	}
+
+	return buildSchemas(order, tableStatements), buildCreateStmts(tableStatements), nil
+}
+
+// processStatement processes a single SQL statement and adds it to the tableStatements map if it is a CREATE TABLE statement.
+// It returns the updated tableStatements, order, and seen maps.
+func processStatement(
+	stmt string,
+	tableStatements map[string]string,
+	order []string,
+	seen map[string]struct{},
+	dbName string,
+) (map[string]string, []string, map[string]struct{}) {
+	// Only CREATE TABLE statements are processed.
+	if !strings.HasPrefix(strings.ToUpper(strings.TrimSpace(stmt)), "CREATE TABLE") {
+		return tableStatements, order, seen
+	}
+
+	// Extract the table name using the tablePattern regex.
+	tableMatch := tablePattern.FindStringSubmatch(stmt)
+	if tableMatch == nil {
+		return tableStatements, order, seen
+	}
+
+	// Extract and normalize the table name.
+	tableName := tableMatch[1]
+	parts := strings.Split(tableName, ".")
+	for i := range parts {
+		parts[i] = strings.Trim(parts[i], `"`) // Remove quotes from parts.
+	}
+
+	// If the table name doesn't have a schema, assume it's "public".
+	var schemaName string
+	var simpleTableName string
+
+	if len(parts) == 1 {
+		schemaName = "public"
+		simpleTableName = parts[0]
+	} else if len(parts) == 2 {
+		schemaName = parts[0]
+		simpleTableName = parts[1]
+	} else {
+		// Skip tables with more complex names.
+		return tableStatements, order, seen
+	}
+
+	// Create a regex for the schema name.
+	schemaPattern := regexp.MustCompile(`\b` + regexp.QuoteMeta(schemaName) + `\.`)
+
+	// Process the DDL record.
+	fullTableName, statement := processDDLRecord(dbName, schemaName, simpleTableName, stmt, schemaPattern)
+
+	// Add to the maps if not seen before.
+	if _, ok := seen[fullTableName]; !ok && fullTableName != "" {
+		tableStatements[fullTableName] = statement
+		order = append(order, fullTableName)
+		seen[fullTableName] = struct{}{}
+	}
+
+	return tableStatements, order, seen
 }
 
-// generateDDLFromReader takes a reader for a TSV file containing DDL statements,
+// generateDDLFromCSV takes a reader for a TSV file containing DDL statements,
 // parses the statements, and returns a map of table names to their schemas
 // and a map of short table names to their CREATE TABLE statements.
 // It has been designed this way to make it unit-testable.
-func generateDDLFromReader(
+func generateDDLFromCSV(
 	r io.Reader, dbName string, anonymize bool,
 ) (map[string]*TableSchema, map[string]string, error) {
 	reader := csv.NewReader(r)
```
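The table-name normalization in processStatement is easiest to see in isolation. The following is a minimal, runnable sketch of the same logic, using a hypothetical stand-in for the package's tablePattern regex (the real pattern is defined elsewhere in schema_generator.go, and the real code additionally routes the result through processDDLRecord):

```
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Hypothetical stand-in for the package's tablePattern regex.
var tablePattern = regexp.MustCompile(`(?i)CREATE TABLE\s+([^\s(]+)`)

func main() {
	stmt := `CREATE TABLE "public"."users" (id INT8 PRIMARY KEY);`

	// Capture the (possibly quoted, possibly schema-qualified) table name.
	m := tablePattern.FindStringSubmatch(stmt)
	parts := strings.Split(m[1], ".")
	for i := range parts {
		parts[i] = strings.Trim(parts[i], `"`) // strip surrounding quotes
	}

	// Unqualified names default to the "public" schema, mirroring processStatement.
	schema, table := "public", parts[0]
	if len(parts) == 2 {
		schema, table = parts[0], parts[1]
	}
	fmt.Println(schema, table) // prints: public users
}
```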

pkg/workload/workload_generator/schema_generator_test.go

Lines changed: 47 additions & 2 deletions
```
@@ -8,6 +8,7 @@
 package workload_generator
 
 import (
+	"bufio"
 	_ "embed"
 	"strings"
 	"testing"
@@ -115,13 +116,57 @@ var data string
 
 func TestGenerateDDLsIntegration(t *testing.T) {
 	t.Run("expect success", func(t *testing.T) {
-		schemas, stmts, err := generateDDLFromReader(strings.NewReader(data), testDBName, false)
+		schemas, stmts, err := generateDDLFromCSV(strings.NewReader(data), testDBName, false)
 		assert.NoError(t, err)
 		assert.NotEmpty(t, schemas)
 		assert.NotEmpty(t, stmts)
 	})
 	t.Run("expect failure due to invalid file location", func(t *testing.T) {
-		_, _, err := generateDDLs("wrong_file_location", testDBName, false)
+		_, _, err := generateDDLs("wrong_file_location", testDBName, "", false)
 		assert.NotNil(t, err)
 	})
 }
+
+func TestGenerateDDLFromDDLFile(t *testing.T) {
+	// Sample DDL statements for testing.
+	sampleDDL := `"CREATE TABLE public.users (
+	id INT8 NOT NULL,
+	name VARCHAR(50) NOT NULL,
+	email VARCHAR(100) UNIQUE,
+	CONSTRAINT users_pkey PRIMARY KEY (id)
+);"
+
+CREATE TABLE public.orders (
+	order_id INT8 NOT NULL,
+	user_id INT8 NOT NULL,
+	amount DECIMAL(10,2) NOT NULL,
+	CONSTRAINT orders_pkey PRIMARY KEY (order_id)
+);
+
+ALTER TABLE public.orders ADD CONSTRAINT orders_user_id_fkey FOREIGN KEY (user_id) REFERENCES public.users(id);
+`
+
+	t.Run("expect success with DDL file", func(t *testing.T) {
+		reader := strings.NewReader(sampleDDL)
+		schemas, stmts, err := generateDDLFromDDLFile(bufio.NewReader(reader), testDBName, false)
+		assert.NoError(t, err)
+		assert.NotEmpty(t, schemas)
+		assert.NotEmpty(t, stmts)
+
+		// Verify that both tables were parsed correctly.
+		assert.Contains(t, schemas, "users")
+		assert.Contains(t, schemas, "orders")
+
+		// Verify that the users table has the correct columns.
+		usersSchema := schemas["users"]
+		assert.Contains(t, usersSchema.Columns, "id")
+		assert.Contains(t, usersSchema.Columns, "name")
+		assert.Contains(t, usersSchema.Columns, "email")
+
+		// Verify that the orders table has the correct columns.
+		ordersSchema := schemas["orders"]
+		assert.Contains(t, ordersSchema.Columns, "order_id")
+		assert.Contains(t, ordersSchema.Columns, "user_id")
+		assert.Contains(t, ordersSchema.Columns, "amount")
+	})
+}
```

pkg/workload/workload_generator/workload.go

Lines changed: 6 additions & 2 deletions
```
@@ -104,6 +104,10 @@ var workloadGeneratorMeta = workload.Meta{
 		g.flags.FlagSet = pflag.NewFlagSet("db_workload", pflag.ContinueOnError)
 		g.flags.StringVar(&g.debugLogsLocation, "debug-logs", "",
 			"Path to unzipped debug logs directory.")
+		g.flags.StringVar(&g.ddlFile, "ddl-file", "",
+			"The file containing the DDL. If this is provided, the DDL is read from this file instead of the debug logs."+
+				" To generate the file, execute: \n"+
+				"cockroach sql --url='postgresql://<url>/<db name>' --execute=\"SHOW CREATE ALL TABLES;\" > ddl_file.sql")
 		g.flags.IntVar(&g.rowCount, "rows", 1000,
 			"Base row count for tables without foreign keys; other tables scale by foreign key depth/fanout.")
 		g.flags.StringVar(&g.inputYAML, "input-yaml", "",
@@ -126,6 +130,7 @@ type workloadGenerator struct {
 	flags             workload.Flags
 	connFlags         *workload.ConnFlags
 	debugLogsLocation string // path to the unzipped debug zip file
+	ddlFile           string // path to the DDL file
 	dbName            string // database name to use for the workload
 	rowCount          int    // base number of rows per table before FK-depth scaling
 
@@ -200,7 +205,7 @@ func (w *workloadGenerator) initializeGenerator() error {
 	w.dbName = dbName
 
 	// 2) Parsing DDLs out of the debug logs.
-	schemas, stmts, err := generateDDLs(w.debugLogsLocation, w.dbName, false)
+	schemas, stmts, err := generateDDLs(w.debugLogsLocation, w.dbName, w.ddlFile, false)
 	if err != nil {
 		return errors.Wrap(err, "failed to generate DDLs from debug logs")
 	}
@@ -592,7 +597,6 @@ func (w *workloadGenerator) initGenerators(db *gosql.DB) error {
 			maxRows = tblBlock.Count
 		}
 	}
-	// TODO: make the globalBatchNumber dynamic
 	globalNumBatches := (maxRows + baseBatchSize - 1) / baseBatchSize
 
 	// 1) The generator + empty cache for every table.col is built.
```
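Taken together, end-to-end usage would look roughly like the following. This is a hypothetical invocation: it assumes the generator is exposed through the workload CLI under the db_workload name used for its flag set above.

```
# 1) Dump the schema from the source cluster.
cockroach sql --url='postgresql://<url>/<db name>' --execute="SHOW CREATE ALL TABLES;" > ddl_file.sql

# 2) Point the generator at the dump instead of the debug zip's CREATE statements.
cockroach workload init db_workload --ddl-file=ddl_file.sql
```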
