Skip to content

Commit 9bf1f96

Browse files
committed
Add persistence methods
1 parent e5c3c3c commit 9bf1f96

File tree

5 files changed

+108
-1
lines changed

5 files changed

+108
-1
lines changed

dbldatagen/config.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# See the License for the specific language governing permissions and
2+
# limitations under the License.
3+
#
4+
5+
"""
6+
This module implements configuration classes for writing generated data.
7+
"""
8+
9+
from dataclasses import dataclass, field
10+
11+
12+
@dataclass(frozen=True, slots=True)
class OutputConfig:
    """
    This class implements an output sink configuration used to write generated data. Output sinks must extend from the
    `OutputConfig` base class.
    """
    # Target table name or filesystem path the data is written to
    # (passed to Spark's `save()` for batch or `start()` for streaming).
    location: str
    # Spark write mode for batch writes / output mode for streaming writes
    # (e.g. "overwrite", "append"). No default — callers must choose explicitly.
    output_mode: str
    # Spark data source format; defaults to Delta Lake.
    format: str = "delta"
    # Extra format-specific writer options, forwarded as key/value pairs via `.options(**options)`.
    # NOTE(review): a mutable `dict` field makes frozen instances unhashable if ever hashed — confirm intended.
    options: dict = field(default_factory=dict)

dbldatagen/data_generator.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from dbldatagen import datagen_constants
2121
from dbldatagen._version import _get_spark_version
2222
from dbldatagen.column_generation_spec import ColumnGenerationSpec
23+
from dbldatagen.config import OutputConfig
2324
from dbldatagen.constraints import Constraint, SqlExpr
2425
from dbldatagen.datarange import DataRange
2526
from dbldatagen.distributions import DataDistribution
@@ -28,7 +29,14 @@
2829
from dbldatagen.serialization import SerializableToDict
2930
from dbldatagen.spark_singleton import SparkSingleton
3031
from dbldatagen.text_generators import TextGenerator
31-
from dbldatagen.utils import DataGenError, deprecated, ensure, split_list_matching_condition, topologicalSort
32+
from dbldatagen.utils import (
33+
DataGenError,
34+
deprecated,
35+
ensure,
36+
split_list_matching_condition,
37+
topologicalSort,
38+
write_data_to_output,
39+
)
3240

3341

3442
_OLD_MIN_OPTION: str = "min"
@@ -1909,6 +1917,18 @@ def scriptMerge(
19091917

19101918
return result
19111919

1920+
def writeGeneratedData(self, config: OutputConfig, is_streaming: bool = False) -> None:
    """
    Builds a `DataFrame` from the `DataGenerator` and writes the data to a target table.

    :param config: Output configuration for writing generated data
    :param is_streaming: Whether to write data with Structured Streaming (default `False`)
    """
    if is_streaming:
        # Build with streaming enabled and hand off to the streaming writer, then return:
        # without the early return, control fell through to the batch path below, building
        # and writing the dataset a second time.
        write_data_to_output(self.build(withStreaming=True), config=config, is_streaming=True)
        return

    write_data_to_output(self.build(), config=config)
19121932
@staticmethod
19131933
def loadFromJson(options: str) -> "DataGenerator":
19141934
"""

dbldatagen/utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
from typing import Any
1919

2020
import jmespath
21+
from pyspark.sql import DataFrame
22+
23+
from dbldatagen.config import OutputConfig
2124

2225

2326
def deprecated(message: str = "") -> Callable[[Callable[..., Any]], Callable[..., Any]]:
@@ -360,3 +363,32 @@ def system_time_millis() -> int:
360363
"""
361364
curr_time: int = round(time.time() / 1000)
362365
return curr_time
366+
367+
368+
def write_data_to_output(df: DataFrame, config: OutputConfig, is_streaming: bool = False) -> None:
    """
    Writes a DataFrame to the sink configured in the output configuration.

    :param df: Spark DataFrame to write
    :param config: Output configuration passed as an `OutputConfig`
    :param is_streaming: Whether to write the data with Structured Streaming (default `False`)
    """
    if is_streaming:
        # Streaming DataFrames must go through `writeStream`; return immediately afterwards.
        # The original fell through to the batch path below, which would attempt a second,
        # batch write of a streaming DataFrame (double write; `.write` raises on streaming DFs).
        (
            df.writeStream
            .format(config.format)
            .outputMode(config.output_mode)
            .options(**config.options)
            .start(config.location)
        )
        return

    (
        df.write
        .format(config.format)
        .mode(config.output_mode)
        .options(**config.options)
        .save(config.location)
    )

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ As it is installable via `%pip install`, it can also be incorporated in environm
3737
Using multiple tables <multi_table_data>
3838
Extending text generation <extending_text_generation>
3939
Use with Delta Live Tables <using_delta_live_tables>
40+
Writing Generated Data <writing_generated_data>
4041
Troubleshooting data generation <troubleshooting>
4142

4243
.. toctree::
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
.. Databricks Labs Data Generator documentation master file, created by
2+
sphinx-quickstart on Sun Jun 21 10:54:30 2020.
3+
4+
Writing Generated Data to Tables or Files
5+
===========================================================
6+
7+
Generated data can be written directly to output tables or files using the ``OutputConfig`` class.
8+
9+
Writing Generated Data to a Table
10+
--------------------------------------------
11+
12+
Once you've defined a ``DataGenerator``, call the ``writeGeneratedData`` method to write data to a target table.
13+
14+
.. code-block:: python
15+
16+
from pyspark.sql.types import StringType
17+
import dbldatagen as dg
18+
from dbldatagen.config import OutputConfig
19+
20+
# Create a sample data generator with a few columns:
21+
testDataSpec = (
22+
dg.DataGenerator(spark, name="users_dataset", rows=1000)
23+
.withColumn("user_name", StringType(), expr="concat('user_', id)")
24+
.withColumn("email_address", StringType(), expr="concat(user_name, '@email.com')")
25+
.withColumn("phone_number", StringType(), template="555-DDD-DDDD")
26+
)
27+
28+
# Define an output configuration:
29+
outputConfig = OutputConfig(location="main.demo.table", output_mode="overwrite")
30+
31+
# Write the generated data to the configured output table:
32+
testDataSpec.writeGeneratedData(config=outputConfig)
33+

0 commit comments

Comments
 (0)