Skip to content

Commit 10aaa5b

Browse files
raverburgyunzheng
andauthored
Add queue size to Elastic adapter (#156)
Add queue size limit to prevent high memory usage for plugins with multiple million of records --------- Co-authored-by: Yun Zheng Hu <hu@fox-it.com>
1 parent 25d783a commit 10aaa5b

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

flow/record/adapter/elastic.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
2626
Optional arguments:
2727
[API_KEY]: base64 encoded api key to authenticate with (default: False)
28+
[QUEUE_SIZE]: maximum queue size for writing records; limits memory usage (default: 100000)
2829
[INDEX]: name of the index to use (default: records)
2930
[VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
3031
[HASH_RECORD]: make record unique by hashing record [slow] (default: False)
@@ -43,18 +44,20 @@ def __init__(
4344
http_compress: str | bool = True,
4445
hash_record: str | bool = False,
4546
api_key: str | None = None,
47+
queue_size: int = 100000,
4648
**kwargs,
4749
) -> None:
4850
self.index = index
4951
self.uri = uri
5052
verify_certs = str(verify_certs).lower() in ("1", "true")
5153
http_compress = str(http_compress).lower() in ("1", "true")
5254
self.hash_record = str(hash_record).lower() in ("1", "true")
55+
queue_size = int(queue_size)
5356

5457
if not uri.lower().startswith(("http://", "https://")):
5558
uri = "http://" + uri
5659

57-
self.queue: queue.Queue[Record | StopIteration] = queue.Queue()
60+
self.queue: queue.Queue[Record | StopIteration] = queue.Queue(maxsize=queue_size)
5861
self.event = threading.Event()
5962

6063
self.es = elasticsearch.Elasticsearch(
@@ -147,7 +150,7 @@ def streaming_bulk_thread(self) -> None:
147150
self.event.set()
148151

149152
def write(self, record: Record) -> None:
150-
self.queue.put_nowait(record)
153+
self.queue.put(record)
151154

152155
def flush(self) -> None:
153156
pass

0 commit comments

Comments
 (0)