Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 103 additions & 95 deletions src/Nullinside.Api.Common/Twitch/TwitchClientProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public async Task AddMessageCallback(string channel, Action<OnMessageReceivedArg
await JoinChannel(channel);
var channelSan = channel.ToLowerInvariant();
lock (_onMessageReceived) {
_onMessageReceived[channelSan] = callback;
if (!_onMessageReceived.TryAdd(channelSan, callback)) {
_onMessageReceived[channelSan] += callback;
}
}
}

Expand All @@ -167,7 +169,16 @@ public void RemoveMessageCallback(string channel, Action<OnMessageReceivedArgs>
bool shouldRemove = false;
var channelSan = channel.ToLowerInvariant();
lock (_onMessageReceived) {
_onMessageReceived.Remove(channel);
if (_onMessageReceived.ContainsKey(channelSan)) {
var item = _onMessageReceived[channelSan];
item -= callback;
if (null == item) {
_onMessageReceived.Remove(channelSan);
}
else {
_onMessageReceived[channelSan] = item;
}
}
}

if (shouldRemove) {
Expand Down Expand Up @@ -233,33 +244,31 @@ private async Task<bool> JoinChannel(string channel) {
if (!await Connect()) {
return false;
}

try {
// If we don't have a client, give up.
if (null == client) {
return false;
}

return await Task.Run(() => {
try {
// If we don't have a client, give up.
if (null == client) {
return false;
}

lock (twitchClientLock) {
// If we are already in the channel, we are done.
if (null != client.JoinedChannels.FirstOrDefault(c =>
channel.Equals(c.Channel, StringComparison.InvariantCultureIgnoreCase))) {
return true;
}

// Otherwise, join the channel. At one point we waited here on the "OnJoinedChannel" to ensure the
// connection before moving onto the next channel. However, it was causing a massive slowdown in
// the application, and we've been working fine without it...so for now...we try this...
client.JoinChannel(channel);
lock (twitchClientLock) {
// If we are already in the channel, we are done.
if (null != client.JoinedChannels.FirstOrDefault(c =>
channel.Equals(c.Channel, StringComparison.InvariantCultureIgnoreCase))) {
return true;
}

return true;
}
catch {
return false;
// Otherwise, join the channel. At one point we waited here on the "OnJoinedChannel" to ensure the
// connection before moving onto the next channel. However, it was causing a massive slowdown in
// the application, and we've been working fine without it...so for now...we try this...
client.JoinChannel(channel);
}
});

return true;
}
catch {
return false;
}
}

/// <summary>
Expand Down Expand Up @@ -290,93 +299,92 @@ private async void TwitchChatClientReconnectOnElapsed(object? sender, ElapsedEve
/// Connects to twitch chat.
/// </summary>
/// <returns>True if successful, false otherwise.</returns>
private async Task<bool> Connect() {
private Task<bool> Connect() {
// If we're already connected, we are good to go.
lock (twitchClientLock) {
if (client?.IsConnected ?? false) {
return true;
return Task.FromResult(true);
}
}

// If we don't have the ability to connect, we can leave early.
if (string.IsNullOrWhiteSpace(TwitchUsername) || string.IsNullOrWhiteSpace(TwitchOAuthToken)) {
return false;
return Task.FromResult(false);
}

return await Task.Run(() => {
try {
bool isConnected = false;
lock (twitchClientLock) {
if (client?.IsConnected ?? false) {
return true;
}

// If this is a first time initialization, create a brand-new client.
bool haveNoClient = null == client;
if (haveNoClient) {
var clientOptions = new ClientOptions
{ MessagesAllowedInPeriod = 750, ThrottlingPeriod = TimeSpan.FromSeconds(30) };

socket = new WebSocketClient(clientOptions);
client = new TwitchClient(socket);
var credentials = new ConnectionCredentials(TwitchUsername, TwitchOAuthToken);
client.Initialize(credentials);
client.AutoReListenOnException = true;
client.OnMessageReceived += TwitchChatClient_OnMessageReceived;
client.OnUserBanned += TwitchChatClient_OnUserBanned;
client.OnRaidNotification += TwitchChatClient_OnRaidNotification;
client.OnDisconnected += (sender, args) => {
LOG.Error("Twitch Client Disconnected");
};
client.OnConnectionError += (sender, args) => {
LOG.Error($"Twitch Client Connection Error: {args.Error.Message}");
};
client.OnError += (sender, args) => {
LOG.Error($"Twitch Client Error: {args.Exception.Message}");
};
client.OnIncorrectLogin += (sender, args) => {
LOG.Error($"Twitch Client Incorrect Login: {args.Exception.Message}");
};
client.OnNoPermissionError += (sender, args) => {
LOG.Error("Twitch Client No Permission Error");
};
}
try {
bool isConnected = false;
lock (twitchClientLock) {
if (client?.IsConnected ?? false) {
return Task.FromResult(true);
}

try {
// If we are not connect, connect.
if (null != client && !client.IsConnected) {
// If this is a new chat client, connect for the first time, otherwise reconnect.
Action connect = haveNoClient ? () => client.Connect() : () => client.Reconnect();
using var connectedEvent = new ManualResetEventSlim(false);
EventHandler<OnConnectedArgs> onConnected = (_, _) => connectedEvent.Set();
EventHandler<OnReconnectedEventArgs> onReconnect = (_, _) => connectedEvent.Set();
try {
client!.OnConnected += onConnected;
client!.OnReconnected += onReconnect;
connect();
if (!connectedEvent.Wait(30 * 1000)) {
return false;
}
}
finally {
client.OnConnected -= onConnected;
client.OnReconnected -= onReconnect;
// If this is a first time initialization, create a brand-new client.
bool haveNoClient = null == client;
if (haveNoClient) {
var clientOptions = new ClientOptions
{ MessagesAllowedInPeriod = 750, ThrottlingPeriod = TimeSpan.FromSeconds(30) };

socket = new WebSocketClient(clientOptions);
client = new TwitchClient(socket);
var credentials = new ConnectionCredentials(TwitchUsername, TwitchOAuthToken);
client.Initialize(credentials);
client.AutoReListenOnException = true;
client.OnMessageReceived += TwitchChatClient_OnMessageReceived;
client.OnUserBanned += TwitchChatClient_OnUserBanned;
client.OnRaidNotification += TwitchChatClient_OnRaidNotification;
client.OnDisconnected += (sender, args) => {
LOG.Error("Twitch Client Disconnected");
};
client.OnConnectionError += (sender, args) => {
LOG.Error($"Twitch Client Connection Error: {args.Error.Message}");
};
client.OnError += (sender, args) => {
LOG.Error($"Twitch Client Error: {args.Exception.Message}");
};
client.OnIncorrectLogin += (sender, args) => {
LOG.Error($"Twitch Client Incorrect Login: {args.Exception.Message}");
};
client.OnNoPermissionError += (sender, args) => {
LOG.Error("Twitch Client No Permission Error");
};
}

try {
// If we are not connect, connect.
if (null != client && !client.IsConnected) {
// If this is a new chat client, connect for the first time, otherwise reconnect.
Action connect = haveNoClient ? () => client.Connect() : () => client.Reconnect();
using var connectedEvent = new ManualResetEventSlim(false);
EventHandler<OnConnectedArgs> onConnected = (_, _) => connectedEvent.Set();
EventHandler<OnReconnectedEventArgs> onReconnect = (_, _) => connectedEvent.Set();
try {
client!.OnConnected += onConnected;
client!.OnReconnected += onReconnect;
connect();
if (!connectedEvent.Wait(30 * 1000)) {
return Task.FromResult(false);
}
}
finally {
client.OnConnected -= onConnected;
client.OnReconnected -= onReconnect;
}
}
catch {
}

// Determine if we successfully connected.
isConnected = client?.IsConnected ?? false;
}
catch {
// Do nothing, just try.
}

return isConnected;
}
catch {
return false;
// Determine if we successfully connected.
isConnected = client?.IsConnected ?? false;
}
});

return Task.FromResult(isConnected);
}
catch {
return Task.FromResult(false);
}
}

/// <summary>
Expand Down