Skip to content

Commit 431d865

Browse files
added changes for better build ordering when SQL expressions are pres… (#159)
* added changes for better build ordering when SQL expressions are present without baseColumn specification * updated changelog * fixed use of as_keyword in parsing * fixed typo * fixes based on review feedback
1 parent fefd904 commit 431d865

File tree

9 files changed

+540
-60
lines changed

9 files changed

+540
-60
lines changed

CHANGELOG.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,22 @@
33
## Change History
44
All notable changes to the Databricks Labs Data Generator will be documented in this file.
55

6-
### Version Unreleased
6+
### Unreleased
77

88
#### Changed
9+
* Adjusted column build phase separation (i.e. which select statement is used to build columns) so that a
10+
column with a SQL expression can refer to previously created columns without use of a `baseColumn` attribute
911
* Changed build labelling to comply with PEP440
1012

13+
#### Fixed
14+
15+
#### Added
16+
* Parsing of SQL expressions to determine column dependencies
17+
18+
#### Notes
19+
* This does not change actual order of column building - but adjusts which phase columns are built in
20+
21+
1122
### Version 0.3.1
1223

1324
#### Changed

dbldatagen/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_RANDOM, RANDOM_SEED_FIXED, \
2828
RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION
2929
from .utils import ensure, topologicalSort, mkBoundsList, coalesce_values, \
30-
deprecated, parse_time_interval, DataGenError
30+
deprecated, parse_time_interval, DataGenError, split_list_matching_condition
3131
from ._version import __version__
3232
from .column_generation_spec import ColumnGenerationSpec
3333
from .column_spec_options import ColumnSpecOptions

dbldatagen/data_generator.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
from .column_generation_spec import ColumnGenerationSpec
1515
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
1616
DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION
17-
from .utils import ensure, topologicalSort, DataGenError, deprecated
17+
from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition
1818
from . _version import _get_spark_version
19+
from .schema_parser import SchemaParser
1920

2021
_OLD_MIN_OPTION = 'min'
2122
_OLD_MAX_OPTION = 'max'
@@ -754,7 +755,6 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None
754755
new_props = {}
755756
new_props.update(kwargs)
756757

757-
from .schema_parser import SchemaParser
758758
if type(colType) == str:
759759
colType = SchemaParser.columnTypeFromString(colType)
760760

@@ -908,24 +908,85 @@ def _computeColumnBuildOrder(self):
908908
self._seedColumnName, set())
909909
for x in self._allColumnSpecs]
910910

911-
# self.pp_list(dependency_ordering, msg="dependencies")
912-
913911
self.logger.info("dependency list: %s", str(dependency_ordering))
914912

915913
self._buildOrder = list(
916914
topologicalSort(dependency_ordering, flatten=False, initial_columns=[self._seedColumnName]))
917915

918916
self.logger.info("columnBuildOrder: %s", str(self._buildOrder))
919917

920-
# self.pp_list(self._buildOrder, "build order")
918+
self._buildOrder = self._adjustBuildOrderForSqlDependencies(self._buildOrder, self._columnSpecsByName)
919+
921920
return self._buildOrder
922921

922+
def _adjustBuildOrderForSqlDependencies(self, buildOrder, columnSpecsByName):
923+
""" Adjust column build order according to the following heuristics
924+
925+
1: if the column being built in a specific build order phase has a SQL expression and it references
926+
other columns in the same build phase (or potentially references them as the expression parsing is
927+
primitive), separate that phase into multiple phases.
928+
929+
It will also issue a warning if the SQL expression appears to reference a column built later
930+
931+
:param buildOrder: list of lists of ids - each sublist represents phase of build
932+
:param columnSpecsByName: dictionary to map column names to column specs
933+
:returns: revised build order - a list of lists of column ids, each sublist representing a build phase
934+
935+
"""
936+
new_build_order = []
937+
938+
all_columns = set([item for sublist in buildOrder for item in sublist])
939+
built_columns = []
940+
prior_phase_built_columns = []
941+
942+
# for each phase, evaluate it to see if it needs to be split
943+
for current_phase in buildOrder:
944+
separate_phase_columns = []
945+
946+
for columnBeingBuilt in current_phase:
947+
948+
if columnBeingBuilt in columnSpecsByName:
949+
cs = columnSpecsByName[columnBeingBuilt]
950+
951+
if cs.expr is not None:
952+
sql_references = SchemaParser.columnsReferencesFromSQLString(cs.expr, filter=all_columns)
953+
954+
# determine references to columns not yet built
955+
forward_references = set(sql_references) - set(built_columns)
956+
if len(forward_references) > 0:
957+
msg = f"Column '{columnBeingBuilt}' may have forward references to {forward_references}."
958+
self.logger.warning(msg)
959+
self.logger.warning("Use `baseColumn` attribute to correct build ordering if necessary")
960+
961+
references_not_yet_built = set(sql_references) - set(prior_phase_built_columns)
962+
963+
if len(references_not_yet_built.intersection(set(current_phase))) > 0:
964+
separate_phase_columns.append(columnBeingBuilt)
965+
966+
# for each column, get the set of sql references and filter against column names
967+
built_columns.append(columnBeingBuilt)
968+
969+
if len(separate_phase_columns) > 0:
970+
# split phase based on columns in separate_phase_column_list set
971+
revised_phase = split_list_matching_condition(current_phase, lambda el: el in separate_phase_columns)
972+
new_build_order.extend(revised_phase)
973+
else:
974+
# no change to phase
975+
new_build_order.append(current_phase)
976+
977+
prior_phase_built_columns.extend(current_phase)
978+
979+
return new_build_order
980+
923981
@property
924982
def build_order(self):
925983
""" return the build order minus the seed column (which defaults to `id`)
926984
927985
The build order will be a list of lists - each list specifying columns that can be built at the same time
928986
"""
987+
if not self.buildPlanComputed:
988+
self.computeBuildPlan()
989+
929990
return [x for x in self._buildOrder if x != [self._seedColumnName]]
930991

