Skip to content

Commit 6f2e34c

Browse files
authored
Diagonally union remote and new Glides records (#686)
1 parent 02391c2 commit 6f2e34c

File tree

1 file changed

+6 -5 lines changed

1 file changed

+6 -5 lines changed

src/lamp_py/ingestion/glides.py

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ class GlidesRecord(dy.Schema):
6868
)
6969
source = dy.String()
7070
specversion = dy.String()
71+
dataschema = dy.String(nullable=True)
7172

7273

7374
class EditorChangesRecord(GlidesRecord):
@@ -217,17 +218,17 @@ def append_records(self) -> None:
217218
process_logger = ProcessLogger(process_name="append_glides_records", type=self.type)
218219
process_logger.log_start()
219220

220-
new_dataset = self.convert_records().lazy()
221+
new_dataset = self.convert_records()
221222

222223
if os.path.exists(self.local_path):
223-
remote_records = self.table_schema.scan_parquet(self.local_path, validation="allow")
224-
joined_ds = pl.union([new_dataset, remote_records])
224+
remote_records = pl.read_parquet(self.local_path)
225+
joined_ds = pl.union([new_dataset, remote_records], how="diagonal_relaxed")
225226
else:
226227
joined_ds = new_dataset
227228

228229
process_logger.add_metadata(
229-
new_records=new_dataset.select("time").count().collect().item(),
230-
total_records=joined_ds.select("time").count().collect().item(),
230+
new_records=new_dataset.select("time").height,
231+
total_records=joined_ds.select("time").height,
231232
)
232233

233234
with tempfile.TemporaryDirectory() as tmp_dir:

0 commit comments

Comments (0)