@@ -1,7 +1,8 @@
 import re
+from collections import defaultdict
 from io import BytesIO
 from types import FunctionType
-from typing import IO, TYPE_CHECKING, Callable, Generator
+from typing import IO, TYPE_CHECKING, Callable, Generator, Protocol
 
 from etl_utils.ldif.model import DistinguishedName
 from smart_open import open as _smart_open
@@ -67,6 +68,13 @@ def ldif_dump(fp: IO, obj: list[PARSED_RECORD]) -> str:
     )
 
 
+class _StreamBlock(Protocol):
+    def flush(self) -> str: ...
+    def reset(self): ...
+    def parse(self, line: bytes): ...
+    def __bool__(self): ...
+
+
 class StreamBlock:
     def __init__(self, filter_terms: list[tuple[str, str]]):
         self.data = BytesIO()
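The new _StreamBlock protocol exists so that stream_to_block (added further down) can accept either StreamBlock or the new GroupedStreamBlock via structural typing rather than inheritance. A minimal sketch of what that buys, assuming the protocol above; CountingBlock is a hypothetical class invented here purely for illustration and is not part of this change:

from typing import Protocol


class _StreamBlock(Protocol):
    def flush(self) -> str: ...
    def reset(self): ...
    def parse(self, line: bytes): ...
    def __bool__(self): ...


class CountingBlock:
    # Hypothetical block that only counts flushed records.
    def __init__(self):
        self.buffer = b""
        self.count = 0

    def parse(self, line: bytes):
        self.buffer += line

    def flush(self) -> str:
        self.count += 1
        self.reset()
        return ""

    def reset(self):
        self.buffer = b""

    def __bool__(self):
        return bool(self.buffer)


# A type checker accepts this assignment because CountingBlock matches the
# protocol's shape, even though it never inherits from StreamBlock.
block: _StreamBlock = CountingBlock()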
@@ -80,7 +88,7 @@ def flush(self) -> str:
         self.data.write(self.buffer)
         self.reset()
 
-    def reset(self) -> str:
+    def reset(self):
         self.buffer = bytes()
         self.keep = False
 
@@ -93,20 +101,41 @@ def __bool__(self):
         return bool(self.buffer) and self.keep
 
 
-def filter_ldif_from_s3_by_property(
-    s3_path, filter_terms: list[tuple[str, str]], s3_client: "S3Client"
-) -> memoryview:
-    """
-    Efficiently streams a file from S3 directly into a bytes memoryview,
-    filtering out any LDIF record without any (attribute_name, attribute_value)
-    matching at least one of the filter terms.
+class GroupedStreamBlock:
+    def __init__(self, group_field: str, filter_terms: list[tuple[str, str]]):
+        self.data = defaultdict(BytesIO)
+        self.filters: list[FunctionType] = [
+            re.compile(rf"(?i)^({key}): ({value})\n$".encode()).match
+            for key, value in filter_terms
+        ]
+        self.group_filter = re.compile(rf"(?i)^({group_field}): (.*)\n$".encode()).match
+        self.reset()
 
-    The output of this function can then be parsed using'
-    'parse_ldif(file_opener=BytesIO, path_or_data=filtered_ldif)'
-    """
+    def flush(self) -> str:
+        if self.group is None:
+            raise Exception
+        self.data[self.group].write(self.buffer)
+        self.reset()
 
-    stream_block = StreamBlock(filter_terms)
+    def reset(self):
+        self.buffer = bytes()
+        self.keep = False
+        self.group = None
+
+    def parse(self, line: bytes):
+        group_match = self.group_filter(line)
+        if group_match:
+            (_, self.group) = group_match.groups()
+
+        if not self.keep and any(filter(line) for filter in self.filters):
+            self.keep = True
+        self.buffer += line
 
+    def __bool__(self):
+        return bool(self.buffer) and self.keep
+
+
+def stream_to_block(s3_path: str, s3_client: "S3Client", stream_block: _StreamBlock):
     with _smart_open(s3_path, mode="rb", transport_params={"client": s3_client}) as f:
         for line in f.readlines():
             line_is_empty = line.strip() == EMPTY_BYTESTRING
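Both block classes compile their filter terms into case-insensitive, full-line regexes over bytes, and GroupedStreamBlock adds one more regex that captures the value of the grouping attribute. A small sketch of the matching behaviour, using made-up terms (objectClass, person and uniqueIdentifier are illustrative placeholders, not values taken from this change):

import re

# Attribute filters: a record is kept once any of its lines matches a (key, value) pair.
filter_terms = [("objectClass", "person")]
filters = [
    re.compile(rf"(?i)^({key}): ({value})\n$".encode()).match
    for key, value in filter_terms
]
print(any(f(b"objectclass: person\n") for f in filters))  # True: (?i) ignores case
print(any(f(b"objectClass: device\n") for f in filters))  # False: value differs

# Group filter: capture whatever value follows the grouping attribute.
group_field = "uniqueIdentifier"
group_filter = re.compile(rf"(?i)^({group_field}): (.*)\n$".encode()).match
print(group_filter(b"uniqueIdentifier: 123456\n").groups())  # (b'uniqueIdentifier', b'123456')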
@@ -118,4 +147,46 @@ def filter_ldif_from_s3_by_property(
 
     if stream_block:
         stream_block.flush()
+
+
+def filter_ldif_from_s3_by_property(
+    s3_path, filter_terms: list[tuple[str, str]], s3_client: "S3Client"
+) -> memoryview:
+    """
+    Efficiently streams a file from S3 directly into a bytes memoryview,
+    filtering out any LDIF record without any (attribute_name, attribute_value)
+    matching at least one of the filter terms.
+
+    The output of this function can then be parsed using
+    'parse_ldif(file_opener=BytesIO, path_or_data=filtered_ldif)'
+    """
+    stream_block = StreamBlock(filter_terms)
+    stream_to_block(s3_path=s3_path, s3_client=s3_client, stream_block=stream_block)
     return stream_block.data.getbuffer()
+
+
+def filter_and_group_ldif_from_s3_by_property(
+    s3_path,
+    group_field: str,
+    filter_terms: list[tuple[str, str]],
+    s3_client: "S3Client",
+) -> memoryview:
+    """
+    Efficiently streams a file from S3 directly into a bytes memoryview,
+    filtering out any LDIF record without any (attribute_name, attribute_value)
+    matching at least one of the filter terms, and then also grouping records
+    by the group_field.
+
+    The output of this function can then be parsed using
+    'parse_ldif(file_opener=BytesIO, path_or_data=filtered_and_grouped_ldif)'
+    """
+
+    stream_block = GroupedStreamBlock(
+        group_field=group_field, filter_terms=filter_terms
+    )
+    stream_to_block(s3_path=s3_path, s3_client=s3_client, stream_block=stream_block)
+
+    data = BytesIO()
+    for group in stream_block.data.values():
+        data.write(group.getbuffer())
+    return data.getbuffer()
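A usage sketch of the two public entry points, with a placeholder S3 path, filter term and group field; the boto3 client follows the signatures above, and parse_ldif is assumed (not confirmed by this change) to be the function from this module that the docstrings reference:

from io import BytesIO

import boto3

s3_client = boto3.client("s3")

# Keep only records with an objectClass of person (placeholder filter term and path).
filtered = filter_ldif_from_s3_by_property(
    s3_path="s3://example-bucket/extract.ldif",
    filter_terms=[("objectClass", "person")],
    s3_client=s3_client,
)

# Same filtering, but records sharing a uniqueIdentifier end up contiguous in the output.
grouped = filter_and_group_ldif_from_s3_by_property(
    s3_path="s3://example-bucket/extract.ldif",
    group_field="uniqueIdentifier",
    filter_terms=[("objectClass", "person")],
    s3_client=s3_client,
)

# Both memoryviews can be handed straight to parse_ldif, as the docstrings note.
records = parse_ldif(file_opener=BytesIO, path_or_data=filtered)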