diff --git a/dataflows/base/schema_validator.py b/dataflows/base/schema_validator.py index 7d11c9a..b865da7 100644 --- a/dataflows/base/schema_validator.py +++ b/dataflows/base/schema_validator.py @@ -30,8 +30,10 @@ def drop(res_name, row, i, e): return False -def schema_validator(resource, iterator, - field_names=None, on_error=None): +def schema_validator( + resource, iterator, field_names=None, + on_error=None, preserve_missing_values=False, +): if on_error is None: on_error = raise_exception if isinstance(resource, Resource): @@ -46,7 +48,7 @@ def schema_validator(resource, iterator, for i, row in enumerate(iterator): try: for f in schema_fields: - row[f.name] = f.cast_value(row.get(f.name)) + row[f.name] = f.cast_value(row.get(f.name), preserve_missing_values=preserve_missing_values) except CastError as e: if not on_error(resource['name'], row, i, e): continue diff --git a/dataflows/processors/set_type.py b/dataflows/processors/set_type.py index 887e45a..9a291bf 100644 --- a/dataflows/processors/set_type.py +++ b/dataflows/processors/set_type.py @@ -6,7 +6,10 @@ class set_type(DataStreamProcessor): - def __init__(self, name, resources=-1, regex=True, on_error=None, **options): + def __init__( + self, name, resources=-1, regex=True, + on_error=None, preserve_missing_values=False, **options + ): super(set_type, self).__init__() if not regex: name = re.escape(name) @@ -15,6 +18,7 @@ def __init__(self, name, resources=-1, regex=True, on_error=None, **options): self.resources = resources self.field_names = [] self.on_error = on_error + self.preserve_missing_values = preserve_missing_values def process_resources(self, resources): for res in resources: @@ -22,7 +26,8 @@ def process_resources(self, resources): if len(self.field_names) > 0: yield schema_validator(res.res, res, field_names=self.field_names, - on_error=self.on_error) + on_error=self.on_error, + preserve_missing_values=self.preserve_missing_values) else: yield res else: