|
6 | 6 | import logging |
7 | 7 | import os.path |
8 | 8 | from enum import Enum |
| 9 | +from os.path import exists |
9 | 10 | from pathlib import Path |
10 | | -from typing import TextIO |
| 11 | +from typing import TextIO, Callable, List, Tuple |
11 | 12 |
|
12 | 13 | # ndjson missing types: https://github.com/rhgrant10/ndjson/issues/10 |
13 | 14 | import ndjson # type: ignore |
@@ -60,14 +61,17 @@ def load(self, file: TextIO) -> list[DuneRecord]: |
60 | 61 | return list(ndjson.reader(file)) |
61 | 62 | raise ValueError(f"Unrecognized FileType {self} for {file.name}") |
62 | 63 |
|
63 | | - def write(self, out_file: TextIO, data: list[DuneRecord]) -> None: |
| 64 | + def write( |
| 65 | + self, out_file: TextIO, data: list[DuneRecord], skip_headers: bool = False |
| 66 | + ) -> None: |
64 | 67 | """Writes `data` to `out_file`""" |
65 | 68 | logger.debug(f"writing results to file {out_file.name}") |
66 | 69 | if self == FileType.CSV: |
67 | 70 | headers = data[0].keys() |
68 | 71 | data_tuple = [tuple(rec.values()) for rec in data] |
69 | 72 | dict_writer = csv.DictWriter(out_file, headers, lineterminator="\n") |
70 | | - dict_writer.writeheader() |
| 73 | + if not skip_headers: |
| 74 | + dict_writer.writeheader() |
71 | 75 | writer = csv.writer(out_file, lineterminator="\n") |
72 | 76 | writer.writerows(data_tuple) |
73 | 77 |
|
@@ -106,13 +110,77 @@ def _filepath(self, name: str, ftype: FileType) -> str: |
106 | 110 | """Internal method for building absolute path.""" |
107 | 111 | return os.path.join(self.path, name + str(ftype)) |
108 | 112 |
|
109 | | - def _write(self, data: list[DuneRecord], name: str, ftype: FileType) -> None: |
110 | | - # TODO - use @skip_empty decorator here. Couldn't get the types to work. |
| 113 | + def _write(self, data: List[DuneRecord], name: str, ftype: FileType) -> None: |
| 114 | + # The following three lines are duplicated in _append, due to python version compatibility |
| 115 | + # https://github.com/cowprotocol/dune-client/issues/45 |
| 116 | + # We will continue to support python < 3.10 until ~3.13, this issue will remain open. |
111 | 117 | if len(data) == 0: |
112 | 118 | logger.info(f"Nothing to write to {name}... skipping") |
113 | | - return |
| 119 | + return None |
114 | 120 | with open(self._filepath(name, ftype), "w", encoding=self.encoding) as out_file: |
115 | 121 | ftype.write(out_file, data) |
| 122 | + return None |
| 123 | + |
| 124 | + def _assert_matching_keys( |
| 125 | + self, keys: Tuple[str, ...], fname: str, ftype: FileType |
| 126 | + ) -> None: |
| 127 | + with open(fname, "r", encoding=self.encoding) as file: |
| 128 | + if ftype == FileType.CSV: |
| 129 | + # Check matching headers. |
| 130 | + headers = file.readline() |
| 131 | + existing_keys = headers.strip().split(",") |
| 132 | + elif ftype == FileType.JSON: |
| 133 | + single_object = json.loads(file.readline())[0] |
| 134 | + existing_keys = single_object.keys() |
| 135 | + elif ftype == FileType.NDJSON: |
| 136 | + single_object = json.loads(file.readline()) |
| 137 | + existing_keys = single_object.keys() |
| 138 | + |
| 139 | + key_tuple = tuple(existing_keys) |
| 140 | + assert keys == key_tuple, f"{keys} != {key_tuple}" |
| 141 | + |
| 142 | + def _append(self, data: List[DuneRecord], name: str, ftype: FileType) -> None: |
| 143 | + if len(data) == 0: |
| 144 | + logger.info(f"Nothing to write to {name}... skipping") |
| 145 | + return None |
| 146 | + fname = self._filepath(name, ftype) |
| 147 | + if not exists(fname): |
| 148 | + logger.warning( |
| 149 | + f"File {fname} does not exist, using write instead of append!" |
| 150 | + ) |
| 151 | + return self._write(data, name, ftype) |
| 152 | + |
| 153 | + # validate that the incoming content to be appended has the same schema |
| 154 | + # The skip empty decorator ensures existence of data[0]! |
| 155 | + self._assert_matching_keys(tuple(data[0].keys()), fname, ftype) |
| 156 | + |
| 157 | + if ftype == FileType.JSON: |
| 158 | + # These are JSON lists, so we have to concatenate the data. |
| 159 | + with open(fname, "r", encoding=self.encoding) as existing_file: |
| 160 | + existing_data = ftype.load(existing_file) |
| 161 | + return self._write(existing_data + data, name, ftype) |
| 162 | + |
| 163 | + with open(fname, "a+", encoding=self.encoding) as out_file: |
| 164 | + return ftype.write(out_file, data, skip_headers=True) |
| 165 | + |
| 166 | + def append_csv(self, data: list[DuneRecord], name: str) -> None: |
| 167 | + """Appends `data` to csv file `name`""" |
| 168 | + # This is a special case because we want to skip headers when the file already exists |
| 169 | + # Additionally, we may want to validate that the headers actually coincide. |
| 170 | + self._append(data, name, FileType.CSV) |
| 171 | + |
| 172 | + def append_json(self, data: list[DuneRecord], name: str) -> None: |
| 173 | + """ |
| 174 | + Appends `data` to json file `name` |
| 175 | + This is the least efficient of all, since we have to load the entire file, |
| 176 | + concatenate the lists and then overwrite the file! |
| 177 | + Other filetypes such as CSV and NDJSON can be directly appended to! |
| 178 | + """ |
| 179 | + self._append(data, name, FileType.JSON) |
| 180 | + |
| 181 | + def append_ndjson(self, data: list[DuneRecord], name: str) -> None: |
| 182 | + """Appends `data` to ndjson file `name`""" |
| 183 | + self._append(data, name, FileType.NDJSON) |
116 | 184 |
|
117 | 185 | def write_csv(self, data: list[DuneRecord], name: str) -> None: |
118 | 186 | """Writes `data` to csv file `name`""" |
@@ -154,3 +222,6 @@ def load_singleton( |
154 | 222 | ) -> DuneRecord: |
155 | 223 | """Loads and returns single entry by index (default 0)""" |
156 | 224 | return self._load(name, self._parse_ftype(ftype))[index] |
| 225 | + |
| 226 | + |
# Type alias for write-shaped methods: (self, data, name, ftype) -> None.
# NOTE(review): assumes FileIO is the enclosing file-manager class defined
# above in this module — confirm against the class definition.
WriteLikeSignature = Callable[[FileIO, List[DuneRecord], str, FileType], None]
0 commit comments