1818import logging
1919import sys
2020import threading
21+ from contextlib import contextmanager
2122from dataclasses import asdict
2223from typing import Any
2324from unittest import TestCase
@@ -84,12 +85,24 @@ def test_fsspec_entry_point_no_fsspec(self):
8485FAKE_SYSTEM_INSTRUCTION = [types .Text (content = "You are a helpful assistant." )]
8586
8687
88+ class ThreadSafeMagicMock (MagicMock ):
89+ def __init__ (self , * args , ** kwargs ) -> None :
90+ self .__dict__ ["_lock" ] = threading .Lock ()
91+ super ().__init__ (* args , ** kwargs )
92+
93+ def _increment_mock_call (self , / , * args , ** kwargs ):
94+ with self .__dict__ ["_lock" ]:
95+ super ()._increment_mock_call (* args , ** kwargs )
96+
97+
8798class TestFsspecUploadHook (TestCase ):
8899 def setUp (self ):
89100 self ._fsspec_patcher = patch (
90101 "opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
91102 )
92103 self .mock_fsspec = self ._fsspec_patcher .start ()
104+ self .mock_fsspec .open = ThreadSafeMagicMock ()
105+
93106 self .hook = FsspecUploadHook (
94107 base_path = BASE_PATH ,
95108 max_size = MAXSIZE ,
@@ -99,6 +112,20 @@ def tearDown(self) -> None:
99112 self .hook .shutdown ()
100113 self ._fsspec_patcher .stop ()
101114
115+ @contextmanager
116+ def block_upload (self ):
117+ unblock_upload = threading .Event ()
118+
119+ def blocked_upload (* args : Any ):
120+ unblock_upload .wait ()
121+ return MagicMock ()
122+
123+ try :
124+ self .mock_fsspec .open .side_effect = blocked_upload
125+ yield
126+ finally :
127+ unblock_upload .set ()
128+
102129 def test_shutdown_no_items (self ):
103130 self .hook .shutdown ()
104131
@@ -118,46 +145,45 @@ def test_upload_then_shutdown(self):
118145 )
119146
120147 def test_upload_blocked (self ):
121- unblock_upload = threading .Event ()
148+ with self .block_upload ():
149+ # fill the queue
150+ for _ in range (MAXSIZE ):
151+ self .hook .upload (
152+ inputs = FAKE_INPUTS ,
153+ outputs = FAKE_OUTPUTS ,
154+ system_instruction = FAKE_SYSTEM_INSTRUCTION ,
155+ )
122156
123- def blocked_upload (* args : Any ):
124- unblock_upload .wait ()
125- return MagicMock ()
157+ self .assertLessEqual (
158+ self .mock_fsspec .open .call_count ,
159+ MAXSIZE ,
160+ f"uploader should only be called { MAXSIZE = } times" ,
161+ )
126162
127- self .mock_fsspec .open .side_effect = blocked_upload
163+ with self .assertLogs (level = logging .WARNING ) as logs :
164+ self .hook .upload (
165+ inputs = FAKE_INPUTS ,
166+ outputs = FAKE_OUTPUTS ,
167+ system_instruction = FAKE_SYSTEM_INSTRUCTION ,
168+ )
128169
129- # fill the queue
130- for _ in range (MAXSIZE ):
131- self .hook .upload (
132- inputs = FAKE_INPUTS ,
133- outputs = FAKE_OUTPUTS ,
134- system_instruction = FAKE_SYSTEM_INSTRUCTION ,
170+ self .assertIn (
171+ "fsspec upload queue is full, dropping upload" , logs .output [0 ]
135172 )
136173
137- self .assertLessEqual (
138- self .mock_fsspec .open .call_count ,
139- MAXSIZE ,
140- f"uploader should only be called { MAXSIZE = } times" ,
141- )
142-
143- with self .assertLogs (level = logging .WARNING ) as logs :
174+ def test_shutdown_timeout (self ):
175+ with self .block_upload ():
144176 self .hook .upload (
145177 inputs = FAKE_INPUTS ,
146178 outputs = FAKE_OUTPUTS ,
147179 system_instruction = FAKE_SYSTEM_INSTRUCTION ,
148180 )
149181
150- self .assertIn (
151- "fsspec upload queue is full, dropping upload" , logs .output [0 ]
152- )
153-
154- unblock_upload .set ()
182+ # shutdown should timeout and return even though there are still items in the queue
183+ self .hook .shutdown (timeout_sec = 0.01 )
155184
156185 def test_failed_upload_logs (self ):
157- def failing_upload (* args : Any ) -> None :
158- raise RuntimeError ("failed to upload" )
159-
160- self .mock_fsspec .open = MagicMock (wraps = failing_upload )
186+ self .mock_fsspec .open .side_effect = RuntimeError ("failed to upload" )
161187
162188 with self .assertLogs (level = logging .ERROR ) as logs :
163189 self .hook .upload (
@@ -177,7 +203,7 @@ def test_upload_after_shutdown_logs(self):
177203 outputs = FAKE_OUTPUTS ,
178204 system_instruction = FAKE_SYSTEM_INSTRUCTION ,
179205 )
180- self .assertEqual (len (logs .output ), 1 )
206+ self .assertEqual (len (logs .output ), 3 )
181207 self .assertIn (
182208 "attempting to upload file after FsspecUploadHook.shutdown() was already called" ,
183209 logs .output [0 ],
0 commit comments