Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
85a2bab
remove SMTP.smtp_DATA() line_fragments
jsbucy Feb 27, 2025
d8e9e42
Add new handler hook DATA_CHUNK which is invoked from the data readin…
jsbucy Feb 27, 2025
36d035c
only decode data in read loop if we're using new DATA_CHUNK hook
jsbucy Feb 28, 2025
e2c4f72
Merge branch 'master' into data_chunk3
jsbucy Feb 28, 2025
a9162af
Merge branch 'master' into data_chunk3
jsbucy Jun 17, 2025
7e4f59f
improvements for dotstuff + chunked receiving
jsbucy Jun 17, 2025
c99b486
Merge branch 'data_chunk3' of ssh://github.com/jsbucy/aiosmtpd into d…
jsbucy Jun 17, 2025
ab8440b
fix type annotation
jsbucy Jun 18, 2025
1adebc6
mypy passing
jsbucy Jun 18, 2025
f0398d7
Merge branch 'master' into data_chunk3
jsbucy Jun 18, 2025
ea70fb1
fix warning: Explicit returns mixed with implicit (fall through) returns
jsbucy Jun 18, 2025
b71377d
Apply suggestions from code review
jsbucy Jun 18, 2025
58dbe07
Merge branch 'data_chunk3' of ssh://github.com/jsbucy/aiosmtpd into d…
jsbucy Jun 18, 2025
612d269
fix whitespace, typing
jsbucy Jun 18, 2025
ce0a1a7
use a smaller buffer in test to exercise the flush path
jsbucy Jun 18, 2025
a506f17
smtp.py passing mypy
jsbucy Jun 18, 2025
896962f
coverage: add test where DATA_CHUNK hook returns response prior to la…
jsbucy Jun 18, 2025
fe02f98
coverage: add test for decode error w/chunked receiving
jsbucy Jun 18, 2025
5ad19e6
coverage: add test for chunked receiving without decode_data
jsbucy Jun 18, 2025
b1da843
Apply suggestions from code review
jsbucy Jun 19, 2025
83ba510
data_chunk hook: invoke hook for DATA response
jsbucy Jun 23, 2025
fbf99c5
coverage: add coverage for legacy process_message handler with/withou…
jsbucy Jun 23, 2025
5bb73cf
fix mypy
jsbucy Jun 23, 2025
f095bc3
DATA_CHUNK hook: reset timeout on every read
jsbucy Jun 23, 2025
f748fbf
Merge branch 'data_chunk3' of ssh://github.com/jsbucy/aiosmtpd into d…
jsbucy Jun 23, 2025
708f081
fix annotation in ChunkedReceivingHandler
jsbucy Jun 23, 2025
08ab7f2
add missing import
jsbucy Jun 23, 2025
bac5e11
fix lint
jsbucy Jun 24, 2025
1307ede
update docs/NEWS
jsbucy Jun 24, 2025
547bf02
fix NEWS
jsbucy Jun 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 99 additions & 49 deletions aiosmtpd/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Union,
)
from warnings import warn
from io import BytesIO

import attr
from public import public
Expand Down Expand Up @@ -330,7 +331,8 @@ def __init__(
command_call_limit: Union[int, Dict[str, int], None] = None,
authenticator: Optional[AuthenticatorType] = None,
proxy_protocol_timeout: Optional[Union[int, float]] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
loop: Optional[asyncio.AbstractEventLoop] = None,
chunk_size: int = 2**16
):
self.__ident__ = ident or __ident__
self.loop = loop if loop else make_loop()
Expand Down Expand Up @@ -373,6 +375,7 @@ def __init__(
"can lead to security vulnerabilities!")
log.warning("auth_required == True but auth_require_tls == False")
self._auth_require_tls = auth_require_tls
self._chunk_size = chunk_size

if proxy_protocol_timeout is not None:
if proxy_protocol_timeout <= 0:
Expand Down Expand Up @@ -1404,6 +1407,22 @@ async def smtp_RSET(self, arg: str):
status = await self._call_handler_hook('RSET')
await self.push('250 OK' if status is MISSING else status)

# -> err, decoded data
def _decode_line(self, data: bytes
                 ) -> Tuple[Union[_Missing, bytes], Optional[str]]:
    """Decode received message bytes according to the server settings.

    :param data: Raw bytes taken from the DATA stream.
    :return: A ``(status, text)`` pair.  ``status`` is ``MISSING`` when
        nothing went wrong, otherwise the SMTP error reply (as bytes)
        to send to the client.  ``text`` is the decoded string, or
        ``None`` when decoding is disabled or has failed.
    """
    # Decoding was not requested: no error and no text.
    if not self._decode_data:
        return MISSING, None

    if not self.enable_SMTPUTF8:
        try:
            text = data.decode('ascii', errors='strict')
        except UnicodeDecodeError:
            # enable_SMTPUTF8 is off, i.e. the server explicitly refuses
            # non-ASCII content, yet the client sent some anyway.
            return b'500 Error: strict ASCII mode', None
        return MISSING, text

    # SMTPUTF8 mode: undecodable bytes are preserved via surrogateescape
    # rather than rejected.
    return MISSING, data.decode('utf-8', errors='surrogateescape')

