Skip to content

Commit b50deee

Browse files
committed
Add dodgy hook when a subscriber is established
1 parent 9a595aa commit b50deee

File tree

7 files changed

+81
-9
lines changed

7 files changed

+81
-9
lines changed

source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void SetUp()
2727
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
2828
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
2929
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
30-
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>());
30+
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>(), new NullSubscriberObserver());
3131
}
3232

3333
// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation

source/Halibut.Tests/Transport/SecureClientFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
6767
var connection = Substitute.For<IConnection>();
6868
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
6969
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
70-
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log));
70+
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log, new NullSubscriberObserver()));
7171

7272
await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
7373
}
@@ -102,7 +102,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
102102
{
103103
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
104104
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
105-
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger);
105+
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger, new NullSubscriberObserver());
106106
}
107107
}
108108
}

source/Halibut/HalibutRuntime.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class HalibutRuntime : IHalibutRuntime
4747
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
4848
readonly IControlMessageObserver controlMessageObserver;
4949
readonly ISslConfigurationProvider sslConfigurationProvider;
50+
readonly ISubscriberObserver subscriberObserver;
5051

5152
internal HalibutRuntime(
5253
IServiceFactory serviceFactory,
@@ -63,8 +64,8 @@ internal HalibutRuntime(
6364
IConnectionsObserver connectionsObserver,
6465
IControlMessageObserver controlMessageObserver,
6566
ISecureConnectionObserver secureConnectionObserver,
66-
ISslConfigurationProvider sslConfigurationProvider
67-
)
67+
ISslConfigurationProvider sslConfigurationProvider,
68+
ISubscriberObserver subscriberObserver)
6869
{
6970
this.serverCertificate = serverCertificate;
7071
this.trustProvider = trustProvider;
@@ -79,6 +80,7 @@ ISslConfigurationProvider sslConfigurationProvider
7980
TimeoutsAndLimits = halibutTimeoutsAndLimits;
8081
this.connectionsObserver = connectionsObserver;
8182
this.secureConnectionObserver = secureConnectionObserver;
83+
this.subscriberObserver = subscriberObserver;
8284
this.controlMessageObserver = controlMessageObserver;
8385
this.sslConfigurationProvider = sslConfigurationProvider;
8486

@@ -121,7 +123,12 @@ public int Listen(int port)
121123

122124
ExchangeProtocolBuilder ExchangeProtocolBuilder()
123125
{
124-
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log);
126+
return (stream, log) => new MessageExchangeProtocol(
127+
new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log),
128+
TimeoutsAndLimits,
129+
activeTcpConnectionsLimiter,
130+
log,
131+
subscriberObserver);
125132
}
126133

127134
public int Listen(IPEndPoint endpoint)

source/Halibut/HalibutRuntimeBuilder.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class HalibutRuntimeBuilder
3333
IControlMessageObserver? controlMessageObserver;
3434
MessageStreamWrappers queueMessageStreamWrappers = new();
3535
ISslConfigurationProvider? sslConfigurationProvider;
36+
ISubscriberObserver? subscriberObserver;
3637

3738
public HalibutRuntimeBuilder WithQueueMessageStreamWrappers(MessageStreamWrappers queueMessageStreamWrappers)
3839
{
@@ -58,6 +59,12 @@ public HalibutRuntimeBuilder WithSslConfigurationProvider(ISslConfigurationProvi
5859
return this;
5960
}
6061

62+
public HalibutRuntimeBuilder WithSubscriptionObserver(ISubscriberObserver subscriberObserver)
63+
{
64+
this.subscriberObserver = subscriberObserver;
65+
return this;
66+
}
67+
6168
internal HalibutRuntimeBuilder WithStreamFactory(IStreamFactory streamFactory)
6269
{
6370
this.streamFactory = streamFactory;
@@ -194,6 +201,7 @@ public HalibutRuntime Build()
194201
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
195202
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
196203
var sslConfigurationProvider = this.sslConfigurationProvider ?? SslConfiguration.Default;
204+
var subscriberObserver = this.subscriberObserver ?? new NullSubscriberObserver();
197205

198206
var halibutRuntime = new HalibutRuntime(
199207
serviceFactory,
@@ -210,7 +218,8 @@ public HalibutRuntime Build()
210218
connectionsObserver,
211219
controlMessageObserver,
212220
secureConnectionObserver,
213-
sslConfigurationProvider
221+
sslConfigurationProvider,
222+
subscriberObserver
214223
);
215224

216225
if (onUnauthorizedClientConnect is not null)
@@ -221,4 +230,4 @@ public HalibutRuntime Build()
221230
return halibutRuntime;
222231
}
223232
}
224-
}
233+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System.Threading;
16+
using System.Threading.Tasks;
17+
18+
namespace Halibut.Transport.Observability
19+
{
20+
public interface ISubscriberObserver
21+
{
22+
Task SubscriberConnected(string uri, CancellationToken cancellationToken);
23+
}
24+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System.Threading;
16+
using System.Threading.Tasks;
17+
18+
namespace Halibut.Transport.Observability
19+
{
20+
public class NullSubscriberObserver : ISubscriberObserver
21+
{
22+
public Task SubscriberConnected(string uri, CancellationToken cancellationToken)
23+
{
24+
return Task.CompletedTask;
25+
}
26+
}
27+
}

source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Halibut.Diagnostics;
66
using Halibut.Exceptions;
77
using Halibut.ServiceModel;
8+
using Halibut.Transport.Observability;
89

910
namespace Halibut.Transport.Protocol
1011
{
@@ -21,15 +22,18 @@ public class MessageExchangeProtocol
2122
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
2223
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
2324
readonly ILog log;
25+
readonly ISubscriberObserver subscriberObserver;
2426
bool identified;
2527
volatile bool acceptClientRequests = true;
2628

27-
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log)
29+
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log,
30+
ISubscriberObserver subscriberObserver)
2831
{
2932
this.stream = stream;
3033
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
3134
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
3235
this.log = log;
36+
this.subscriberObserver = subscriberObserver;
3337
}
3438

3539
public async Task<ResponseMessage> ExchangeAsClientAsync(RequestMessage request, CancellationToken cancellationToken)
@@ -112,6 +116,7 @@ public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessag
112116
//if the remote identity is a subscriber, we might need to limit their active TCP connections
113117
if (identity.IdentityType == RemoteIdentityType.Subscriber)
114118
{
119+
await subscriberObserver.SubscriberConnected(identity.SubscriptionId.ToString(), cancellationToken);
115120
limitedConnectionLease = activeTcpConnectionsLimiter.LeaseActiveTcpConnection(identity.SubscriptionId);
116121
}
117122

0 commit comments

Comments
 (0)