99import pandas as pd
1010import pyarrow as pa
1111import pyarrow .csv as csv
12+ import pyarrow .parquet as parquet
1213
1314from activitysim .core import configuration , workflow
1415from activitysim .core .workflow .checkpoint import CHECKPOINT_NAME
@@ -226,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None:
226227@workflow .step
227228def write_tables (state : workflow .State ) -> None :
228229 """
229- Write pipeline tables as csv files (in output directory) as specified by output_tables list
230- in settings file.
230+    Write pipeline tables as csv or parquet files (in output directory) as specified
231+    by output_tables list in settings file. Output to a single h5 file is
232+    also supported.
233+
234+ 'h5_store' defaults to False, which means the output will be written out to csv.
235+ 'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'.
235+    When 'h5_store' is set to True, 'file_type' is ignored and the outputs are written to h5.
231237
232238 'output_tables' can specify either a list of output tables to include or to skip
233239 if no output_tables list is specified, then all checkpointed tables will be written
@@ -261,6 +267,16 @@ def write_tables(state: workflow.State) -> None:
261267 tables:
262268 - households
263269
270+ To write tables to parquet files, use the file_type setting:
271+
272+ ::
273+
274+ output_tables:
275+ file_type: parquet
276+ action: include
277+ tables:
278+ - households
279+
264280 Parameters
265281 ----------
266282 output_dir: str
@@ -277,6 +293,7 @@ def write_tables(state: workflow.State) -> None:
277293 tables = output_tables_settings .tables
278294 prefix = output_tables_settings .prefix
279295 h5_store = output_tables_settings .h5_store
296+ file_type = output_tables_settings .file_type
280297 sort = output_tables_settings .sort
281298
282299 registered_tables = state .registered_tables ()
@@ -388,14 +405,20 @@ def map_func(x):
388405 ):
389406 dt = dt .drop ([f"_original_{ lookup_col } " ])
390407
391- if h5_store :
408+ if h5_store or file_type == "h5" :
392409 file_path = state .get_output_file_path ("%soutput_tables.h5" % prefix )
393410 dt .to_pandas ().to_hdf (
394411 str (file_path ), key = table_name , mode = "a" , format = "fixed"
395412 )
413+
396414 else :
397- file_name = f"{ prefix } { table_name } .csv "
415+ file_name = f"{ prefix } { table_name } .{ file_type } "
398416 file_path = state .get_output_file_path (file_name )
399417
400418 # include the index if it has a name or is a MultiIndex
401- csv .write_csv (dt , file_path )
419+ if file_type == "csv" :
420+ csv .write_csv (dt , file_path )
421+ elif file_type == "parquet" :
422+ parquet .write_table (dt , file_path )
423+ else :
424+ raise ValueError (f"unknown file_type { file_type } " )
0 commit comments