@syntax('DATA')
async def smtp_DATA(self, arg: str) -> None:
if await self.check_helo_needed():
Expand All @@ -1419,12 +1438,16 @@ async def smtp_DATA(self, arg: str) -> None:
return

await self.push('354 End data with <CR><LF>.<CR><LF>')
data: List[bytearray] = []
data = BytesIO()

num_bytes: int = 0
limit: Optional[int] = self.data_size_limit
line_fragments: List[bytes] = []
state: _DataState = _DataState.NOMINAL
status : Union[_Missing, bytes] = MISSING
DOT = ord('.')

chunking = "DATA_CHUNK" in self._handle_hooks

while self.transport is not None: # pragma: nobranch
# Since eof_received cancels this coroutine,
# readuntil() can never raise asyncio.IncompleteReadError.
Expand All @@ -1442,79 +1465,103 @@ async def smtp_DATA(self, arg: str) -> None:
# The line exceeds StreamReader's "stream limit".
# Delay SMTP Status Code sending until data receive is complete
# This seems to be implied in RFC 5321 § 4.2.5

# TODO this (and _handle_client()) will currently read
# an unbounded amount of data from the client looking
# for crlf. Possibly this should return an immediate
# error and close the connection after some limit ~16kb.
if state == _DataState.NOMINAL:
# Transition to TOO_LONG only if we haven't gone TOO_MUCH yet
state = _DataState.TOO_LONG
# Discard data immediately to prevent memory pressure
data *= 0
data.truncate(0)
# Drain the stream anyways
line = await self._reader.read(e.consumed)
assert not line.endswith(b'\r\n')
continue

# A lone dot in a line signals the end of DATA.
if not line_fragments and line == b'.\r\n':
if line == b'.\r\n':
break
num_bytes += len(line)
if state == _DataState.NOMINAL and limit and num_bytes > limit:
# Delay SMTP Status Code sending until data receive is complete
# This seems to be implied in RFC 5321 § 4.2.5
state = _DataState.TOO_MUCH
# Discard data immediately to prevent memory pressure
data *= 0
line_fragments.append(line)
if line.endswith(b'\r\n'):
# Record data only if state is "NOMINAL"
if state == _DataState.NOMINAL:
line = EMPTY_BARR.join(line_fragments)
if len(line) > self.line_length_limit:
# Theoretically we shouldn't reach this place. But it's always
# good to practice DEFENSIVE coding.
state = _DataState.TOO_LONG
# Discard data immediately to prevent memory pressure
data *= 0
else:
data.append(EMPTY_BARR.join(line_fragments))
line_fragments *= 0
data.truncate(0)
assert line.endswith(b'\r\n')
assert len(line) <= (self.line_length_limit + 2)
# Record data only if state is "NOMINAL"
if state != _DataState.NOMINAL:
continue

# Remove extraneous carriage returns and de-transparency
# according to RFC 5321, Section 4.5.2.
if line[0] == DOT:
line = line[1:]

if not chunking:
data.write(line)
continue

if data.tell() + len(line) > self._chunk_size:
data.seek(0)
chunk = data.read()
data.truncate(0)
data.seek(0)
decoded_line = None
if status is MISSING:
status, decoded_line = self._decode_line(chunk)
if status is MISSING:
new_status = await self._call_handler_hook(
'DATA_CHUNK', chunk, decoded_line, False)
if new_status is not None:
status = new_status
data.write(line)

# Day of reckoning! Let's take care of those out-of-nominal situations
if state != _DataState.NOMINAL:
if state != _DataState.NOMINAL or status is not MISSING:
if state == _DataState.TOO_LONG:
await self.push("500 Line too long (see RFC5321 4.5.3.1.6)")
elif state == _DataState.TOO_MUCH: # pragma: nobranch
await self.push('552 Error: Too much mail data')
elif status is not MISSING:
await self.push(status)
self._set_post_data_state()
return

# If unfinished_line is non-empty, then the connection was closed.
assert not line_fragments

# Remove extraneous carriage returns and de-transparency
# according to RFC 5321, Section 4.5.2.
for text in data:
if text.startswith(b'.'):
del text[0]
original_content: bytes = EMPTYBYTES.join(data)
data.seek(0)
original_content: bytes = data.read()
# Discard data immediately to prevent memory pressure
data *= 0
data.truncate(0)

