Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 162 additions & 99 deletions app/lib/frontend/email_sender.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,96 @@ abstract class EmailSender {
Future<void> sendMessage(EmailMessage message);
}

abstract class EmailSenderConnection {
Future<void> send(EmailMessage message);
Future<void> close();
}

abstract class EmailSenderBase implements EmailSender {
/// We want to send only one email at a time.
final _pool = Pool(1);

/// The current zone when this [EmailSender] instance was created.
final Zone _parentZone;

final _connectionsBySender = <String, Future<_ZonedConnection>>{};
final _forceReconnectSenders = <String>{};
DateTime _backoffUntil = DateTime(0);

EmailSenderBase() : _parentZone = Zone.current;

Future<EmailSenderConnection> connect(String senderEmail);
void invalidateCredentials();

@override
bool get shouldBackoff => clock.now().isBefore(_backoffUntil);

@override
Future<void> sendMessage(EmailMessage message) async {
// One attempt at a time.
await _pool.withResource(() async {
return await _sendMessage(message);
});
}

Future<void> _sendMessage(EmailMessage message) async {
_logger.info('Sending email: ${message.debugInfo}...');
final sender = message.from.email;
try {
await retry(
() async {
final c = await _getConnection(sender);
await c.send(message);
},
retryIf: (e) =>
e is TimeoutException ||
e is IOException ||
e is SmtpClientCommunicationException ||
e is SmtpNoGreetingException,
delayFactor: Duration(seconds: 2),
maxAttempts: 2,
onRetry: (_) {
_forceReconnectSenders.add(sender);
},
);
} on SmtpMessageValidationException catch (e, st) {
_logger.info('Sending email failed: ${message.debugInfo}.', e, st);
throw EmailSenderException.invalid();
} on SmtpClientAuthenticationException catch (e, st) {
_logger.shout('Sending email failed due to invalid auth: $e', e, st);
_forceReconnectSenders.add(sender);
invalidateCredentials();
_backoffUntil = clock.now().add(Duration(minutes: 2));
throw EmailSenderException.failed();
} on MailerException catch (e, st) {
_logger.warning('Sending email failed: ${message.debugInfo}.', e, st);
throw EmailSenderException.failed();
}
}

Future<_ZonedConnection> _getConnection(String sender) async {
final connectionFuture = _connectionsBySender[sender];
final old = connectionFuture == null ? null : await connectionFuture;
final forceReconnect = _forceReconnectSenders.remove(sender);
if (!forceReconnect && old != null && !old.isExpired) {
return old;
}
final newConnectionFuture = Future.microtask(() async {
// closing the old connection if there was any, ignoring errors
await old?.close();

// PersistentConnection needs to be created in its designated zone, as its
// internal message subscription starts inside the constructor.
final connectionZone = _CatchAllZone(_parentZone);
final connection =
await connectionZone._zone.run(() async => connect(sender));
return _ZonedConnection(connectionZone, connection);
});
_connectionsBySender[sender] = newConnectionFuture;
return newConnectionFuture;
}
}

