-
Notifications
You must be signed in to change notification settings - Fork 351
[OpAMP.Client] Add support for plain http transport #2926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b236c0b
34fea2b
f1fe378
0ce14b0
f494074
84bfe5e
9591147
f96fec1
886be93
8db011f
559c42b
5c4ee57
dcc3b61
4c90ba3
8506ef0
bd930fa
54d57a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using Google.Protobuf; | ||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client; | ||
|
||
internal class FrameBuilder : IFrameBuilder | ||
|
||
{ | ||
private readonly OpAmpClientSettings settings; | ||
|
||
private AgentToServer? currentMessage; | ||
private ByteString instanceUid; | ||
private ulong sequenceNum; | ||
|
||
public FrameBuilder(OpAmpClientSettings settings) | ||
{ | ||
this.settings = settings; | ||
this.instanceUid = ByteString.CopyFrom(this.settings.InstanceUid.ToByteArray()); | ||
} | ||
|
||
public IFrameBuilder StartBaseMessage() | ||
{ | ||
if (this.currentMessage != null) | ||
{ | ||
throw new InvalidOperationException("Message base is already initialized."); | ||
} | ||
|
||
var message = new AgentToServer() | ||
{ | ||
InstanceUid = this.instanceUid, | ||
SequenceNum = ++this.sequenceNum, | ||
martincostello marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
|
||
this.currentMessage = message; | ||
return this; | ||
} | ||
|
||
AgentToServer IFrameBuilder.Build() | ||
martincostello marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
if (this.currentMessage == null) | ||
{ | ||
throw new InvalidOperationException("Message base is not initialized."); | ||
} | ||
|
||
var message = this.currentMessage; | ||
this.currentMessage = null; // Reset for the next message | ||
|
||
return message; | ||
} | ||
|
||
public void Reset() | ||
{ | ||
this.currentMessage = null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpenTelemetry.Internal; | ||
using OpenTelemetry.OpAmp.Client.Transport; | ||
|
||
namespace OpenTelemetry.OpAmp.Client; | ||
|
||
internal class FrameDispatcher : IDisposable | ||
{ | ||
private readonly IOpAmpTransport transport; | ||
private readonly FrameBuilder frameBuilder; | ||
private readonly SemaphoreSlim syncRoot = new(1, 1); | ||
|
||
public FrameDispatcher(IOpAmpTransport transport, OpAmpClientSettings settings) | ||
{ | ||
Guard.ThrowIfNull(transport, nameof(transport)); | ||
Guard.ThrowIfNull(settings, nameof(settings)); | ||
|
||
this.transport = transport; | ||
this.frameBuilder = new FrameBuilder(settings); | ||
} | ||
|
||
// TODO: May need to redesign to request only partials | ||
// so any other message waiting to be sent can be included to optimize transport usage and locking time. | ||
public async Task DispatchServerFrameAsync(CancellationToken token) | ||
{ | ||
await this.syncRoot.WaitAsync(token) | ||
.ConfigureAwait(false); | ||
|
||
try | ||
{ | ||
var message = this.frameBuilder | ||
.StartBaseMessage() | ||
.Build(); | ||
|
||
// TODO: change to proper logging | ||
Console.WriteLine("Sending identification message."); | ||
|
||
await this.transport.SendAsync(message, token) | ||
.ConfigureAwait(false); | ||
} | ||
catch (Exception ex) | ||
{ | ||
// TODO: change to proper logging | ||
Console.WriteLine($"[Error]: {ex.Message}"); | ||
|
||
this.frameBuilder.Reset(); // Reset the builder in case of failure | ||
} | ||
finally | ||
{ | ||
this.syncRoot.Release(); | ||
} | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
this.syncRoot.Dispose(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using System.Buffers; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Immutable; | ||
using OpAmp.Protocol; | ||
using OpenTelemetry.Internal; | ||
using OpenTelemetry.OpAmp.Client.Listeners; | ||
using OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
namespace OpenTelemetry.OpAmp.Client; | ||
|
||
internal class FrameProcessor | ||
martincostello marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
{ | ||
private readonly ConcurrentDictionary<Type, ImmutableList<IOpAmpListener>> listeners = []; | ||
|
||
public void Subscribe<T>(IOpAmpListener<T> listener) | ||
where T : IOpAmpMessage | ||
{ | ||
Guard.ThrowIfNull(listener, nameof(listener)); | ||
|
||
// It is expected to be much more read-heavy than write-heavy, so we use ImmutableList for thread safety | ||
this.listeners.AddOrUpdate( | ||
typeof(T), | ||
_ => [listener], | ||
(_, list) => list.Add(listener)); | ||
} | ||
|
||
public void Unsubscribe<T>(IOpAmpListener<T> listener) | ||
where T : IOpAmpMessage | ||
{ | ||
Guard.ThrowIfNull(listener, nameof(listener)); | ||
|
||
this.listeners.AddOrUpdate( | ||
typeof(T), | ||
_ => ImmutableList<IOpAmpListener>.Empty, | ||
(_, list) => | ||
{ | ||
if (list.Count == 1 && list[0] == listener) | ||
{ | ||
return ImmutableList<IOpAmpListener>.Empty; | ||
} | ||
|
||
return list.Remove(listener); | ||
}); | ||
} | ||
|
||
public void OnServerFrame(ReadOnlySequence<byte> sequence) | ||
{ | ||
this.Deserialize(sequence); | ||
} | ||
|
||
private void Deserialize(ReadOnlySequence<byte> sequence) | ||
{ | ||
var message = ServerToAgent.Parser.ParseFrom(sequence); | ||
|
||
if (message.ErrorResponse != null) | ||
martincostello marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
this.Dispatch(new ErrorResponseMessage() { ErrorResponse = message.ErrorResponse }); | ||
} | ||
|
||
if (message.RemoteConfig != null) | ||
{ | ||
this.Dispatch(new RemoteConfigMessage() { RemoteConfig = message.RemoteConfig }); | ||
} | ||
|
||
if (message.ConnectionSettings != null) | ||
{ | ||
this.Dispatch(new ConnectionSettingsMessage() { ConnectionSettings = message.ConnectionSettings }); | ||
} | ||
|
||
if (message.PackagesAvailable != null) | ||
{ | ||
this.Dispatch(new PackagesAvailableMessage() { PackagesAvailable = message.PackagesAvailable }); | ||
} | ||
|
||
if (message.Flags != 0) | ||
{ | ||
this.Dispatch(new FlagsMessage() { Flags = (ServerToAgentFlags)message.Flags }); | ||
} | ||
|
||
if (message.Capabilities != 0) | ||
{ | ||
this.Dispatch(new CapabilitiesMessage() { Capabilities = (ServerCapabilities)message.Capabilities }); | ||
} | ||
|
||
if (message.AgentIdentification != null) | ||
{ | ||
this.Dispatch(new AgentIdentificationMessage() { AgentIdentification = message.AgentIdentification }); | ||
} | ||
|
||
if (message.Command != null) | ||
{ | ||
this.Dispatch(new CommandMessage() { Command = message.Command }); | ||
} | ||
|
||
if (message.CustomCapabilities != null) | ||
{ | ||
this.Dispatch(new CustomCapabilitiesMessage() { CustomCapabilities = message.CustomCapabilities }); | ||
} | ||
|
||
if (message.CustomMessage != null) | ||
{ | ||
this.Dispatch(new CustomMessageMessage() { CustomMessage = message.CustomMessage }); | ||
} | ||
} | ||
|
||
private void Dispatch<T>(T message) | ||
where T : IOpAmpMessage | ||
{ | ||
if (this.listeners.TryGetValue(typeof(T), out var list)) | ||
{ | ||
foreach (var listener in list) | ||
{ | ||
if (listener is IOpAmpListener<T> typedListener) | ||
{ | ||
typedListener.HandleMessage(message); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client; | ||
|
||
internal interface IFrameBuilder | ||
{ | ||
AgentToServer Build(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners; | ||
|
||
internal interface IOpAmpListener | ||
{ | ||
} | ||
|
||
internal interface IOpAmpListener<TMessage> : IOpAmpListener | ||
where TMessage : IOpAmpMessage | ||
{ | ||
void HandleMessage(TMessage message); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners; | ||
|
||
internal interface IOpAmpMessage | ||
{ | ||
} | ||
Comment on lines
+6
to
+8
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this just intended to be a marker interface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, due some partials in ServerToAgent aren't IMessages, just flags. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class AgentIdentificationMessage : IOpAmpMessage | ||
{ | ||
public AgentIdentification AgentIdentification { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/AgentIdentificationMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class CapabilitiesMessage : IOpAmpMessage | ||
{ | ||
public ServerCapabilities Capabilities { get; set; } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class CommandMessage : IOpAmpMessage | ||
{ | ||
public ServerToAgentCommand Command { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/CommandMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class ConnectionSettingsMessage : IOpAmpMessage | ||
{ | ||
public ConnectionSettingsOffers ConnectionSettings { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/ConnectionSettingsMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class CustomCapabilitiesMessage : IOpAmpMessage | ||
{ | ||
public CustomCapabilities CustomCapabilities { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/CustomCapabilitiesMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class CustomMessageMessage : IOpAmpMessage | ||
{ | ||
public CustomMessage CustomMessage { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/CustomMessageMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class ErrorResponseMessage : IOpAmpMessage | ||
{ | ||
public ServerErrorResponse ErrorResponse { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/ErrorResponseMessage.cs
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class FlagsMessage : IOpAmpMessage | ||
{ | ||
public ServerToAgentFlags Flags { get; set; } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
using OpAmp.Protocol; | ||
|
||
namespace OpenTelemetry.OpAmp.Client.Listeners.Messages; | ||
|
||
internal class PackagesAvailableMessage : IOpAmpMessage | ||
{ | ||
public PackagesAvailable PackagesAvailable { get; set; } | ||
Check failure on line 10 in src/OpenTelemetry.OpAmp.Client/Listeners/Messages/PackagesAvailableMessage.cs
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also needs a link to this PR.