content: Union[str, bytes]
content: Union[str, bytes, None] = None
if self._decode_data:
if self.enable_SMTPUTF8:
content = original_content.decode('utf-8', errors='surrogateescape')
else:
try:
content = original_content.decode('ascii', errors='strict')
except UnicodeDecodeError:
# This happens if enable_smtputf8 is false, meaning that
# the server explicitly does not want to accept non-ascii,
# but the client ignores that and sends non-ascii anyway.
await self.push('500 Error: strict ASCII mode')
return
else:
status, content = self._decode_line(original_content)
if status is not MISSING:
await self.push(status)
return

# Call the new API first if it's implemented.
if chunking:
assert status is MISSING # handled above
new_status = await self._call_handler_hook(
'DATA_CHUNK', original_content, content, True)
assert new_status is not None
status = new_status
assert status is not MISSING
self._set_post_data_state()
await self.push(status)
return

if not self._decode_data:
content = original_content
assert content is not None

self.envelope.content = content
self.envelope.original_content = original_content

# Call the new API first if it's implemented.
if "DATA" in self._handle_hooks:
status = await self._call_handler_hook('DATA')
else:
Expand All @@ -1526,18 +1573,21 @@ async def smtp_DATA(self, arg: str) -> None:
assert self.session is not None
args = (self.session.peer, self.envelope.mail_from,
self.envelope.rcpt_tos, self.envelope.content)
old_status : Optional[bytes] = None
if asyncio.iscoroutinefunction(
self.event_handler.process_message):
status = await self.event_handler.process_message(*args)
old_status = await self.event_handler.process_message(*args)
else:
status = self.event_handler.process_message(*args)
old_status = self.event_handler.process_message(*args)
# The deprecated API can return None which means, return the
# default status. Don't worry about coverage for this case as
# it's a deprecated API that will go away after 1.0.
if status is None: # pragma: nocover
if old_status is None: # pragma: nocover
status = MISSING
else:
status = old_status
self._set_post_data_state()
await self.push('250 OK' if status is MISSING else status)
await self.push(b'250 OK' if status is MISSING else status)

# Commands that have not been implemented.
async def smtp_EXPN(self, arg: str):
Expand Down
38 changes: 37 additions & 1 deletion aiosmtpd/testing/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
import time
from smtplib import SMTP as SMTP_Client
from typing import List
from typing import List, Optional

from aiosmtpd.smtp import Envelope, Session, SMTP

Expand Down Expand Up @@ -58,6 +58,42 @@ async def handle_DATA(
return "250 OK"


class ChunkedReceivingHandler:
    """Test handler that reassembles a message from DATA_CHUNK hook calls.

    Each chunk is appended to the envelope it is delivered with; once the
    final chunk arrives the envelope is stored in ``box``.
    """

    def __init__(self):
        # Envelopes for which the final chunk has been received.
        self.box: List[Envelope] = []
        # Reply handed back from the hook; may be set to None.
        self.response: Optional[str] = '250 OK'
        # When true, non-final chunks are answered with None and the
        # reply is only returned for the final chunk.
        self.respond_last = True
        # Becomes true once a non-None reply has been returned; the hook
        # asserts it is not invoked again after that.
        self.sent_response = False

    async def handle_DATA_CHUNK(
        self, server: SMTP, session: Session, envelope: Envelope,
        data: bytes, text: Optional[str], last: bool
    ) -> Optional[str]:
        """Accumulate one chunk into ``envelope`` and pick the reply.

        :param data: Raw bytes of this chunk; asserted to be non-empty.
        :param text: Decoded form of ``data``, or ``None`` when no
            decoded text is supplied.
        :param last: True when this is the final chunk of the message.
        :return: ``None`` for a non-final chunk while ``respond_last`` is
            set, otherwise the current value of ``self.response``.
        """
        assert not self.sent_response
        assert bool(data)

        if text is None:
            # Undecoded mode: ``content`` carries the raw bytes.
            raw_so_far = b'' if envelope.content is None else envelope.content
            assert isinstance(raw_so_far, bytes)
            envelope.content = raw_so_far + data
        else:
            # Decoded mode: ``content`` carries the text, and the raw
            # bytes are collected separately in ``original_content``.
            text_so_far = '' if envelope.content is None else envelope.content
            assert isinstance(text_so_far, str)
            envelope.content = text_so_far + text
            original_so_far = envelope.original_content
            if original_so_far is None:
                original_so_far = b''
            envelope.original_content = original_so_far + data

        if last:
            self.box.append(envelope)
        elif self.respond_last:
            # Hold the reply back until the final chunk.
            return None

        reply = self.response
        if reply is not None:
            self.sent_response = True
        return reply


def catchup_delay(delay: float = ASYNCIO_CATCHUP_DELAY):
"""
Sleep for awhile to give asyncio's event loop time to catch up.
Expand Down
Loading