EmailSender createGmailRelaySender(
String serviceAccountEmail,
http.Client authClient,
Expand All @@ -46,22 +136,32 @@ EmailSender createGmailRelaySender(
authClient,
);

class _LoggingEmailSender implements EmailSender {
class _LoggingEmailSender extends EmailSenderBase {
@override
bool get shouldBackoff => false;
Future<EmailSenderConnection> connect(String senderEmail) async {
return _LoggingEmailSenderConnection();
}

@override
Future<void> sendMessage(EmailMessage message) async {
final debugHeader = '(${message.subject}) '
'from ${message.from} '
'to ${message.recipients.join(', ')}';
void invalidateCredentials() {
// ignore
}
}

class _LoggingEmailSenderConnection extends EmailSenderConnection {
@override
Future<void> send(EmailMessage message) async {
final urls = _simpleUrlRegExp
.allMatches(message.bodyText)
.map((e) => e.group(0))
.toList();
_logger.info('Not sending email (SMTP not configured): '
'$debugHeader${urls.map((e) => '\n$e').join('')}');
'${message.debugInfo}\n${urls.map((e) => '\n$e').join('')}');
}

@override
Future<void> close() async {
// ignored
}
}

Expand Down Expand Up @@ -119,102 +219,33 @@ Address? _toAddress(EmailAddress? input) =>
/// [2]: https://developers.google.com/gmail/imap/xoauth2-protocol#the_sasl_xoauth2_mechanism
/// [3]: https://developers.google.com/identity/protocols/oauth2/service-account
/// [4]: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/signJwt
class _GmailSmtpRelay implements EmailSender {
class _GmailSmtpRelay extends EmailSenderBase {
static final _googleOauth2TokenUrl =
Uri.parse('https://oauth2.googleapis.com/token');
static const _scopes = ['https://mail.google.com/'];

final String _serviceAccountEmail;
final http.Client _authClient;

final _connectionsBySender = <String, Future<_GmailConnection>>{};
final _forceReconnectSenders = <String>{};

/// The current zone when this [_GmailSmtpRelay] instance was created.
final Zone _parentZone;
final _pool = Pool(1);

DateTime _accessTokenRefreshed = DateTime(0);
DateTime _backoffUntil = DateTime(0);
Future<String>? _accessToken;

_GmailSmtpRelay(
this._serviceAccountEmail,
this._authClient,
) : _parentZone = Zone.current;

@override
bool get shouldBackoff => clock.now().isBefore(_backoffUntil);
);

@override
Future<void> sendMessage(EmailMessage message) async {
// One attempt at a time.
await _pool.withResource(() async => _sendMessage(message));
Future<EmailSenderConnection> connect(String senderEmail) async {
return _GmailConnection(PersistentConnection(
await _getSmtpServer(senderEmail),
timeout: Duration(seconds: 15),
));
}

Future<void> _sendMessage(EmailMessage message) async {
final debugHeader = 'Message-ID:${message.localMessageId}@pub.dev '
'(${message.subject}) '
'from ${message.from} '
'to ${message.recipients.join(', ')}';
_logger.info('Sending email: $debugHeader...');
final sender = message.from.email;
try {
await retry(
() async {
final c = await _getConnection(sender);
await c.send(_toMessage(message));
},
retryIf: (e) =>
e is TimeoutException ||
e is IOException ||
e is SmtpClientCommunicationException ||
e is SmtpNoGreetingException,
delayFactor: Duration(seconds: 2),
maxAttempts: 2,
onRetry: (_) {
_forceReconnectSenders.add(sender);
},
);
} on SmtpMessageValidationException catch (e, st) {
_logger.info('Sending email failed: $debugHeader.', e, st);
throw EmailSenderException.invalid();
} on SmtpClientAuthenticationException catch (e, st) {
_logger.shout('Sending email failed due to invalid auth: $e', e, st);
_backoffUntil = clock.now().add(Duration(minutes: 2));
_forceReconnectSenders.add(sender);
_accessToken = null;
throw EmailSenderException.failed();
} on MailerException catch (e, st) {
_logger.warning('Sending email failed: $debugHeader.', e, st);
throw EmailSenderException.failed();
}
}

Future<_GmailConnection> _getConnection(String sender) async {
final connectionFuture = _connectionsBySender[sender];
final old = connectionFuture == null ? null : await connectionFuture;
final forceReconnect = _forceReconnectSenders.remove(sender);
if (!forceReconnect && old != null && !old.isExpired) {
return old;
}
final newConnectionFuture = Future.microtask(() async {
// closing the old connection if there was any, ignoring errors
await old?.close();

// PersistentConnection needs to be created in its designated zone, as its
// internal message subscription starts inside the constructor.
final connectionZone = _ConnectionZone(_parentZone);
final connection = await connectionZone._zone.run(
() async => PersistentConnection(
await _getSmtpServer(sender),
timeout: Duration(seconds: 15),
),
);
return _GmailConnection(connectionZone, connection);
});
_connectionsBySender[sender] = newConnectionFuture;
return newConnectionFuture;
@override
void invalidateCredentials() {
_accessToken = null;
}

Future<SmtpServer> _getSmtpServer(String sender) async {
Expand Down Expand Up @@ -284,34 +315,54 @@ class _GmailSmtpRelay implements EmailSender {
}
}

class _ConnectionZone {
class _CatchAllZone {
final Zone _parentZone;
Object? _uncaughtError;

_ConnectionZone(this._parentZone);
_CatchAllZone(this._parentZone);

bool get hasUncaughtError => _uncaughtError != null;

late final _zone = _parentZone.fork(specification: ZoneSpecification(
handleUncaughtError: (self, parent, zone, error, stackTrace) {
_uncaughtError = error;
_logger.severe('Uncaught error while sending email', error, stackTrace);
},
));

Future<R> runAsync<R>(Future<R> Function() fn) async {
final completer = Completer<R>();
_zone.scheduleMicrotask(() async {
try {
final r = await fn();
completer.complete(r);
} catch (e, st) {
completer.completeError(e, st);
}
});
return await completer.future;
}
}

class _GmailConnection {
/// Wraps the physical connection within a [Zone], where the send operation should
/// be wraped, tracking otherwise uncaught exceptions.
///
/// It also tracks age, usage and expiration.
class _ZonedConnection {
final DateTime created;
final PersistentConnection _connection;
final _ConnectionZone _connectionZone;
final _CatchAllZone _zone;
final EmailSenderConnection _connection;

DateTime _lastUsed;
var _sentCount = 0;

_GmailConnection(this._connectionZone, this._connection)
_ZonedConnection(this._zone, this._connection)
: created = clock.now(),
_lastUsed = clock.now();

bool get isExpired {
// The connection is in an unknown state, better not use it.
if (_connectionZone._uncaughtError != null) {
if (_zone.hasUncaughtError) {
return true;
}
// There is a 100-recipient limit per SMTP transaction for smtp-relay.gmail.com.
Expand All @@ -331,27 +382,39 @@ class _GmailConnection {
return false;
}

Future<SendReport> send(Message message) async {
Future<void> send(EmailMessage message) async {
_sentCount += message.recipients.length + message.ccRecipients.length;
try {
final r = await _connectionZone._zone
.run(() async => await _connection.send(message));
if (_connectionZone._uncaughtError != null) {
if (_zone.hasUncaughtError) {
throw EmailSenderException.failed();
}
return r;
await _zone.runAsync(() async {
await _connection.send(message);
});
} finally {
_lastUsed = clock.now();
}
}

Future<void> close() async {
try {
await _connectionZone._zone.run(() async {
await _zone.runAsync(() async {
await _connection.close();
});
} catch (e, st) {
_logger.warning('Unable to close SMTP connection.', e, st);
}
}
}

class _GmailConnection extends EmailSenderConnection {
final PersistentConnection _connection;
_GmailConnection(this._connection);

@override
Future<void> send(EmailMessage message) =>
_connection.send(_toMessage(message));

@override
Future<void> close() => _connection.close();
}
8 changes: 8 additions & 0 deletions app/lib/shared/email.dart
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class EmailMessage {
'bodyText': bodyText,
};
}

late final debugInfo = [
'Message-ID:[email protected]',
'($subject)',
'from $from',
'to ${recipients.join(', ')}',
if (ccRecipients.isNotEmpty) 'cc ${ccRecipients.join(', ')}',
].join(' ');
}

/// Parses the body text and splits the [input] paragraphs to [lineLength]
Expand Down
Loading