931992
def _getColumnDataTypes(self, columns):
@@ -1033,6 +1094,9 @@ def _buildColumnExpressionsWithSelects(self, df1):
10331094
Build column generation expressions with selects
10341095
:param df1: dataframe for base data generator
10351096
:return: new dataframe
1097+
1098+
The data generator build plan is separated into `rounds` of expressions. Each round consists of
1099+
expressions that are generated using a single `select` operation
10361100
"""
10371101
self.executionHistory.append("Generating data with selects")
10381102
# generation with selects may be more efficient as less intermediate data frames

dbldatagen/schema_parser.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def getTypeDefinitionParser(cls):
132132
pp.delimitedList(pp.Group(ident + pp.Optional(colon) + pp.Group(type_expr)))) + r_angle
133133

134134
# try to capture invalid type name for better error reporting
135-
invalid_type = pp.Word(pp.alphas, pp.alphanums+"_", as_keyword=True)
135+
invalid_type = pp.Word(pp.alphas, pp.alphanums+"_", asKeyword=True)
136136

137137
# use left recursion to handle nesting of types
138138
type_expr <<= pp.MatchFirst([primitive_type_keyword, array_expr, map_expr, struct_expr, invalid_type])
@@ -260,6 +260,67 @@ def columnTypeFromString(cls, type_string):
260260

261261
return type_construct
262262

263+
@classmethod
264+
def _cleanseSQL(cls, sql_string):
265+
""" Cleanse sql string removing string literals so that they are not considered as part of potential column
266+
references
267+
:param sql_string: String representation of SQL expression
268+
:returns: cleansed string
269+
270+
Any strings identified are replaced with `' '`
271+
"""
272+
assert sql_string is not None, "`sql_string` must be specified"
273+
274+
# skip over quoted identifiers even if they contain quotes
275+
quoted_ident = pp.QuotedString(quoteChar="`", escQuote="``")
276+
277+
stringForm1 = pp.Literal('r') + pp.QuotedString(quoteChar="'")
278+
stringForm2 = pp.Literal('r') + pp.QuotedString(quoteChar='"')
279+
stringForm3 = pp.QuotedString(quoteChar="'", escQuote=r"\'")
280+
stringForm4 = pp.QuotedString(quoteChar='"', escQuote=r'\"')
281+
stringForm = stringForm1 ^ stringForm2 ^ stringForm3 ^ stringForm4
282+
stringForm.set_parse_action(lambda s, loc, toks: "' '")
283+
284+
parser = quoted_ident ^ stringForm
285+
286+
transformed_string = parser.transform_string(sql_string)
287+
288+
return transformed_string
289+
290+
@classmethod
291+
def columnsReferencesFromSQLString(cls, sql_string, filter=None):
292+
""" Generate a list of possible column references from a SQL string
293+
294+
This method finds all candidate references to SQL column ids in the string
295+
296+
To avoid the overhead of a full SQL parser, the implementation will simply look for possible field names
297+
298+
Further improvements may eliminate some common syntax but in current form, reserved words will
299+
also be returned as possible column references.
300+
301+
So any uses of this must not assume that all possible references are valid column references
302+
303+
:param sql_string: String representation of SQL expression
304+
:returns: list of possible column references
305+
"""
306+
assert sql_string is not None, "`sql_string` must be specified"
307+
assert filter is None or isinstance(filter, list) or isinstance(filter, set)
308+
309+
cleansed_sql_string = cls._cleanseSQL(sql_string)
310+
311+
ident = pp.Word(pp.alphas, pp.alphanums + "_") | pp.QuotedString(quoteChar="`", escQuote="``")
312+
parser = ident
313+
314+
references = parser.search_string(cleansed_sql_string)
315+
316+
results = set([item for sublist in references for item in sublist])
317+
318+
if filter is not None:
319+
filtered_results = results.intersection(set(filter))
320+
return list(filtered_results)
321+
else:
322+
return list(results)
323+
263324
@classmethod
264325
def parseCreateTable(cls, sparkSession, source_schema):
265326
""" Parse a schema from a schema string

