
Commit 8b2fcbc

Merge branch 'MetricsPivot' of github.com:nodestream-proj/nodestream into MetricsPivot
2 parents: 6724b19 + 388c25a

11 files changed (+859 −844 lines)


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -160,4 +160,5 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-.idea/
+.idea/
+pytest.xml

nodestream/databases/writer.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ def from_file_data(
         ),
         collect_stats: bool = True,
         batch_size: int = 1000,
-        **database_args
+        **database_args,
     ):
         connector = DatabaseConnector.from_database_args(
             database=database, **database_args
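
Both here and in credential_utils.py below, the only change is a trailing comma after the `**kwargs`-style parameter. That comma has been legal in definitions since Python 3.6, and auto-formatters such as Black add it so that appending a parameter later touches a single line. A minimal sketch of the pattern, with hypothetical names (not nodestream's actual API):

```python
# Minimal sketch (hypothetical names): a trailing comma after **kwargs
# is valid on Python 3.6+ and keeps future parameter additions to
# single-line diffs.
def connect(
    database: str,
    batch_size: int = 1000,
    **database_args,
):
    # Bundle everything the caller passed through.
    return {"database": database, "batch_size": batch_size, **database_args}

print(connect("neo4j", uri="bolt://localhost:7687"))
```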

nodestream/pipeline/extractors/credential_utils.py

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ def __init__(
         assume_role_arn: Optional[str] = None,
         assume_role_external_id: Optional[str] = None,
         session_ttl: int = 3000,
-        **boto_session_args
+        **boto_session_args,
     ) -> None:
         self.assume_role_arn = assume_role_arn
         self.assume_role_external_id = assume_role_external_id

nodestream/pipeline/extractors/files.py

Lines changed: 24 additions & 15 deletions
@@ -19,7 +19,7 @@
     Iterable,
     List,
     Optional,
-    Tuple,
+    Sequence,
 )
 
 import pandas as pd
@@ -74,7 +74,7 @@ def path_like(self) -> Path:
     @asynccontextmanager
     async def popped_suffix_tempfile(
         self,
-    ) -> AsyncContextManager[Tuple[Path, tempfile.NamedTemporaryFile]]:
+    ) -> AsyncContextManager[tuple[Path, tempfile.NamedTemporaryFile]]:
         """Create a temporary file with the same suffixes sans the last one.
 
         This method creates a temporary file with the same suffixes as the
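
Two typing changes run through this file: `Tuple`/`List`/`Dict` from `typing` give way to the builtin generics `tuple`/`list`/`dict` (PEP 585, available at runtime on Python 3.9+), and parameters that get indexed or iterated more than once are narrowed from `Iterable` to `Sequence`. A standalone sketch of the distinction, with a made-up function (not from nodestream):

```python
# Standalone sketch (made-up function): PEP 585 builtin generics replace
# typing.Tuple/List on 3.9+, and Sequence -- unlike Iterable -- guarantees
# len(), indexing, and safe re-iteration (a generator satisfies Iterable
# but is exhausted after one pass).
from typing import Sequence

def split_head(items: Sequence[str]) -> tuple[str, list[str]]:
    return items[0], list(items[1:])

print(split_head(["a", "b", "c"]))  # ('a', ['b', 'c'])
```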
@@ -179,6 +179,7 @@ def get_files(self) -> AsyncIterator[ReadableFile]:
         """
         raise NotImplementedError
 
+    @abstractmethod
     def describe(self) -> str:
         """Return a human-readable description of the file source.
 
@@ -187,7 +188,6 @@ def describe(self) -> str:
         way that is understandable to the user. The description should be
         concise and informative.
         """
-        return str(self)
 
 
 @SUPPORTED_FILE_FORMAT_REGISTRY.connect_baseclass
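
With `@abstractmethod` added and the `return str(self)` fallback removed, `describe()` is now part of the `FileSource` contract: every concrete source must state what it reads (the new `S3FileSource.describe` later in this diff is one example). A toy sketch of the pattern, using stand-in classes rather than nodestream's real ones:

```python
# Toy sketch (stand-in classes): once describe() is abstract, a subclass
# that forgets to implement it fails at instantiation time instead of
# silently falling back to str(self).
from abc import ABC, abstractmethod

class FileSource(ABC):
    @abstractmethod
    def describe(self) -> str:
        """Return a human-readable description of the file source."""

class LocalFileSource(FileSource):
    def __init__(self, globs: list[str]) -> None:
        self.globs = globs

    def describe(self) -> str:
        return f"LocalFileSource{{globs: {self.globs}}}"

print(LocalFileSource(["*.json"]).describe())  # LocalFileSource{globs: ['*.json']}
```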
@@ -481,7 +481,7 @@ class RemoteFileSource(FileSource, alias="http"):
     """
 
     def __init__(
-        self, urls: Iterable[str], memory_spooling_max_size_in_mb: int = 10
+        self, urls: Sequence[str], memory_spooling_max_size_in_mb: int = 10
     ) -> None:
         self.urls = urls
         self.memory_spooling_max_size = memory_spooling_max_size_in_mb * 1024 * 1024
@@ -528,7 +528,7 @@ def archive_if_required(self, key: str):
         if not self.archive_dir:
             return
 
-        self.logger.info("Archiving S3 Object", extra=dict(key=key))
+        self.logger.info("Archiving S3 Object", extra={"key": key})
         filename = Path(key).name
         self.s3_client.copy(
             Bucket=self.bucket,
@@ -617,6 +617,12 @@ async def get_files(self):
                 object_format=self.object_format,
             )
 
+    def describe(self) -> str:
+        return (
+            f"S3FileSource{{bucket: {self.bucket}, prefix: {self.prefix}, "
+            f"archive_dir: {self.archive_dir}, object_format: {self.object_format}}}"
+        )
+
 
 class FileExtractor(Extractor):
     """A class that extracts records from files.
@@ -629,19 +635,19 @@ class FileExtractor(Extractor):
     """
 
     @classmethod
-    def local(cls, globs: Iterable[str]):
+    def local(cls, globs: Iterable[str]) -> "FileExtractor":
         return FileExtractor.from_file_data([{"type": "local", "globs": globs}])
 
     @classmethod
-    def s3(cls, **kwargs):
+    def s3(cls, **kwargs) -> "FileExtractor":
         return cls([S3FileSource.from_file_data(**kwargs)])
 
     @classmethod
     def remote(
         cls,
         urls: Iterable[str],
         memory_spooling_max_size_in_mb: int = 10,
-    ):
+    ) -> "FileExtractor":
         return FileExtractor.from_file_data(
             [
                 {
@@ -653,17 +659,19 @@ def remote(
         )
 
     @classmethod
-    def from_file_data(cls, sources: List[Dict[str, Any]]) -> "FileExtractor":
+    def from_file_data(cls, sources: list[dict[str, Any]]) -> "FileExtractor":
         return cls(
             [FileSource.from_file_data_with_type_label(source) for source in sources]
         )
 
-    def __init__(self, file_sources: Iterable[FileSource]) -> None:
+    def __init__(self, file_sources: Sequence[FileSource]) -> None:
         self.file_sources = file_sources
         self.logger = getLogger(__name__)
 
-    async def read_file(self, file: ReadableFile) -> Iterable[JsonLikeDocument]:
-        intermediaries: List[AsyncContextManager[ReadableFile]] = []
+    async def read_file(
+        self, file: ReadableFile
+    ) -> AsyncGenerator[JsonLikeDocument, None]:
+        intermediaries: list[AsyncContextManager[ReadableFile]] = []
 
         while True:
             suffix = file.path_like().suffix
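
The new `read_file` annotation reflects what the method actually is: an `async def` containing `yield` is an async generator, not a coroutine returning an `Iterable`, so `AsyncGenerator[JsonLikeDocument, None]` is the honest type. A self-contained sketch of the shape, with invented names:

```python
# Self-contained sketch (invented names): an `async def` that contains
# `yield` is an async generator, consumed with `async for`.
import asyncio
from typing import Any, AsyncGenerator

async def read_records(lines: list[str]) -> AsyncGenerator[dict[str, Any], None]:
    for line in lines:
        yield {"value": line}  # each record is produced lazily

async def main() -> None:
    async for record in read_records(["a", "b"]):
        print(record)

asyncio.run(main())
```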
@@ -695,10 +703,10 @@ async def read_file(
                 pass
             except Exception as e:
                 self.logger.warning(
-                    f"Failed to parse {file.path_like()} file. Please ensure the file is in the correct format.",
+                    "Failed to parse %s file. Please ensure the file is in the correct format.",
+                    file.path_like(),
                     extra={"exception": str(e)},
                 )
-                pass
 
         # Regardless of whether we found a codec or not, break out of the
         # loop and yield no more records because either (a) we found a
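
The f-string passed to `logger.warning` was interpolated eagerly, even when WARNING records were filtered out; passing the value as an argument lets the `logging` module defer `%s` formatting until a handler actually emits the record, and keeps the message template constant for log aggregation. A quick illustration:

```python
# Quick illustration: with %s-style arguments, logging skips the string
# formatting entirely when the record is filtered out.
import logging

logging.basicConfig(level=logging.ERROR)  # WARNING records are dropped
log = logging.getLogger("demo")

path = "data/input.csv"
log.warning(f"Failed to parse {path} file.")   # f-string built anyway
log.warning("Failed to parse %s file.", path)  # formatting deferred, then skipped
```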
@@ -720,5 +728,6 @@ async def extract_records(self) -> AsyncGenerator[Any, Any]:
 
         if total_files_from_source == 0:
             self.logger.warning(
-                f"No files found for source: {file_source.describe()}"
+                "No files found for source: %s",
+                file_source.describe(),
             )
