Skip to content

Commit a3835d3

Browse files
craig[bot]spilchen
andcommitted
Merge #148569
148569: workload/schemachange: simplify FK validation for generated columns r=spilchen a=spilchen FK validation logic never handled generated child columns, an oversight since #124851. The random schema workload wasn't updated accordingly. We now conservatively treat any generated column in an FK constraint as a potential violation. Existing handling for generated parent columns was removed for consistency and simplicity. Fixes #147596 Release note: none Epic: none Co-authored-by: Matt Spilchen <[email protected]>
2 parents 60cd2b1 + c4bd5f8 commit a3835d3

File tree

2 files changed

+48
-56
lines changed

2 files changed

+48
-56
lines changed

pkg/workload/schemachange/error_screening.go

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,22 +1033,29 @@ func (og *operationGenerator) violatesFkConstraints(
10331033
tableName *tree.TableName,
10341034
nonGeneratedColNames []tree.Name,
10351035
rows [][]string,
1036-
) (bool, error) {
1036+
) (expectedViolation, potentialViolation bool, err error) {
10371037
// TODO(annie): readd the join on active constraints once #120702 is resolved.
10381038
//
10391039
// N.B. We add random noise to column names that makes it hard to just directly call on these names. This is
10401040
// not the case with table/schema names; thus, only column names are quote_ident'ed to ensure that they get
10411041
// referenced properly.
10421042
fkConstraints, err := og.scanStringArrayRows(ctx, tx, fmt.Sprintf(`
1043-
SELECT array[parent.table_schema, parent.table_name, parent.column_name, child.column_name]
1043+
SELECT array[
1044+
parent.table_schema,
1045+
parent.table_name,
1046+
parent.column_name,
1047+
parent.is_generated,
1048+
child.column_name,
1049+
child.is_generated
1050+
]
10441051
FROM (
10451052
SELECT conname, conkey, confkey, conrelid, confrelid
10461053
FROM pg_constraint
10471054
WHERE contype = 'f'
10481055
AND conrelid = '%s'::REGCLASS::INT8
10491056
) AS con
10501057
JOIN (
1051-
SELECT column_name, ordinal_position, column_default
1058+
SELECT column_name, ordinal_position, column_default, is_generated
10521059
FROM information_schema.columns
10531060
WHERE table_schema = '%s'
10541061
AND table_name = '%s'
@@ -1058,7 +1065,8 @@ func (og *operationGenerator) violatesFkConstraints(
10581065
cols.table_schema,
10591066
cols.table_name,
10601067
cols.column_name,
1061-
cols.ordinal_position
1068+
cols.ordinal_position,
1069+
cols.is_generated
10621070
FROM pg_class AS pc
10631071
JOIN pg_namespace AS pn ON pc.relnamespace = pn.oid
10641072
JOIN information_schema.columns AS cols ON (pc.relname = cols.table_name AND pn.nspname = cols.table_schema)
@@ -1069,59 +1077,71 @@ func (og *operationGenerator) violatesFkConstraints(
10691077
WHERE child.column_name != 'rowid';
10701078
`, tableName.String(), tableName.Schema(), tableName.Object()))
10711079
if err != nil {
1072-
return false, og.checkAndAdjustForUnknownSchemaErrors(err)
1080+
return false, false, og.checkAndAdjustForUnknownSchemaErrors(err)
10731081
}
10741082

1075-
// Maps a column name to its index. This way, the value of a column in a row can be looked up
1076-
// using row[colToIndexMap["columnName"]] = "valueForColumn"
1077-
columnNameToIndexMap := map[tree.Name]int{}
1083+
// Maps a non-generated column name to its index. This way, the value of a
1084+
// column in a row can be looked up using row[colToIndexMap["columnName"]] = "valueForColumn"
1085+
nonGeneratedColumnNameToIndexMap := map[tree.Name]int{}
10781086

10791087
for i, name := range nonGeneratedColNames {
1080-
columnNameToIndexMap[name] = i
1088+
nonGeneratedColumnNameToIndexMap[name] = i
10811089
}
10821090

10831091
for _, row := range rows {
10841092
for _, constraint := range fkConstraints {
10851093
parentTableSchema := tree.Name(constraint[0])
10861094
parentTableName := tree.Name(constraint[1])
10871095
parentColumnName := tree.Name(constraint[2])
1088-
childColumnName := tree.Name(constraint[3])
1096+
parentIsGenerated := constraint[3] == "ALWAYS"
1097+
childColumnName := tree.Name(constraint[4])
1098+
childIsGenerated := constraint[5] == "ALWAYS"
10891099

10901100
// If self referential, there cannot be a violation.
10911101
parentAndChildAreSame := parentTableSchema == tableName.SchemaName && parentTableName == tableName.ObjectName
10921102
if parentAndChildAreSame && parentColumnName == childColumnName {
10931103
continue
10941104
}
10951105

1106+
// If the foreign key involves any generated columns, skip detailed FK checking
1107+
// and conservatively assume a violation may occur. This avoids the complexity
1108+
// of reasoning about values that are automatically computed by the database.
1109+
if parentIsGenerated || childIsGenerated {
1110+
return false, true, nil
1111+
}
1112+
10961113
violation, err := og.violatesFkConstraintsHelper(
1097-
ctx, tx, columnNameToIndexMap, parentTableSchema, parentTableName, parentColumnName, childColumnName, tableName, parentAndChildAreSame, row, rows,
1114+
ctx, tx, nonGeneratedColumnNameToIndexMap, parentTableSchema, parentTableName, parentColumnName, childColumnName, tableName, parentAndChildAreSame, row, rows,
10981115
)
10991116
if err != nil {
1100-
return false, err
1117+
return false, false, err
11011118
}
11021119
if violation {
1103-
return true, nil
1120+
return true, false, nil
11041121
}
11051122
}
11061123
}
11071124

1108-
return false, nil
1125+
return false, false, nil
11091126
}
11101127

