From 85a2babb13c8c32e029108736340042c17fcb92a Mon Sep 17 00:00:00 2001 From: John Bucy Date: Thu, 27 Feb 2025 14:50:13 -0800 Subject: [PATCH 01/24] remove SMTP.smtp_DATA() line_fragments smtp_DATA() could buffer an unbounded amount of data in line_fragments until it got crlf and we're going to throw it away anyway. drop TestSMTPWithController.test_long_line_leak which is now moot --- aiosmtpd/smtp.py | 32 +++++++++++++------------------- aiosmtpd/tests/test_smtp.py | 17 ----------------- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 3b85fcfb..0cfabedf 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1423,7 +1423,6 @@ async def smtp_DATA(self, arg: str) -> None: num_bytes: int = 0 limit: Optional[int] = self.data_size_limit - line_fragments: List[bytes] = [] state: _DataState = _DataState.NOMINAL while self.transport is not None: # pragma: nobranch # Since eof_received cancels this coroutine, @@ -1448,10 +1447,14 @@ async def smtp_DATA(self, arg: str) -> None: # Discard data immediately to prevent memory pressure data *= 0 # Drain the stream anyways + # Note: if the StreamReader buffer ends with a prefix + # of the delimiter, e.consumed does not include that line = await self._reader.read(e.consumed) assert not line.endswith(b'\r\n') + continue + # A lone dot in a line signals the end of DATA. - if not line_fragments and line == b'.\r\n': + if line == b'.\r\n': break num_bytes += len(line) if state == _DataState.NOMINAL and limit and num_bytes > limit: @@ -1460,20 +1463,14 @@ async def smtp_DATA(self, arg: str) -> None: state = _DataState.TOO_MUCH # Discard data immediately to prevent memory pressure data *= 0 - line_fragments.append(line) - if line.endswith(b'\r\n'): - # Record data only if state is "NOMINAL" - if state == _DataState.NOMINAL: - line = EMPTY_BARR.join(line_fragments) - if len(line) > self.line_length_limit: - # Theoretically we shouldn't reach this place. But it's always - # good to practice DEFENSIVE coding. - state = _DataState.TOO_LONG - # Discard data immediately to prevent memory pressure - data *= 0 - else: - data.append(EMPTY_BARR.join(line_fragments)) - line_fragments *= 0 + assert line.endswith(b'\r\n') + assert len(line) <= (self.line_length_limit + 2) + # Record data only if state is "NOMINAL" + if state != _DataState.NOMINAL: + continue + + data.append(bytearray(line)) + # Day of reckoning! Let's take care of those out-of-nominal situations if state != _DataState.NOMINAL: @@ -1484,9 +1481,6 @@ async def smtp_DATA(self, arg: str) -> None: self._set_post_data_state() return - # If unfinished_line is non-empty, then the connection was closed. - assert not line_fragments - # Remove extraneous carriage returns and de-transparency # according to RFC 5321, Section 4.5.2. for text in data: diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 03f3a315..e1d9b394 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1604,23 +1604,6 @@ def test_long_line_double_count(self, plain_controller, client): client.sendmail("anne@example.com", ["bart@example.com"], mail) assert exc.value.args == S.S500_DATALINE_TOO_LONG - def test_long_line_leak(self, mocker: MockFixture, plain_controller, client): - # Simulates situation where readuntil() does not raise LimitOverrunError, - # but somehow the line_fragments when join()ed resulted in a too-long line - - # Hijack EMPTY_BARR.join() to return a bytes object that's definitely too long - mock_ebarr = mocker.patch("aiosmtpd.smtp.EMPTY_BARR") - mock_ebarr.join.return_value = b"a" * 1010 - - client.helo("example.com") - mail = "z" * 72 # Make sure this is small and definitely within limits - with pytest.raises(SMTPDataError) as exc: - client.sendmail("anne@example.com", ["bart@example.com"], mail) - assert exc.value.args == S.S500_DATALINE_TOO_LONG - # self.assertEqual(cm.exception.smtp_code, 500) - # self.assertEqual(cm.exception.smtp_error, - # b'Line too long (see RFC5321 4.5.3.1.6)') - @controller_data(data_size_limit=20) def test_too_long_body_delay_error(self, plain_controller): with socket.socket() as sock: From d8e9e42c94c71247490a610e9d31fb5fee40bf65 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Thu, 27 Feb 2025 14:40:43 -0800 Subject: [PATCH 02/24] Add new handler hook DATA_CHUNK which is invoked from the data reading loop. DATA_CHUNK takes 3 parameters: data : bytes, decoded_data : Optional[str], last : bool and returns Optional[bytes] response If the hook returns a response prior to the last=True chunk, smtp_DATA will read/discard the remaining data from the client without invoking the hook again. This allows streaming the data to a file or other storage api without having to buffer the whole message in memory first. Move dotstuff and utf8 decode into data reading loop to support this. This may also improve/fix #293 which I suspect is due to the tight loop decoding dotstuff for the whole message at once hogging the GIL in the old implementation. --- aiosmtpd/smtp.py | 77 ++++++++++++++++++++++++++----------- aiosmtpd/testing/helpers.py | 26 ++++++++++++- aiosmtpd/tests/test_smtp.py | 23 ++++++++++- 3 files changed, 101 insertions(+), 25 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 0cfabedf..3685dada 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1420,10 +1420,14 @@ async def smtp_DATA(self, arg: str) -> None: await self.push('354 End data with .') data: List[bytearray] = [] + decoded_lines: List[str] = [] num_bytes: int = 0 limit: Optional[int] = self.data_size_limit state: _DataState = _DataState.NOMINAL + status : Optional[bytes] = None + DOT = ord('.') + while self.transport is not None: # pragma: nobranch # Since eof_received cancels this coroutine, # readuntil() can never raise asyncio.IncompleteReadError. @@ -1441,14 +1445,17 @@ async def smtp_DATA(self, arg: str) -> None: # The line exceeds StreamReader's "stream limit". # Delay SMTP Status Code sending until data receive is complete # This seems to be implied in RFC 5321 § 4.2.5 + + # TODO this (and _handle_client()) will currently read + # an unbounded amount of data from the client looking + # for crlf. Possibly this should return an immediate + # error and close the connection after some limit ~16kb. if state == _DataState.NOMINAL: # Transition to TOO_LONG only if we haven't gone TOO_MUCH yet state = _DataState.TOO_LONG # Discard data immediately to prevent memory pressure data *= 0 # Drain the stream anyways - # Note: if the StreamReader buffer ends with a prefix - # of the delimiter, e.consumed does not include that line = await self._reader.read(e.consumed) assert not line.endswith(b'\r\n') continue @@ -1469,46 +1476,70 @@ async def smtp_DATA(self, arg: str) -> None: if state != _DataState.NOMINAL: continue - data.append(bytearray(line)) + # Remove extraneous carriage returns and de-transparency + # according to RFC 5321, Section 4.5.2. + if line[0] == DOT: + line = line[1:] + decoded_line = None + if self._decode_data: + if self.enable_SMTPUTF8: + decoded_line = line.decode('utf-8', errors='surrogateescape') + else: + try: + decoded_line = line.decode('ascii', errors='strict') + except UnicodeDecodeError: + # This happens if enable_smtputf8 is false, meaning that + # the server explicitly does not want to accept non-ascii, + # but the client ignores that and sends non-ascii anyway. + status = '500 Error: strict ASCII mode' + decoded_lines *= 0 + continue + + if "DATA_CHUNK" in self._handle_hooks: + if status is None: + status = await self._call_handler_hook( + 'DATA_CHUNK', line, decoded_line, False) + else: + data.append(line) + if decoded_line: + decoded_lines.append(decoded_line) # Day of reckoning! Let's take care of those out-of-nominal situations - if state != _DataState.NOMINAL: + if state != _DataState.NOMINAL or status is not None: if state == _DataState.TOO_LONG: await self.push("500 Line too long (see RFC5321 4.5.3.1.6)") elif state == _DataState.TOO_MUCH: # pragma: nobranch await self.push('552 Error: Too much mail data') + elif status is not None: + await self.push(status) + self._set_post_data_state() + return + + + # Call the new API first if it's implemented. + if "DATA_CHUNK" in self._handle_hooks: + if status is None: + status = await self._call_handler_hook( + 'DATA_CHUNK', bytes(), '' if self._decode_data else None, + True) self._set_post_data_state() + await self.push('250 OK' if status is MISSING else status) return - # Remove extraneous carriage returns and de-transparency - # according to RFC 5321, Section 4.5.2. - for text in data: - if text.startswith(b'.'): - del text[0] original_content: bytes = EMPTYBYTES.join(data) # Discard data immediately to prevent memory pressure data *= 0 - content: Union[str, bytes] - if self._decode_data: - if self.enable_SMTPUTF8: - content = original_content.decode('utf-8', errors='surrogateescape') - else: - try: - content = original_content.decode('ascii', errors='strict') - except UnicodeDecodeError: - # This happens if enable_smtputf8 is false, meaning that - # the server explicitly does not want to accept non-ascii, - # but the client ignores that and sends non-ascii anyway. - await self.push('500 Error: strict ASCII mode') - return + if decoded_lines: + content = ''.join(decoded_lines) + decoded_lines *= 0 else: content = original_content + self.envelope.content = content self.envelope.original_content = original_content - # Call the new API first if it's implemented. if "DATA" in self._handle_hooks: status = await self._call_handler_hook('DATA') else: diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 8ee630a7..cb8a189b 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -10,7 +10,7 @@ import sys import time from smtplib import SMTP as SMTP_Client -from typing import List +from typing import List, Optional from aiosmtpd.smtp import Envelope, Session, SMTP @@ -57,6 +57,30 @@ async def handle_DATA( self.box.append(envelope) return "250 OK" +class ChunkedReceivingHandler: + def __init__(self): + self.box: List[Envelope] = [] + + async def handle_DATA_CHUNK( + self, server: SMTP, session: Session, envelope: Envelope, + data : bytes, text : Optional[str], last : bool + ) -> Optional[str]: + if text is not None: + if envelope.content is None: + envelope.content = '' + envelope.content += text + if envelope.original_content is None: + envelope.original_content = b'' + envelope.original_content += data + else: + if envelope.content is None: + envelope.content = b'' + envelope.content += data + + if last: + self.box.append(envelope) + return "250 OK" + def catchup_delay(delay: float = ASYNCIO_CATCHUP_DELAY): """ diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index e1d9b394..390e831a 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -42,6 +42,7 @@ auth_mechanism, ) from aiosmtpd.testing.helpers import ( + ChunkedReceivingHandler, ReceivingHandler, catchup_delay, reset_connection, @@ -462,7 +463,6 @@ def test_empty_email(self, temp_event_loop, transport_resp, get_protocol): assert len(handler.box) == 1 assert handler.box[0].content == b"" - @pytest.mark.usefixtures("plain_controller") @controller_data( decode_data=True, @@ -1661,6 +1661,27 @@ def test_too_long_lines_then_too_long_body(self, plain_controller, client): client.sendmail("anne@example.com", ["bart@example.com"], mail) assert exc.value.args == S.S500_DATALINE_TOO_LONG + @controller_data(decode_data=True) + @handler_data(class_=ChunkedReceivingHandler) + def test_chunked_receiving(self, plain_controller, client): + handler = plain_controller.handler + self._ehlo(client) + client.send(b'MAIL FROM:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'RCPT TO:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'DATA\r\n') + assert client.getreply() == S.S354_DATA_ENDWITH + client.send(b'hello, \r\n') + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') + client.send(b'.\r\n') + assert client.getreply() == S.S250_OK + + assert len(handler.box) == 1 + envelope = handler.box[0] + assert envelope.original_content == b'hello, \r\n\xe4\xb8\x96\xe7\x95\x8c!\r\n' + assert envelope.content == 'hello, \r\n世界!\r\n' + class TestCustomization(_CommonMethods): @controller_data(class_=CustomHostnameController) From 36d035c1f5eb68c1357400177d060cb5a61bddf8 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Fri, 28 Feb 2025 10:56:18 -0800 Subject: [PATCH 03/24] only decode data in read loop if we're using new DATA_CHUNK hook otherwise do it at the end as before so we don't ~double the memory while we're reading from the client --- aiosmtpd/smtp.py | 51 ++++++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 3685dada..e94dac36 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1404,6 +1404,23 @@ async def smtp_RSET(self, arg: str): status = await self._call_handler_hook('RSET') await self.push('250 OK' if status is MISSING else status) + # -> err, decoded data + def _decode_line(self, data : bytes + ) -> Tuple[Optional[str], Optional[str]]: + if not self._decode_data: + return None, None + if self.enable_SMTPUTF8: + return None, data.decode('utf-8', errors='surrogateescape') + else: + try: + return None, data.decode('ascii', errors='strict') + except UnicodeDecodeError: + # This happens if enable_smtputf8 is false, meaning that + # the server explicitly does not want to accept non-ascii, + # but the client ignores that and sends non-ascii anyway. + return '500 Error: strict ASCII mode', None + + @syntax('DATA') async def smtp_DATA(self, arg: str) -> None: if await self.check_helo_needed(): @@ -1419,13 +1436,12 @@ async def smtp_DATA(self, arg: str) -> None: return await self.push('354 End data with .') - data: List[bytearray] = [] - decoded_lines: List[str] = [] + data: List[bytes] = [] num_bytes: int = 0 limit: Optional[int] = self.data_size_limit state: _DataState = _DataState.NOMINAL - status : Optional[bytes] = None + status = None DOT = ord('.') while self.transport is not None: # pragma: nobranch @@ -1480,30 +1496,17 @@ async def smtp_DATA(self, arg: str) -> None: # according to RFC 5321, Section 4.5.2. if line[0] == DOT: line = line[1:] - decoded_line = None - - if self._decode_data: - if self.enable_SMTPUTF8: - decoded_line = line.decode('utf-8', errors='surrogateescape') - else: - try: - decoded_line = line.decode('ascii', errors='strict') - except UnicodeDecodeError: - # This happens if enable_smtputf8 is false, meaning that - # the server explicitly does not want to accept non-ascii, - # but the client ignores that and sends non-ascii anyway. - status = '500 Error: strict ASCII mode' - decoded_lines *= 0 - continue if "DATA_CHUNK" in self._handle_hooks: + if status is None: + status, decoded_line = self._decode_line(line) + if status: + data *= 0 if status is None: status = await self._call_handler_hook( 'DATA_CHUNK', line, decoded_line, False) else: data.append(line) - if decoded_line: - decoded_lines.append(decoded_line) # Day of reckoning! Let's take care of those out-of-nominal situations if state != _DataState.NOMINAL or status is not None: @@ -1531,9 +1534,11 @@ async def smtp_DATA(self, arg: str) -> None: # Discard data immediately to prevent memory pressure data *= 0 content: Union[str, bytes] - if decoded_lines: - content = ''.join(decoded_lines) - decoded_lines *= 0 + if self._decode_data: + status, content = self._decode_line(original_content) + if status: + await self.push(status) + return else: content = original_content From 7e4f59fec976563a163b5f8523d36d39d4e607bc Mon Sep 17 00:00:00 2001 From: John Bucy Date: Tue, 17 Jun 2025 16:04:16 -0700 Subject: [PATCH 04/24] improvements for dotstuff + chunked receiving 1: buffer through BytesIO, not List[bytes], based on a simple microbenchmark, this is about 40% faster 2: when using the DATA_CHUNK hook, buffer SMTP(chunk_size=2**16) before calling the hook, the way this was before was calling the hook for every line --- aiosmtpd/smtp.py | 60 ++++++++++++++++++++++--------------- aiosmtpd/testing/helpers.py | 1 + 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index e94dac36..ef30ad73 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -32,6 +32,7 @@ Union, ) from warnings import warn +from io import BytesIO import attr from public import public @@ -330,7 +331,8 @@ def __init__( command_call_limit: Union[int, Dict[str, int], None] = None, authenticator: Optional[AuthenticatorType] = None, proxy_protocol_timeout: Optional[Union[int, float]] = None, - loop: Optional[asyncio.AbstractEventLoop] = None + loop: Optional[asyncio.AbstractEventLoop] = None, + chunk_size : Optional[int] = 2**16 ): self.__ident__ = ident or __ident__ self.loop = loop if loop else make_loop() @@ -373,6 +375,7 @@ def __init__( "can lead to security vulnerabilities!") log.warning("auth_required == True but auth_require_tls == False") self._auth_require_tls = auth_require_tls + self._chunk_size = chunk_size if proxy_protocol_timeout is not None: if proxy_protocol_timeout <= 0: @@ -1436,7 +1439,7 @@ async def smtp_DATA(self, arg: str) -> None: return await self.push('354 End data with .') - data: List[bytes] = [] + data = BytesIO() num_bytes: int = 0 limit: Optional[int] = self.data_size_limit @@ -1444,6 +1447,8 @@ async def smtp_DATA(self, arg: str) -> None: status = None DOT = ord('.') + chunking = "DATA_CHUNK" in self._handle_hooks + while self.transport is not None: # pragma: nobranch # Since eof_received cancels this coroutine, # readuntil() can never raise asyncio.IncompleteReadError. @@ -1470,7 +1475,7 @@ async def smtp_DATA(self, arg: str) -> None: # Transition to TOO_LONG only if we haven't gone TOO_MUCH yet state = _DataState.TOO_LONG # Discard data immediately to prevent memory pressure - data *= 0 + data.truncate(0) # Drain the stream anyways line = await self._reader.read(e.consumed) assert not line.endswith(b'\r\n') @@ -1485,7 +1490,7 @@ async def smtp_DATA(self, arg: str) -> None: # This seems to be implied in RFC 5321 § 4.2.5 state = _DataState.TOO_MUCH # Discard data immediately to prevent memory pressure - data *= 0 + data.truncate(0) assert line.endswith(b'\r\n') assert len(line) <= (self.line_length_limit + 2) # Record data only if state is "NOMINAL" @@ -1497,16 +1502,21 @@ async def smtp_DATA(self, arg: str) -> None: if line[0] == DOT: line = line[1:] - if "DATA_CHUNK" in self._handle_hooks: + if not chunking: + data.write(line) + continue + + if data.tell() + len(line) > self._chunk_size: + data.seek(0) + chunk = data.read() + data.truncate(0) + data.seek(0) if status is None: - status, decoded_line = self._decode_line(line) - if status: - data *= 0 + status, decoded_line = self._decode_line(chunk) if status is None: status = await self._call_handler_hook( - 'DATA_CHUNK', line, decoded_line, False) - else: - data.append(line) + 'DATA_CHUNK', chunk, decoded_line, False) + data.write(line) # Day of reckoning! Let's take care of those out-of-nominal situations if state != _DataState.NOMINAL or status is not None: @@ -1519,28 +1529,30 @@ async def smtp_DATA(self, arg: str) -> None: self._set_post_data_state() return + data.seek(0) + original_content: bytes = data.read() + # Discard data immediately to prevent memory pressure + data.truncate(0) + + content: Union[str, bytes, None] = None + if self._decode_data: + status, content = self._decode_line(original_content) + if status: + await self.push(status) + return # Call the new API first if it's implemented. - if "DATA_CHUNK" in self._handle_hooks: + if chunking: if status is None: status = await self._call_handler_hook( - 'DATA_CHUNK', bytes(), '' if self._decode_data else None, - True) + 'DATA_CHUNK', original_content, content, True) self._set_post_data_state() await self.push('250 OK' if status is MISSING else status) return - original_content: bytes = EMPTYBYTES.join(data) - # Discard data immediately to prevent memory pressure - data *= 0 - content: Union[str, bytes] - if self._decode_data: - status, content = self._decode_line(original_content) - if status: - await self.push(status) - return - else: + if not self._decode_data: content = original_content + assert content is not None self.envelope.content = content self.envelope.original_content = original_content diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index cb8a189b..368270db 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -65,6 +65,7 @@ async def handle_DATA_CHUNK( self, server: SMTP, session: Session, envelope: Envelope, data : bytes, text : Optional[str], last : bool ) -> Optional[str]: + assert bool(data) if text is not None: if envelope.content is None: envelope.content = '' From ab8440b9070c4c3c1dcdb70e48d8e72d4205fe64 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 11:16:39 -0700 Subject: [PATCH 05/24] fix type annotation --- aiosmtpd/smtp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index ef30ad73..69184258 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -332,7 +332,7 @@ def __init__( authenticator: Optional[AuthenticatorType] = None, proxy_protocol_timeout: Optional[Union[int, float]] = None, loop: Optional[asyncio.AbstractEventLoop] = None, - chunk_size : Optional[int] = 2**16 + chunk_size : int = 2**16 ): self.__ident__ = ident or __ident__ self.loop = loop if loop else make_loop() From 1adebc68e710a9742215557576ad78b3cf6a697b Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 11:47:19 -0700 Subject: [PATCH 06/24] mypy passing --- aiosmtpd/smtp.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 69184258..6342c87f 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1409,19 +1409,19 @@ async def smtp_RSET(self, arg: str): # -> err, decoded data def _decode_line(self, data : bytes - ) -> Tuple[Optional[str], Optional[str]]: + ) -> Tuple[Union[_Missing, bytes], Optional[str]]: if not self._decode_data: - return None, None + return MISSING, None if self.enable_SMTPUTF8: - return None, data.decode('utf-8', errors='surrogateescape') + return MISSING, data.decode('utf-8', errors='surrogateescape') else: try: - return None, data.decode('ascii', errors='strict') + return MISSING, data.decode('ascii', errors='strict') except UnicodeDecodeError: # This happens if enable_smtputf8 is false, meaning that # the server explicitly does not want to accept non-ascii, # but the client ignores that and sends non-ascii anyway. - return '500 Error: strict ASCII mode', None + return b'500 Error: strict ASCII mode', None @syntax('DATA') @@ -1444,7 +1444,7 @@ async def smtp_DATA(self, arg: str) -> None: num_bytes: int = 0 limit: Optional[int] = self.data_size_limit state: _DataState = _DataState.NOMINAL - status = None + status : Union[_Missing, bytes] = MISSING DOT = ord('.') chunking = "DATA_CHUNK" in self._handle_hooks @@ -1511,20 +1511,21 @@ async def smtp_DATA(self, arg: str) -> None: chunk = data.read() data.truncate(0) data.seek(0) - if status is None: + decoded_line = None + if status is MISSING: status, decoded_line = self._decode_line(chunk) - if status is None: + if status is MISSING: status = await self._call_handler_hook( 'DATA_CHUNK', chunk, decoded_line, False) data.write(line) # Day of reckoning! Let's take care of those out-of-nominal situations - if state != _DataState.NOMINAL or status is not None: + if state != _DataState.NOMINAL or status is not MISSING: if state == _DataState.TOO_LONG: await self.push("500 Line too long (see RFC5321 4.5.3.1.6)") elif state == _DataState.TOO_MUCH: # pragma: nobranch await self.push('552 Error: Too much mail data') - elif status is not None: + elif status is not MISSING: await self.push(status) self._set_post_data_state() return @@ -1537,17 +1538,17 @@ async def smtp_DATA(self, arg: str) -> None: content: Union[str, bytes, None] = None if self._decode_data: status, content = self._decode_line(original_content) - if status: + if status is not MISSING: await self.push(status) return # Call the new API first if it's implemented. if chunking: - if status is None: + if status is MISSING: status = await self._call_handler_hook( 'DATA_CHUNK', original_content, content, True) self._set_post_data_state() - await self.push('250 OK' if status is MISSING else status) + await self.push(b'250 OK' if status is MISSING else status) return if not self._decode_data: @@ -1568,18 +1569,21 @@ async def smtp_DATA(self, arg: str) -> None: assert self.session is not None args = (self.session.peer, self.envelope.mail_from, self.envelope.rcpt_tos, self.envelope.content) + old_status : Union[None, bytes] = None if asyncio.iscoroutinefunction( self.event_handler.process_message): - status = await self.event_handler.process_message(*args) + old_status = await self.event_handler.process_message(*args) else: - status = self.event_handler.process_message(*args) + old_status = self.event_handler.process_message(*args) # The deprecated API can return None which means, return the # default status. Don't worry about coverage for this case as # it's a deprecated API that will go away after 1.0. - if status is None: # pragma: nocover + if old_status is None: # pragma: nocover status = MISSING + else: + status = old_status self._set_post_data_state() - await self.push('250 OK' if status is MISSING else status) + await self.push(b'250 OK' if status is MISSING else status) # Commands that have not been implemented. async def smtp_EXPN(self, arg: str): From ea70fb1e2fb65ab0ea370dc0ea47dd9ab2521679 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 13:12:58 -0700 Subject: [PATCH 07/24] fix warning: Explicit returns mixed with implicit (fall through) returns --- aiosmtpd/testing/helpers.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 368270db..22306991 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -78,9 +78,10 @@ async def handle_DATA_CHUNK( envelope.content = b'' envelope.content += data - if last: - self.box.append(envelope) - return "250 OK" + if not last: + return None + self.box.append(envelope) + return "250 OK" def catchup_delay(delay: float = ASYNCIO_CATCHUP_DELAY): From b71377d8ca2b21df54242887ab554bd8a9583edd Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 13:13:36 -0700 Subject: [PATCH 08/24] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 🇺🇦 Sviatoslav Sydorenko (Святослав Сидоренко) --- aiosmtpd/smtp.py | 2 +- aiosmtpd/testing/helpers.py | 2 +- aiosmtpd/tests/test_smtp.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 6342c87f..cf170077 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1408,7 +1408,7 @@ async def smtp_RSET(self, arg: str): await self.push('250 OK' if status is MISSING else status) # -> err, decoded data - def _decode_line(self, data : bytes + def _decode_line(self, data: bytes ) -> Tuple[Union[_Missing, bytes], Optional[str]]: if not self._decode_data: return MISSING, None diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 368270db..47d6faed 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -63,7 +63,7 @@ def __init__(self): async def handle_DATA_CHUNK( self, server: SMTP, session: Session, envelope: Envelope, - data : bytes, text : Optional[str], last : bool + data: bytes, text: Optional[str], last: bool ) -> Optional[str]: assert bool(data) if text is not None: diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 390e831a..b6b42814 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -463,6 +463,7 @@ def test_empty_email(self, temp_event_loop, transport_resp, get_protocol): assert len(handler.box) == 1 assert handler.box[0].content == b"" + @pytest.mark.usefixtures("plain_controller") @controller_data( decode_data=True, From 612d26991638ae99bc3e199769445867864472eb Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 13:25:28 -0700 Subject: [PATCH 09/24] fix whitespace, typing --- aiosmtpd/smtp.py | 2 +- aiosmtpd/testing/helpers.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index cf170077..5d3f0863 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -332,7 +332,7 @@ def __init__( authenticator: Optional[AuthenticatorType] = None, proxy_protocol_timeout: Optional[Union[int, float]] = None, loop: Optional[asyncio.AbstractEventLoop] = None, - chunk_size : int = 2**16 + chunk_size: int = 2**16 ): self.__ident__ = ident or __ident__ self.loop = loop if loop else make_loop() diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index f8f8e240..49122297 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -69,6 +69,7 @@ async def handle_DATA_CHUNK( if text is not None: if envelope.content is None: envelope.content = '' + assert isinstance(envelope.content, str) envelope.content += text if envelope.original_content is None: envelope.original_content = b'' @@ -76,6 +77,7 @@ async def handle_DATA_CHUNK( else: if envelope.content is None: envelope.content = b'' + assert isinstance(envelope.content, bytes) envelope.content += data if not last: From ce0a1a754086e5712fc81e9406271ed0f5ab71e5 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 13:58:17 -0700 Subject: [PATCH 10/24] use a smaller buffer in test to exercise the flush path --- aiosmtpd/tests/test_smtp.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 446cbd8e..7962cb39 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1664,6 +1664,8 @@ def test_too_long_lines_then_too_long_body(self, plain_controller, client): @controller_data(decode_data=True) @handler_data(class_=ChunkedReceivingHandler) def test_chunked_receiving(self, plain_controller, client): + smtpd: Server = plain_controller.smtpd + smtpd._chunk_size = 10 handler = plain_controller.handler self._ehlo(client) client.send(b'MAIL FROM:\r\n') @@ -1672,8 +1674,8 @@ def test_chunked_receiving(self, plain_controller, client): assert client.getreply() == S.S250_OK client.send(b'DATA\r\n') assert client.getreply() == S.S354_DATA_ENDWITH - client.send(b'hello, \r\n') - client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') + client.send(b'hello, \r\n') # fits in chunk_size + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush client.send(b'.\r\n') assert client.getreply() == S.S250_OK From a506f173acf432cc348babd642d07b10983af9ad Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 13:58:40 -0700 Subject: [PATCH 11/24] smtp.py passing mypy This brings up a bit of a discussion point: the way I have it now, the new DATA_CHUNK hook returns Optional[bytes]. The idea is that it will typically return None if last=False unless there was an early-return error but the final call with last=True is always expected to return a value. This can't exactly be expressed with annotations so I needed some asserts to pass mypy. --- aiosmtpd/smtp.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 5d3f0863..f4f14a08 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1423,7 +1423,6 @@ def _decode_line(self, data: bytes # but the client ignores that and sends non-ascii anyway. return b'500 Error: strict ASCII mode', None - @syntax('DATA') async def smtp_DATA(self, arg: str) -> None: if await self.check_helo_needed(): @@ -1515,8 +1514,10 @@ async def smtp_DATA(self, arg: str) -> None: if status is MISSING: status, decoded_line = self._decode_line(chunk) if status is MISSING: - status = await self._call_handler_hook( + new_status = await self._call_handler_hook( 'DATA_CHUNK', chunk, decoded_line, False) + if new_status is not None: + status = new_status data.write(line) # Day of reckoning! Let's take care of those out-of-nominal situations @@ -1545,10 +1546,13 @@ async def smtp_DATA(self, arg: str) -> None: # Call the new API first if it's implemented. if chunking: if status is MISSING: - status = await self._call_handler_hook( + new_status = await self._call_handler_hook( 'DATA_CHUNK', original_content, content, True) + assert new_status is not None + status = new_status + assert status is not MISSING self._set_post_data_state() - await self.push(b'250 OK' if status is MISSING else status) + await self.push(status) return if not self._decode_data: @@ -1569,7 +1573,7 @@ async def smtp_DATA(self, arg: str) -> None: assert self.session is not None args = (self.session.peer, self.envelope.mail_from, self.envelope.rcpt_tos, self.envelope.content) - old_status : Union[None, bytes] = None + old_status : Optional[bytes] = None if asyncio.iscoroutinefunction( self.event_handler.process_message): old_status = await self.event_handler.process_message(*args) From 896962fe0219dce74b063d88a121af52f9f6681b Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 15:04:28 -0700 Subject: [PATCH 12/24] coverage: add test where DATA_CHUNK hook returns response prior to last chunk --- aiosmtpd/smtp.py | 10 +++++----- aiosmtpd/testing/helpers.py | 14 +++++++++++--- aiosmtpd/tests/test_smtp.py | 24 +++++++++++++++++++++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index f4f14a08..c569fadf 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1545,11 +1545,11 @@ async def smtp_DATA(self, arg: str) -> None: # Call the new API first if it's implemented. if chunking: - if status is MISSING: - new_status = await self._call_handler_hook( - 'DATA_CHUNK', original_content, content, True) - assert new_status is not None - status = new_status + assert status is MISSING # handled above + new_status = await self._call_handler_hook( + 'DATA_CHUNK', original_content, content, True) + assert new_status is not None + status = new_status assert status is not MISSING self._set_post_data_state() await self.push(status) diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 49122297..56394be5 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -57,14 +57,19 @@ async def handle_DATA( self.box.append(envelope) return "250 OK" + class ChunkedReceivingHandler: def __init__(self): self.box: List[Envelope] = [] + self.response: Optional[str] = '250 OK' + self.respond_last = True + self.sent_response = False async def handle_DATA_CHUNK( self, server: SMTP, session: Session, envelope: Envelope, data: bytes, text: Optional[str], last: bool ) -> Optional[str]: + assert not self.sent_response assert bool(data) if text is not None: if envelope.content is None: @@ -80,10 +85,13 @@ async def handle_DATA_CHUNK( assert isinstance(envelope.content, bytes) envelope.content += data - if not last: + if last: + self.box.append(envelope) + if not last and self.respond_last: return None - self.box.append(envelope) - return "250 OK" + if self.response is not None: + self.sent_response = True + return self.response def catchup_delay(delay: float = ASYNCIO_CATCHUP_DELAY): diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 7962cb39..8296504f 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1675,7 +1675,7 @@ def test_chunked_receiving(self, plain_controller, client): client.send(b'DATA\r\n') assert client.getreply() == S.S354_DATA_ENDWITH client.send(b'hello, \r\n') # fits in chunk_size - client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush client.send(b'.\r\n') assert client.getreply() == S.S250_OK @@ -1684,6 +1684,28 @@ def test_chunked_receiving(self, plain_controller, client): assert envelope.original_content == b'hello, \r\n\xe4\xb8\x96\xe7\x95\x8c!\r\n' assert envelope.content == 'hello, \r\n世界!\r\n' + @controller_data(decode_data=True) + @handler_data(class_=ChunkedReceivingHandler) + def test_chunked_receiving_early_err(self, plain_controller, client): + smtpd: Server = plain_controller.smtpd + smtpd._chunk_size = 10 + handler = plain_controller.handler + handler.response = '550 bad' + handler.respond_last = False + self._ehlo(client) + client.send(b'MAIL FROM:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'RCPT TO:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'DATA\r\n') + assert client.getreply() == S.S354_DATA_ENDWITH + client.send(b'hello, \r\n') # fits in chunk_size + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush + client.send(b'.\r\n') + assert client.getreply() == (550, b'bad') + + assert len(handler.box) == 0 + class TestCustomization(_CommonMethods): @controller_data(class_=CustomHostnameController) From fe02f9843b7b75fd5fe92095305236dee3ea527c Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 15:29:14 -0700 Subject: [PATCH 13/24] coverage: add test for decode error w/chunked receiving --- aiosmtpd/tests/test_smtp.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 8296504f..6f7eae39 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1701,11 +1701,33 @@ def test_chunked_receiving_early_err(self, plain_controller, client): assert client.getreply() == S.S354_DATA_ENDWITH client.send(b'hello, \r\n') # fits in chunk_size client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush + client.send(b'more data\r\n') client.send(b'.\r\n') assert client.getreply() == (550, b'bad') assert len(handler.box) == 0 + @controller_data(decode_data=True, enable_SMTPUTF8=False) + @handler_data(class_=ChunkedReceivingHandler) + def test_chunked_receiving_decode_err(self, plain_controller, client): + smtpd: Server = plain_controller.smtpd + smtpd._chunk_size = 10 + handler = plain_controller.handler + self._ehlo(client) + client.send(b'MAIL FROM:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'RCPT TO:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'DATA\r\n') + assert client.getreply() == S.S354_DATA_ENDWITH + client.send(b'hello, \r\n') # fits in chunk_size + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') # overflow -> flush + client.send(b'more data\r\n') + client.send(b'.\r\n') + assert client.getreply() == S.S500_STRICT_ASCII + + assert len(handler.box) == 0 + class TestCustomization(_CommonMethods): @controller_data(class_=CustomHostnameController) From 5ad19e67e96845e2d0899612dbd3435274806da7 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Wed, 18 Jun 2025 15:42:18 -0700 Subject: [PATCH 14/24] coverage: add test for chunked receiving without decode_data --- aiosmtpd/tests/test_smtp.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 6f7eae39..443f502a 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1684,6 +1684,29 @@ def test_chunked_receiving(self, plain_controller, client): assert envelope.original_content == b'hello, \r\n\xe4\xb8\x96\xe7\x95\x8c!\r\n' assert envelope.content == 'hello, \r\n世界!\r\n' + @controller_data(decode_data=False) + @handler_data(class_=ChunkedReceivingHandler) + def test_chunked_receiving_no_decode(self, plain_controller, client): + smtpd: Server = plain_controller.smtpd + smtpd._chunk_size = 10 + handler = plain_controller.handler + self._ehlo(client) + client.send(b'MAIL FROM:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'RCPT TO:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'DATA\r\n') + assert client.getreply() == S.S354_DATA_ENDWITH + client.send(b'hello, \r\n') + client.send(b'\xe4\xb8\x96\xe7\x95\x8c!\r\n') + client.send(b'.\r\n') + assert client.getreply() == S.S250_OK + + assert len(handler.box) == 1 + envelope = handler.box[0] + assert envelope.content == b'hello, \r\n\xe4\xb8\x96\xe7\x95\x8c!\r\n' + assert envelope.original_content is None + @controller_data(decode_data=True) @handler_data(class_=ChunkedReceivingHandler) def test_chunked_receiving_early_err(self, plain_controller, client): From b1da84349cd43255cb9253779c35d834448593db Mon Sep 17 00:00:00 2001 From: John Bucy Date: Thu, 19 Jun 2025 14:20:08 -0700 Subject: [PATCH 15/24] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 🇺🇦 Sviatoslav Sydorenko (Святослав Сидоренко) --- aiosmtpd/smtp.py | 2 +- aiosmtpd/testing/helpers.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index c569fadf..3525abc2 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -332,7 +332,7 @@ def __init__( authenticator: Optional[AuthenticatorType] = None, proxy_protocol_timeout: Optional[Union[int, float]] = None, loop: Optional[asyncio.AbstractEventLoop] = None, - chunk_size: int = 2**16 + chunk_size: int = 2**16, ): self.__ident__ = ident or __ident__ self.loop = loop if loop else make_loop() diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 56394be5..9c857795 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -67,7 +67,7 @@ def __init__(self): async def handle_DATA_CHUNK( self, server: SMTP, session: Session, envelope: Envelope, - data: bytes, text: Optional[str], last: bool + data: bytes, text: Optional[str], last: bool, ) -> Optional[str]: assert not self.sent_response assert bool(data) From 83ba510e5aae41fd01058aae90abc5adfa6d8bff Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 13:55:38 -0700 Subject: [PATCH 16/24] data_chunk hook: invoke hook for DATA response this is probably not common but gives the handler implementation another opportunity to preempt the data transfer in case of an error --- aiosmtpd/smtp.py | 13 ++++++++++--- aiosmtpd/testing/helpers.py | 11 ++++------- aiosmtpd/tests/test_smtp.py | 25 ++++++++++++++++++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index c569fadf..7e970335 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1437,7 +1437,16 @@ async def smtp_DATA(self, arg: str) -> None: await self.push('501 Syntax: DATA') return - await self.push('354 End data with .') + chunking = "DATA_CHUNK" in self._handle_hooks + + status = '354 End data with .' + if chunking and ((s := await self._call_handler_hook( + 'DATA_CHUNK', b'', + '' if self._decode_data else None, False)) is not None): + status = s + await self.push(status) + if status[0:3] != '354': + return data = BytesIO() num_bytes: int = 0 @@ -1446,8 +1455,6 @@ async def smtp_DATA(self, arg: str) -> None: status : Union[_Missing, bytes] = MISSING DOT = ord('.') - chunking = "DATA_CHUNK" in self._handle_hooks - while self.transport is not None: # pragma: nobranch # Since eof_received cancels this coroutine, # readuntil() can never raise asyncio.IncompleteReadError. diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 56394be5..5260bd68 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -61,8 +61,7 @@ async def handle_DATA( class ChunkedReceivingHandler: def __init__(self): self.box: List[Envelope] = [] - self.response: Optional[str] = '250 OK' - self.respond_last = True + self.responses: List[str] = [None, '250 OK'] self.sent_response = False async def handle_DATA_CHUNK( @@ -70,7 +69,6 @@ async def handle_DATA_CHUNK( data: bytes, text: Optional[str], last: bool ) -> Optional[str]: assert not self.sent_response - assert bool(data) if text is not None: if envelope.content is None: envelope.content = '' @@ -87,11 +85,10 @@ async def handle_DATA_CHUNK( if last: self.box.append(envelope) - if not last and self.respond_last: - return None - if self.response is not None: + resp = self.responses.pop(0) + if resp is not None: self.sent_response = True - return self.response + return resp def catchup_delay(delay: float = ASYNCIO_CATCHUP_DELAY): diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 443f502a..38a3b902 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1667,6 +1667,7 @@ def test_chunked_receiving(self, plain_controller, client): smtpd: Server = plain_controller.smtpd smtpd._chunk_size = 10 handler = plain_controller.handler + handler.responses = [None, None, '250 OK'] self._ehlo(client) client.send(b'MAIL FROM:\r\n') assert client.getreply() == S.S250_OK @@ -1690,6 +1691,7 @@ def test_chunked_receiving_no_decode(self, plain_controller, client): smtpd: Server = plain_controller.smtpd smtpd._chunk_size = 10 handler = plain_controller.handler + handler.responses = [None, None, '250 OK'] self._ehlo(client) client.send(b'MAIL FROM:\r\n') assert client.getreply() == S.S250_OK @@ -1709,12 +1711,28 @@ def test_chunked_receiving_no_decode(self, plain_controller, client): @controller_data(decode_data=True) @handler_data(class_=ChunkedReceivingHandler) - def test_chunked_receiving_early_err(self, plain_controller, client): + def test_chunked_receiving_data_response_err(self, plain_controller, client): smtpd: Server = plain_controller.smtpd smtpd._chunk_size = 10 handler = plain_controller.handler - handler.response = '550 bad' - handler.respond_last = False + handler.responses = ['550 bad'] + self._ehlo(client) + client.send(b'MAIL FROM:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'RCPT TO:\r\n') + assert client.getreply() == S.S250_OK + client.send(b'DATA\r\n') + assert client.getreply() == (550, b'bad') + + assert len(handler.box) == 0 + + @controller_data(decode_data=True) + @handler_data(class_=ChunkedReceivingHandler) + def test_chunked_receiving_non_last_err(self, plain_controller, client): + smtpd: Server = plain_controller.smtpd + smtpd._chunk_size = 10 + handler = plain_controller.handler + handler.responses = [None, '550 bad'] self._ehlo(client) client.send(b'MAIL FROM:\r\n') assert client.getreply() == S.S250_OK @@ -1736,6 +1754,7 @@ def test_chunked_receiving_decode_err(self, plain_controller, client): smtpd: Server = plain_controller.smtpd smtpd._chunk_size = 10 handler = plain_controller.handler + handler.responses = [None, None] self._ehlo(client) client.send(b'MAIL FROM:\r\n') assert client.getreply() == S.S250_OK From fbf99c565acada499b2ac47b57292c961889199b Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 14:16:53 -0700 Subject: [PATCH 17/24] coverage: add coverage for legacy process_message handler with/without response these changes increased the scope of the status variable in SMTP.smtp_DATA() that entailed an extra test of status is None in the backward-compatible path and it was easier to add the coverage than untangle that --- aiosmtpd/smtp.py | 14 ++++++-------- aiosmtpd/tests/test_handlers.py | 14 ++++++++++++-- aiosmtpd/tests/test_smtp.py | 1 - 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 7e970335..a3288916 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1520,11 +1520,10 @@ async def smtp_DATA(self, arg: str) -> None: decoded_line = None if status is MISSING: status, decoded_line = self._decode_line(chunk) - if status is MISSING: - new_status = await self._call_handler_hook( - 'DATA_CHUNK', chunk, decoded_line, False) - if new_status is not None: - status = new_status + if status is MISSING and ( + (s := await self._call_handler_hook( + 'DATA_CHUNK', chunk, decoded_line, False)) is not None): + status = s data.write(line) # Day of reckoning! Let's take care of those out-of-nominal situations @@ -1587,9 +1586,8 @@ async def smtp_DATA(self, arg: str) -> None: else: old_status = self.event_handler.process_message(*args) # The deprecated API can return None which means, return the - # default status. Don't worry about coverage for this case as - # it's a deprecated API that will go away after 1.0. - if old_status is None: # pragma: nocover + # default status. + if old_status is None: status = MISSING else: status = old_status diff --git a/aiosmtpd/tests/test_handlers.py b/aiosmtpd/tests/test_handlers.py index 392689db..57e9cb6a 100644 --- a/aiosmtpd/tests/test_handlers.py +++ b/aiosmtpd/tests/test_handlers.py @@ -187,9 +187,10 @@ def factory(self): class DeprecatedHandler: + def __init__(self): + self.response: Optional[str] = None def process_message(self, peer, mailfrom, rcpttos, data, **kws): - pass - + return self.response class AsyncDeprecatedHandler: async def process_message(self, peer, mailfrom, rcpttos, data, **kws): @@ -986,11 +987,20 @@ def _process_message_testing(self, controller, client): ), ) + @handler_data(class_=DeprecatedHandler) + def test_process_message_no_response(self, plain_controller, client): + """handler.process_message is Deprecated""" + handler = plain_controller.handler + assert isinstance(handler, DeprecatedHandler) + controller = plain_controller + self._process_message_testing(controller, client) + @handler_data(class_=DeprecatedHandler) def test_process_message(self, plain_controller, client): """handler.process_message is Deprecated""" handler = plain_controller.handler assert isinstance(handler, DeprecatedHandler) + handler.response = '250 ok' controller = plain_controller self._process_message_testing(controller, client) diff --git a/aiosmtpd/tests/test_smtp.py b/aiosmtpd/tests/test_smtp.py index 38a3b902..3b2db4f1 100644 --- a/aiosmtpd/tests/test_smtp.py +++ b/aiosmtpd/tests/test_smtp.py @@ -1723,7 +1723,6 @@ def test_chunked_receiving_data_response_err(self, plain_controller, client): assert client.getreply() == S.S250_OK client.send(b'DATA\r\n') assert client.getreply() == (550, b'bad') - assert len(handler.box) == 0 @controller_data(decode_data=True) From 5bb73cffe75790abf5f2d71288c49488deb710ba Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 16:06:01 -0700 Subject: [PATCH 18/24] fix mypy --- aiosmtpd/smtp.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index a3288916..66c84ea7 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1439,13 +1439,13 @@ async def smtp_DATA(self, arg: str) -> None: chunking = "DATA_CHUNK" in self._handle_hooks - status = '354 End data with .' + data_status = '354 End data with .' if chunking and ((s := await self._call_handler_hook( 'DATA_CHUNK', b'', '' if self._decode_data else None, False)) is not None): - status = s - await self.push(status) - if status[0:3] != '354': + data_status = s + await self.push(data_status) + if data_status[0:3] != '354': return data = BytesIO() From f095bc377230296ab7202e1e5a36c5bebd0ecd13 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 16:06:30 -0700 Subject: [PATCH 19/24] DATA_CHUNK hook: reset timeout on every read I have encountered situations in the wild where someone was trying to send a large message over a slow connection that would require a very large static timeout (>hour) to accomodate. This way you can keep the timeout reasonably short (minutes) and it can take as long as it takes as long as the client keeps making forward progress. We could make this available to the current api under control of a flag but I don't want to unexpectedly change the behavior for exiseting users. --- aiosmtpd/smtp.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/aiosmtpd/smtp.py b/aiosmtpd/smtp.py index 66c84ea7..3915fbab 100644 --- a/aiosmtpd/smtp.py +++ b/aiosmtpd/smtp.py @@ -1512,6 +1512,13 @@ async def smtp_DATA(self, arg: str) -> None: data.write(line) continue + # reset timeout on every read so it can take as long as it + # takes as long as the client keeps making forward + # progress + # TODO we could do this for !chunking under control of a + # flag so as not to change the behavior unexpectedly? + self._reset_timeout() + if data.tell() + len(line) > self._chunk_size: data.seek(0) chunk = data.read() From 708f0812c979510a32483af38d801469234267e3 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 16:24:58 -0700 Subject: [PATCH 20/24] fix annotation in ChunkedReceivingHandler --- aiosmtpd/testing/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiosmtpd/testing/helpers.py b/aiosmtpd/testing/helpers.py index 8e4221f0..d8ff6ae0 100644 --- a/aiosmtpd/testing/helpers.py +++ b/aiosmtpd/testing/helpers.py @@ -61,7 +61,7 @@ async def handle_DATA( class ChunkedReceivingHandler: def __init__(self): self.box: List[Envelope] = [] - self.responses: List[str] = [None, '250 OK'] + self.responses: List[Optional[str]] = [None, '250 OK'] self.sent_response = False async def handle_DATA_CHUNK( From 08ab7f24809ae4dab8fd31fd2a6cbb0449892170 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Mon, 23 Jun 2025 16:30:48 -0700 Subject: [PATCH 21/24] add missing import --- aiosmtpd/tests/test_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiosmtpd/tests/test_handlers.py b/aiosmtpd/tests/test_handlers.py index 57e9cb6a..6f73016b 100644 --- a/aiosmtpd/tests/test_handlers.py +++ b/aiosmtpd/tests/test_handlers.py @@ -11,7 +11,7 @@ from smtplib import SMTPDataError, SMTPRecipientsRefused from textwrap import dedent from types import SimpleNamespace -from typing import AnyStr, Callable, Generator, Type, TypeVar, Union +from typing import AnyStr, Callable, Generator, Optional, Type, TypeVar, Union import pytest From bac5e11ec747a82784a895c2d7835ae8237714d3 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Tue, 24 Jun 2025 12:06:43 -0700 Subject: [PATCH 22/24] fix lint --- aiosmtpd/tests/test_handlers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aiosmtpd/tests/test_handlers.py b/aiosmtpd/tests/test_handlers.py index 6f73016b..f4c2618e 100644 --- a/aiosmtpd/tests/test_handlers.py +++ b/aiosmtpd/tests/test_handlers.py @@ -189,9 +189,11 @@ def factory(self): class DeprecatedHandler: def __init__(self): self.response: Optional[str] = None + def process_message(self, peer, mailfrom, rcpttos, data, **kws): return self.response + class AsyncDeprecatedHandler: async def process_message(self, peer, mailfrom, rcpttos, data, **kws): pass From 1307ede613318fed698b0a1ac46e4cd588195001 Mon Sep 17 00:00:00 2001 From: John Bucy Date: Tue, 24 Jun 2025 14:10:36 -0700 Subject: [PATCH 23/24] update docs/NEWS --- aiosmtpd/docs/NEWS.rst | 6 ++++++ aiosmtpd/docs/handlers.rst | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/aiosmtpd/docs/NEWS.rst b/aiosmtpd/docs/NEWS.rst index c2e51f30..6f94758c 100644 --- a/aiosmtpd/docs/NEWS.rst +++ b/aiosmtpd/docs/NEWS.rst @@ -12,6 +12,12 @@ Fixed/Improved * Dropped Python 3.8, PyPy 3.8 * Added PyPy 3.9 +* buffering improvements to ``smtp_DATA()`` + +Added +----- +* ``handle_DATA_CHUNK()`` hook: chunked data receiving to avoid +buffering the entire message in memory while it is being received 1.4.6 (2024-05-18) diff --git a/aiosmtpd/docs/handlers.rst b/aiosmtpd/docs/handlers.rst index 3bd37f79..6b89bbca 100644 --- a/aiosmtpd/docs/handlers.rst +++ b/aiosmtpd/docs/handlers.rst @@ -98,6 +98,35 @@ The following hooks are currently supported (in alphabetical order): ``decode_data=False`` or ``decode_data=True``. See :attr:`Envelope.content` for more info. +.. py:method:: handle_DATA_CHUNK(server, session, envelope, data: bytes, text: Optional[str], last: bool) -> Optional[str] + :async: + + :return: Response message to be sent to the client + + Alternative to handle_DATA(), under active development, subject to change. + + Called periodically throughout ``DATA`` as the message (`"SMTP content" + `_ as described in + RFC 5321) is received. + + The content is passed to ``data`` as type ``bytes``, + normalized according to the transparency rules + as defined in :rfc:`RFC 5321, §4.5.2 <5321#section-4.5.2>`. + + If :class:`~aiosmtpd.smtp.SMTP` was instantiated with + ``decode_data=True``, the decoded text will be passed to ``text`` + as a python string. + + ``last`` will be ``False`` prior to the final call. The handler MAY + return a non-``None`` response prior to the ``last=True`` + call. This is treated as an error and terminates the + transaction. The handler will not be invoked again after a + non-``None`` response. Otherwise, the handler MUST return a + non-``None`` response to the ``last=True`` call. + + :class:`~aiosmtpd.smtp.SMTP` buffers data per the ``chunk_size`` + parameter. The hook may be invoked with an empty chunk at any time. + .. py:method:: handle_EHLO(server, session, envelope, hostname, responses) -> List[str] :async: :noindex: From 547bf02ffa10e199146b54300d77f41d7d56326f Mon Sep 17 00:00:00 2001 From: John Bucy Date: Tue, 24 Jun 2025 14:23:34 -0700 Subject: [PATCH 24/24] fix NEWS --- aiosmtpd/docs/NEWS.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiosmtpd/docs/NEWS.rst b/aiosmtpd/docs/NEWS.rst index 6f94758c..e4af422b 100644 --- a/aiosmtpd/docs/NEWS.rst +++ b/aiosmtpd/docs/NEWS.rst @@ -12,13 +12,12 @@ Fixed/Improved * Dropped Python 3.8, PyPy 3.8 * Added PyPy 3.9 -* buffering improvements to ``smtp_DATA()`` +* buffering improvements to ``smtp_DATA()`` (should improve/fix #293) Added ----- -* ``handle_DATA_CHUNK()`` hook: chunked data receiving to avoid -buffering the entire message in memory while it is being received +* ``handle_DATA_CHUNK()`` hook: chunked data receiving to avoid buffering the entire message in memory while it is being received 1.4.6 (2024-05-18) ==================