diff --git a/app/lib/frontend/email_sender.dart b/app/lib/frontend/email_sender.dart index 96a8ccc6fc..29978b295f 100644 --- a/app/lib/frontend/email_sender.dart +++ b/app/lib/frontend/email_sender.dart @@ -37,6 +37,96 @@ abstract class EmailSender { Future sendMessage(EmailMessage message); } +abstract class EmailSenderConnection { + Future send(EmailMessage message); + Future close(); +} + +abstract class EmailSenderBase implements EmailSender { + /// We want to send only one email at a time. + final _pool = Pool(1); + + /// The current zone when this [EmailSender] instance was created. + final Zone _parentZone; + + final _connectionsBySender = >{}; + final _forceReconnectSenders = {}; + DateTime _backoffUntil = DateTime(0); + + EmailSenderBase() : _parentZone = Zone.current; + + Future connect(String senderEmail); + void invalidateCredentials(); + + @override + bool get shouldBackoff => clock.now().isBefore(_backoffUntil); + + @override + Future sendMessage(EmailMessage message) async { + // One attempt at a time. + await _pool.withResource(() async { + return await _sendMessage(message); + }); + } + + Future _sendMessage(EmailMessage message) async { + _logger.info('Sending email: ${message.debugInfo}...'); + final sender = message.from.email; + try { + await retry( + () async { + final c = await _getConnection(sender); + await c.send(message); + }, + retryIf: (e) => + e is TimeoutException || + e is IOException || + e is SmtpClientCommunicationException || + e is SmtpNoGreetingException, + delayFactor: Duration(seconds: 2), + maxAttempts: 2, + onRetry: (_) { + _forceReconnectSenders.add(sender); + }, + ); + } on SmtpMessageValidationException catch (e, st) { + _logger.info('Sending email failed: ${message.debugInfo}.', e, st); + throw EmailSenderException.invalid(); + } on SmtpClientAuthenticationException catch (e, st) { + _logger.shout('Sending email failed due to invalid auth: $e', e, st); + _forceReconnectSenders.add(sender); + invalidateCredentials(); + _backoffUntil = clock.now().add(Duration(minutes: 2)); + throw EmailSenderException.failed(); + } on MailerException catch (e, st) { + _logger.warning('Sending email failed: ${message.debugInfo}.', e, st); + throw EmailSenderException.failed(); + } + } + + Future<_ZonedConnection> _getConnection(String sender) async { + final connectionFuture = _connectionsBySender[sender]; + final old = connectionFuture == null ? null : await connectionFuture; + final forceReconnect = _forceReconnectSenders.remove(sender); + if (!forceReconnect && old != null && !old.isExpired) { + return old; + } + final newConnectionFuture = Future.microtask(() async { + // closing the old connection if there was any, ignoring errors + await old?.close(); + + // PersistentConnection needs to be created in its designated zone, as its + // internal message subscription starts inside the constructor. + final connectionZone = _CatchAllZone(_parentZone); + final connection = + await connectionZone._zone.run(() async => connect(sender)); + return _ZonedConnection(connectionZone, connection); + }); + _connectionsBySender[sender] = newConnectionFuture; + return newConnectionFuture; + } +} + EmailSender createGmailRelaySender( String serviceAccountEmail, http.Client authClient, @@ -46,22 +136,32 @@ EmailSender createGmailRelaySender( authClient, ); -class _LoggingEmailSender implements EmailSender { +class _LoggingEmailSender extends EmailSenderBase { @override - bool get shouldBackoff => false; + Future connect(String senderEmail) async { + return _LoggingEmailSenderConnection(); + } @override - Future sendMessage(EmailMessage message) async { - final debugHeader = '(${message.subject}) ' - 'from ${message.from} ' - 'to ${message.recipients.join(', ')}'; + void invalidateCredentials() { + // ignore + } +} +class _LoggingEmailSenderConnection extends EmailSenderConnection { + @override + Future send(EmailMessage message) async { final urls = _simpleUrlRegExp .allMatches(message.bodyText) .map((e) => e.group(0)) .toList(); _logger.info('Not sending email (SMTP not configured): ' - '$debugHeader${urls.map((e) => '\n$e').join('')}'); + '${message.debugInfo}\n${urls.map((e) => '\n$e').join('')}'); + } + + @override + Future close() async { + // ignored } } @@ -119,7 +219,7 @@ Address? _toAddress(EmailAddress? input) => /// [2]: https://developers.google.com/gmail/imap/xoauth2-protocol#the_sasl_xoauth2_mechanism /// [3]: https://developers.google.com/identity/protocols/oauth2/service-account /// [4]: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/signJwt -class _GmailSmtpRelay implements EmailSender { +class _GmailSmtpRelay extends EmailSenderBase { static final _googleOauth2TokenUrl = Uri.parse('https://oauth2.googleapis.com/token'); static const _scopes = ['https://mail.google.com/']; @@ -127,94 +227,25 @@ class _GmailSmtpRelay implements EmailSender { final String _serviceAccountEmail; final http.Client _authClient; - final _connectionsBySender = >{}; - final _forceReconnectSenders = {}; - - /// The current zone when this [_GmailSmtpRelay] instance was created. - final Zone _parentZone; - final _pool = Pool(1); - DateTime _accessTokenRefreshed = DateTime(0); - DateTime _backoffUntil = DateTime(0); Future? _accessToken; _GmailSmtpRelay( this._serviceAccountEmail, this._authClient, - ) : _parentZone = Zone.current; - - @override - bool get shouldBackoff => clock.now().isBefore(_backoffUntil); + ); @override - Future sendMessage(EmailMessage message) async { - // One attempt at a time. - await _pool.withResource(() async => _sendMessage(message)); + Future connect(String senderEmail) async { + return _GmailConnection(PersistentConnection( + await _getSmtpServer(senderEmail), + timeout: Duration(seconds: 15), + )); } - Future _sendMessage(EmailMessage message) async { - final debugHeader = 'Message-ID:${message.localMessageId}@pub.dev ' - '(${message.subject}) ' - 'from ${message.from} ' - 'to ${message.recipients.join(', ')}'; - _logger.info('Sending email: $debugHeader...'); - final sender = message.from.email; - try { - await retry( - () async { - final c = await _getConnection(sender); - await c.send(_toMessage(message)); - }, - retryIf: (e) => - e is TimeoutException || - e is IOException || - e is SmtpClientCommunicationException || - e is SmtpNoGreetingException, - delayFactor: Duration(seconds: 2), - maxAttempts: 2, - onRetry: (_) { - _forceReconnectSenders.add(sender); - }, - ); - } on SmtpMessageValidationException catch (e, st) { - _logger.info('Sending email failed: $debugHeader.', e, st); - throw EmailSenderException.invalid(); - } on SmtpClientAuthenticationException catch (e, st) { - _logger.shout('Sending email failed due to invalid auth: $e', e, st); - _backoffUntil = clock.now().add(Duration(minutes: 2)); - _forceReconnectSenders.add(sender); - _accessToken = null; - throw EmailSenderException.failed(); - } on MailerException catch (e, st) { - _logger.warning('Sending email failed: $debugHeader.', e, st); - throw EmailSenderException.failed(); - } - } - - Future<_GmailConnection> _getConnection(String sender) async { - final connectionFuture = _connectionsBySender[sender]; - final old = connectionFuture == null ? null : await connectionFuture; - final forceReconnect = _forceReconnectSenders.remove(sender); - if (!forceReconnect && old != null && !old.isExpired) { - return old; - } - final newConnectionFuture = Future.microtask(() async { - // closing the old connection if there was any, ignoring errors - await old?.close(); - - // PersistentConnection needs to be created in its designated zone, as its - // internal message subscription starts inside the constructor. - final connectionZone = _ConnectionZone(_parentZone); - final connection = await connectionZone._zone.run( - () async => PersistentConnection( - await _getSmtpServer(sender), - timeout: Duration(seconds: 15), - ), - ); - return _GmailConnection(connectionZone, connection); - }); - _connectionsBySender[sender] = newConnectionFuture; - return newConnectionFuture; + @override + void invalidateCredentials() { + _accessToken = null; } Future _getSmtpServer(String sender) async { @@ -284,11 +315,13 @@ class _GmailSmtpRelay implements EmailSender { } } -class _ConnectionZone { +class _CatchAllZone { final Zone _parentZone; Object? _uncaughtError; - _ConnectionZone(this._parentZone); + _CatchAllZone(this._parentZone); + + bool get hasUncaughtError => _uncaughtError != null; late final _zone = _parentZone.fork(specification: ZoneSpecification( handleUncaughtError: (self, parent, zone, error, stackTrace) { @@ -296,22 +329,40 @@ class _ConnectionZone { _logger.severe('Uncaught error while sending email', error, stackTrace); }, )); + + Future runAsync(Future Function() fn) async { + final completer = Completer(); + _zone.scheduleMicrotask(() async { + try { + final r = await fn(); + completer.complete(r); + } catch (e, st) { + completer.completeError(e, st); + } + }); + return await completer.future; + } } -class _GmailConnection { +/// Wraps the physical connection within a [Zone], where the send operation should +/// be wraped, tracking otherwise uncaught exceptions. +/// +/// It also tracks age, usage and expiration. +class _ZonedConnection { final DateTime created; - final PersistentConnection _connection; - final _ConnectionZone _connectionZone; + final _CatchAllZone _zone; + final EmailSenderConnection _connection; + DateTime _lastUsed; var _sentCount = 0; - _GmailConnection(this._connectionZone, this._connection) + _ZonedConnection(this._zone, this._connection) : created = clock.now(), _lastUsed = clock.now(); bool get isExpired { // The connection is in an unknown state, better not use it. - if (_connectionZone._uncaughtError != null) { + if (_zone.hasUncaughtError) { return true; } // There is a 100-recipient limit per SMTP transaction for smtp-relay.gmail.com. @@ -331,15 +382,15 @@ class _GmailConnection { return false; } - Future send(Message message) async { + Future send(EmailMessage message) async { _sentCount += message.recipients.length + message.ccRecipients.length; try { - final r = await _connectionZone._zone - .run(() async => await _connection.send(message)); - if (_connectionZone._uncaughtError != null) { + if (_zone.hasUncaughtError) { throw EmailSenderException.failed(); } - return r; + await _zone.runAsync(() async { + await _connection.send(message); + }); } finally { _lastUsed = clock.now(); } @@ -347,7 +398,7 @@ class _GmailConnection { Future close() async { try { - await _connectionZone._zone.run(() async { + await _zone.runAsync(() async { await _connection.close(); }); } catch (e, st) { @@ -355,3 +406,15 @@ class _GmailConnection { } } } + +class _GmailConnection extends EmailSenderConnection { + final PersistentConnection _connection; + _GmailConnection(this._connection); + + @override + Future send(EmailMessage message) => + _connection.send(_toMessage(message)); + + @override + Future close() => _connection.close(); +} diff --git a/app/lib/shared/email.dart b/app/lib/shared/email.dart index 666fb4a0a6..1fdd0711a9 100644 --- a/app/lib/shared/email.dart +++ b/app/lib/shared/email.dart @@ -156,6 +156,14 @@ class EmailMessage { 'bodyText': bodyText, }; } + + late final debugInfo = [ + 'Message-ID:$localMessageId@pub.dev', + '($subject)', + 'from $from', + 'to ${recipients.join(', ')}', + if (ccRecipients.isNotEmpty) 'cc ${ccRecipients.join(', ')}', + ].join(' '); } /// Parses the body text and splits the [input] paragraphs to [lineLength] diff --git a/app/test/service/email/email_sender_test.dart b/app/test/service/email/email_sender_test.dart new file mode 100644 index 0000000000..e8c6423d1f --- /dev/null +++ b/app/test/service/email/email_sender_test.dart @@ -0,0 +1,144 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:clock/clock.dart'; +import 'package:mailer/mailer.dart'; +import 'package:pub_dev/admin/actions/actions.dart'; +import 'package:pub_dev/frontend/email_sender.dart'; +import 'package:pub_dev/shared/email.dart'; +import 'package:test/test.dart'; + +void main() { + group('EmailSenderBase', () { + EmailMessage newEmailMessage() => EmailMessage( + EmailAddress('admin@pub.dev'), + [EmailAddress('user@pub.dev')], + 'subject', + 'bodyText', + ); + + test('sending email', () async { + final log = []; + final sender = _EmailSender(log, (message) {}); + + await sender.sendMessage(newEmailMessage()); + expect(log, [ + 'Connecting #0 for admin@pub.dev', + '#0 sending to user@pub.dev', + ]); + + log.clear(); + await sender.sendMessage(newEmailMessage()); + expect(log, [ + '#0 sending to user@pub.dev', + ]); + + log.clear(); + await withClock(Clock.fixed(clock.minutesFromNow(2)), () async { + await sender.sendMessage(newEmailMessage()); + }); + expect(log, [ + '#0 closing connection.', + 'Connecting #1 for admin@pub.dev', + '#1 sending to user@pub.dev', + ]); + }); + + test('throwing MailerException', () async { + final log = []; + final sender = _EmailSender(log, (message) { + log.add('Throwing SmtpClientCommunicationException.'); + throw SmtpClientCommunicationException('test'); + }); + await expectLater( + () => sender.sendMessage(newEmailMessage()), + throwsA(isA()), + ); + expect(log, [ + 'Connecting #0 for admin@pub.dev', + '#0 sending to user@pub.dev', + 'Throwing SmtpClientCommunicationException.', + '#0 closing connection.', + 'Connecting #1 for admin@pub.dev', + '#1 sending to user@pub.dev', + 'Throwing SmtpClientCommunicationException.', + ]); + }); + + test('later async exception invalidates the connection', () async { + final log = []; + final sender = _EmailSender(log, (message) async { + scheduleMicrotask(() async { + await Future.delayed(Duration(seconds: 1)); + log.add('Throwing Exception from microtask.'); + throw Exception(); + }); + log.add('Completed sending.'); + }); + // first message triggers async exception in the zone + await sender.sendMessage(newEmailMessage()); + + // waiting for the exception to happen + await Future.delayed(Duration(seconds: 2)); + + // second message finds the connection as invalid, creates a new one + await sender.sendMessage(newEmailMessage()); + expect(log, [ + 'Connecting #0 for admin@pub.dev', + '#0 sending to user@pub.dev', + 'Completed sending.', + 'Throwing Exception from microtask.', + '#0 closing connection.', + 'Connecting #1 for admin@pub.dev', + '#1 sending to user@pub.dev', + 'Completed sending.', + ]); + }); + }); +} + +typedef _EmailSenderFn = FutureOr Function(EmailMessage message); + +class _EmailSender extends EmailSenderBase { + final List _log; + final _EmailSenderFn _emailSenderFn; + int _connectionCount = 0; + + _EmailSender(this._log, this._emailSenderFn); + + @override + Future connect(String senderEmail) async { + try { + _log.add('Connecting #$_connectionCount for $senderEmail'); + return _EmailSenderConnection(_connectionCount, this); + } finally { + _connectionCount++; + } + } + + @override + void invalidateCredentials() { + _log.add('Invalidate credentials.'); + } +} + +class _EmailSenderConnection extends EmailSenderConnection { + final int _id; + final _EmailSender _sender; + + _EmailSenderConnection(this._id, this._sender); + + @override + Future send(EmailMessage message) async { + _sender._log.add('#$_id sending to ${message.recipients.join(', ')}'); + await _sender._emailSenderFn(message); + } + + @override + Future close() async { + _sender._log.add('#$_id closing connection.'); + } +}