1111-
// violatesFkConstraintsHelper checks if a single row will violate a foreign key constraint
1112-
// between the childColumn and parentColumn.
1128+
// violatesFkConstraintsHelper checks if inserting a single row would violate a
1129+
// foreign key constraint between the given child and parent columns.
1130+
//
1131+
// This function assumes that neither the parent nor child columns are generated.
1132+
// The caller must ensure this; generated columns are not supported.
11131133
func (og *operationGenerator) violatesFkConstraintsHelper(
11141134
ctx context.Context,
11151135
tx pgx.Tx,
1116-
columnNameToIndexMap map[tree.Name]int,
1136+
nonGeneratedColumnNameToIndexMap map[tree.Name]int,
11171137
parentTableSchema, parentTableName, parentColumn, childColumn tree.Name,
11181138
childTableName *tree.TableName,
11191139
parentAndChildAreSameTable bool,
11201140
rowToInsert []string,
11211141
allRows [][]string,
11221142
) (bool, error) {
11231143

1124-
childIndex, ok := columnNameToIndexMap[childColumn]
1144+
childIndex, ok := nonGeneratedColumnNameToIndexMap[childColumn]
11251145
if !ok {
11261146
return false, errors.Newf("child column %s does not exist in table %s", childColumn, childTableName)
11271147
}
@@ -1134,41 +1154,12 @@ func (og *operationGenerator) violatesFkConstraintsHelper(
11341154
// insert may satisfy the same constraint.
11351155
var parentAndChildSameQueryColumns []string
11361156
if parentAndChildAreSameTable {
1137-
colsInfo, err := og.getTableColumns(ctx, tx, childTableName, false)
1138-
if err != nil {
1139-
return false, err
1140-
}
1141-
1142-
var parentColInfo *column
1143-
for _, colInfo := range colsInfo {
1144-
if colInfo.name == parentColumn {
1145-
parentColInfo = &colInfo
1146-
break
1147-
}
1148-
}
1149-
if parentColInfo == nil {
1150-
return false, errors.Newf("column %s not found in columns for %s", parentColumn, childTableName)
1151-
}
1152-
11531157
for _, otherRow := range allRows {
1154-
var parentValueInSameInsert string
1155-
if parentColInfo.generated {
1156-
// If the parent column is a computed column, spend time to generate the value.
1157-
columnsToValues := map[tree.Name]string{}
1158-
for name, idx := range columnNameToIndexMap {
1159-
columnsToValues[name] = rowToInsert[idx]
1160-
}
1161-
parentValueInSameInsert, err = og.generateColumn(ctx, tx, *parentColInfo, columnsToValues)
1162-
if err != nil {
1163-
return false, err
1164-
}
1165-
} else {
1166-
parentIdx, ok := columnNameToIndexMap[parentColumn]
1167-
if !ok {
1168-
return false, errors.Newf("parent column %s does not exist in table %s", parentColumn, childTableName)
1169-
}
1170-
parentValueInSameInsert = otherRow[parentIdx]
1158+
parentIdx, ok := nonGeneratedColumnNameToIndexMap[parentColumn]
1159+
if !ok {
1160+
return false, errors.Newf("parent column %s does not exist in table %s", parentColumn, childTableName)
11711161
}
1162+
parentValueInSameInsert := otherRow[parentIdx]
11721163

11731164
// Skip over NULL values.
11741165
if parentValueInSameInsert == "NULL" {

pkg/workload/schemachange/operation_generator.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3083,7 +3083,8 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
30833083
hasUniqueConstraints := false
30843084
hasUniqueConstraintsMutations := false
30853085
hasForeignKeyMutations := false
3086-
fkViolation := false
3086+
expectedFkViolation := false
3087+
potentialFkViolation := false
30873088
if !anyInvalidInserts {
30883089
// Verify if the new row may violate unique constraints by checking the
30893090
// constraints in the database.
@@ -3100,7 +3101,7 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
31003101
}
31013102
// Verify if the new row will violate fk constraints by checking the constraints and rows
31023103
// in the database.
3103-
fkViolation, err = og.violatesFkConstraints(ctx, tx, tableName, nonGeneratedColNames, rows)
3104+
expectedFkViolation, potentialFkViolation, err = og.violatesFkConstraints(ctx, tx, tableName, nonGeneratedColNames, rows)
31043105
if err != nil {
31053106
return nil, err
31063107
}
@@ -3113,16 +3114,16 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
31133114

31143115
stmt.potentialExecErrors.addAll(codesWithConditions{
31153116
{code: pgcode.UniqueViolation, condition: hasUniqueConstraints || hasUniqueConstraintsMutations},
3116-
{code: pgcode.ForeignKeyViolation, condition: fkViolation || hasForeignKeyMutations},
3117+
{code: pgcode.ForeignKeyViolation, condition: expectedFkViolation || potentialFkViolation || hasForeignKeyMutations},
31173118
{code: pgcode.NotNullViolation, condition: true},
31183119
{code: pgcode.CheckViolation, condition: true},
31193120
{code: pgcode.InsufficientPrivilege, condition: true}, // For RLS violations
31203121
})
31213122
og.expectedCommitErrors.addAll(codesWithConditions{
3122-
{code: pgcode.ForeignKeyViolation, condition: fkViolation && !hasForeignKeyMutations},
3123+
{code: pgcode.ForeignKeyViolation, condition: expectedFkViolation && !hasForeignKeyMutations},
31233124
})
31243125
og.potentialCommitErrors.addAll(codesWithConditions{
3125-
{code: pgcode.ForeignKeyViolation, condition: hasForeignKeyMutations},
3126+
{code: pgcode.ForeignKeyViolation, condition: potentialFkViolation || hasForeignKeyMutations},
31263127
})
31273128

31283129
var formattedRows []string

0 commit comments

Comments
 (0)