|
16 | 16 | from .schema import Parent, Stimulus
|
17 | 17 | import datajoint as dj
|
18 | 18 | import os
|
| 19 | +import logging |
| 20 | +import io |
| 21 | + |
| 22 | +logger = logging.getLogger("datajoint") |
19 | 23 |
|
20 | 24 |
|
21 | 25 | class TestFetch:
|
@@ -209,12 +213,25 @@ def test_offset(self):
|
209 | 213 | np.all([cc == ll for cc, ll in zip(c, l)]), "Sorting order is different"
|
210 | 214 | )
|
211 | 215 |
|
212 |
| - # Need to change this to test a log instead of a warning now |
213 |
| - # def test_limit_warning(self): |
214 |
| - # """Tests whether warning is raised if offset is used without limit.""" |
215 |
| - # with warnings.catch_warnings(record=True) as w: |
216 |
| - # self.lang.fetch(offset=1) |
217 |
| - # assert_true(len(w) > 0, "Warning was not raised") |
| 216 | + def test_limit_warning(self): |
| 217 | + """Tests whether warning is raised if offset is used without limit.""" |
| 218 | + log_capture = io.StringIO() |
| 219 | + stream_handler = logging.StreamHandler(log_capture) |
| 220 | + log_format = logging.Formatter( |
| 221 | + "[%(asctime)s][%(funcName)s][%(levelname)s]: %(message)s" |
| 222 | + ) |
| 223 | + stream_handler.setFormatter(log_format) |
| 224 | + stream_handler.set_name("test_limit_warning") |
| 225 | + logger.addHandler(stream_handler) |
| 226 | + self.lang.fetch(offset=1) |
| 227 | + |
| 228 | + log_contents = log_capture.getvalue() |
| 229 | + log_capture.close() |
| 230 | + |
| 231 | + for handler in logger.handlers: # Clean up handler |
| 232 | + if handler.name == "test_limit_warning": |
| 233 | + logger.removeHandler(handler) |
| 234 | + assert "[WARNING]: Offset set, but no limit." in log_contents |
218 | 235 |
|
219 | 236 | def test_len(self):
|
220 | 237 | """Tests __len__"""
|
|
0 commit comments