Skip to content

Commit b76e546

Browse files
Merge pull request #7 from DACCS-Climate/support-pipes
support reading log files from pipe-like files
2 parents f3810d9 + bbb1b0b commit b76e546

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ asyncio.run(my_coroutine())
7373

7474
`track_file` has the following parameters:
7575

76-
- `log_file`: path to a log file to track
76+
- `log_file`: path to a log file to track. This can be a regular file, stream (e.g. stdout/stderr), or named FIFO pipe.
7777
- `line_parsers` a list of [line parsers](#what-is-a-line-parser)
7878
- `poll_delay`: (see [parameters](#track-parameters) for `track` and `track_async`)
7979
- `tail`: (see [parameters](#track-parameters) for `track` and `track_async`)
@@ -237,7 +237,8 @@ these scenarios in the following ways:
237237

238238
#### How do I track logs that haven't been written to a file?
239239

240-
Log Parser currently only supports reading logs from files. We'd like to support other log sources in the future though.
240+
Log Parser currently only supports reading logs from files and named pipes (such as the stdout and stderr streams).
241+
We'd like to support other log sources in the future though.
241242
Feel free to [contribute](#development) to this project if you want to speed up support for this feature.
242243

243244
## Extensions

log_parser/log_parser.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ async def _check_file_state(log_io: AsyncFile[str]) -> _FileState:
4040
return _FileState.DELETED
4141
file_stat = os.stat(log_io.fileno())
4242
if same_name_file_stat == file_stat:
43+
if not log_io.seekable():
44+
return _FileState.NOCHANGE
4345
if await log_io.tell() > file_stat.st_size:
4446
return _FileState.TRUNCATED
4547
else:
@@ -79,7 +81,7 @@ async def track_file(
7981
log_io = await open_file(log_file)
8082
logger.debug(f"file '{log_file}' opened for reading")
8183
try:
82-
if tail:
84+
if tail and log_io.seekable():
8385
await log_io.seek(0, os.SEEK_END)
8486
while True:
8587
file_state = await _check_file_state(log_io)

test/test_log_parser.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import inspect
44
import itertools
5+
import os
56
from contextlib import asynccontextmanager
67

78
import anyio
@@ -28,6 +29,12 @@ async def tmp_log(self, tmp_path):
2829
await log_file.touch()
2930
return log_file
3031

32+
@pytest.fixture
33+
async def tmp_log_pipe(self, tmp_path):
34+
log_file = anyio.Path(tmp_path / "test.log")
35+
os.mkfifo(log_file)
36+
return log_file
37+
3138
@staticmethod
3239
async def delayed_write(file, text, delay, mode="w"):
3340
await asyncio.sleep(delay)
@@ -162,6 +169,14 @@ async def test_file_contains_line_mixed_line_parsers(self, tmp_log):
162169
)
163170
assert output == text * 2
164171

172+
@pytest.mark.parametrize("text", [["test 123"], ["test 123", "test 456"]])
173+
async def test_pipe_contains_lines(self, tmp_log_pipe, text):
174+
output = []
175+
async with silent_timeout(1.1), asyncio.TaskGroup() as tg:
176+
tg.create_task(log_parser.track_file(str(tmp_log_pipe), [self.basic_line_parser(output)]))
177+
tg.create_task(self.delayed_write(tmp_log_pipe, "\n".join(text), 0.1))
178+
assert output == text
179+
165180

166181
class _TrackTests(abc.ABC):
167182
@abc.abstractmethod

0 commit comments

Comments
 (0)