dbldatagen/utils.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def ensure(cond, msg="condition does not hold true"):
7575
:raises: `DataGenError` exception if condition does not hold true
7676
:returns: Does not return anything but raises exception if condition does not hold
7777
"""
78+
7879
def strip_margin(text):
7980
return re.sub(r'\n[ \t]*\|', '\n', text)
8081

@@ -214,3 +215,57 @@ def parse_time_interval(spec):
214215
)
215216

216217
return delta
218+
219+
220+
def split_list_matching_condition(lst, cond):
221+
""" Split a list on elements that match a condition
222+
223+
This will find all matches of a specific condition in the list and split the list into sublists around the
224+
element that matches this condition.
225+
226+
It will handle multiple matches performing splits on each match.
227+
228+
For example, the following code will produce the results below:
229+
230+
x = ['id', 'city_name', 'id', 'city_id', 'city_pop', 'id', 'city_id', 'city_pop','city_id', 'city_pop','id']
231+
split_list_matching_condition(x, lambda el: el == 'id')
232+
233+
234+
result:
235+
`[['id'], ['city_name'], ['id'], ['city_id', 'city_pop'],
236+
['id'], ['city_id', 'city_pop', 'city_id', 'city_pop'], ['id']]`
237+
238+
:arg lst: list of items to perform condition matches against
239+
:arg cond: lambda function or function taking single argument and returning True or False
240+
:returns: list of sublists
241+
"""
242+
243+
def match_condition(matchList, matchFn):
244+
"""Return first index of element of list matching condition"""
245+
if matchList is None or len(matchList) == 0:
246+
return -1
247+
248+
for i in range(len(matchList)):
249+
if matchFn(matchList[i]):
250+
return i
251+
252+
return -1
253+
254+
# main code
255+
retval = []
256+
257+
if lst is None:
258+
retval = lst
259+
elif len(lst) == 1:
260+
retval = [lst]
261+
else:
262+
ix = match_condition(lst, cond)
263+
if ix != -1:
264+
retval.extend(split_list_matching_condition(lst[0:ix], cond))
265+
retval.append(lst[ix:ix + 1])
266+
retval.extend(split_list_matching_condition(lst[ix + 1:], cond))
267+
else:
268+
retval = [lst]
269+
270+
# filter out empty lists
271+
return [el for el in retval if el != []]

docs/source/generating_column_data.rst

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,29 @@ This performs the following actions:
107107
- The final set of output fields will be selected (omitting any columns where the ``omit`` attribute was set to
108108
**True**)
109109

110+
.. note::
111+
112+
Normally the columns will be built in the order specified in the spec.
113+
Use of the `baseColumn` attribute may change the column build ordering.
114+
115+
110116
This has several implications:
111117

112-
- If a column is referred to in an expression, the ``baseColumn`` attribute must be defined with a dependency
118+
- If a column is referred to in an expression, the ``baseColumn`` attribute may need to be defined with a dependency
113119
on that column
114120
- If a column uses a base column with a restricted range of values then it is possible that the column
115121
will not generate the full range of values in the column generation spec
116122
- If the base column is of type ``boolean`` or some other restricted range type, computations on that base value
117123
may not produce the expected range of values
118-
- If base column is not specified, you may see errors reporting that the column in an expression does not exist
124+
- If base column is not specified, you may see errors reporting that the column in an expression does not exist.
125+
This may be fixed by specifying a column dependency using the `baseColumn` attribute
126+
127+
.. note::
128+
129+
The implementation performs primitive scanning of SQL expressions (specified using the `expr` attribute)
130+
to determine if the SQL expression depends on
131+
earlier columns and if so, will put the building of the column in a separate phase.
119132

133+
However it does not reorder the building sequence if there is a reference to a column that will be built later in the
134+
SQL expression.
135+
To enforce the dependency, you must use the `baseColumn` attribute to indicate the dependency.

0 commit comments

Comments
 (0)