Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,6 @@ jobs:
|| contains(needs.detect-changes.outputs.changes, 'instrumentation-servicefabricremoting')
|| contains(needs.detect-changes.outputs.changes, 'instrumentation-sqlclient')
|| contains(needs.detect-changes.outputs.changes, 'instrumentation-stackexchangeredis')
|| contains(needs.detect-changes.outputs.changes, 'opamp-client')
|| contains(needs.detect-changes.outputs.changes, 'resources-aws')
|| contains(needs.detect-changes.outputs.changes, 'resources-azure')
|| contains(needs.detect-changes.outputs.changes, 'resources-container')
Expand Down
7 changes: 4 additions & 3 deletions src/OpenTelemetry.OpAmp.Client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

* Initial release of
`OpenTelemetry.OpAmp.Client`
project.
* Initial release of `OpenTelemetry.OpAmp.Client` project.
([#2917](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2917))
* Added support for OpAMP Plain HTTP transport.
([#2926](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2926))
57 changes: 57 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/FrameBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using Google.Protobuf;
using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client;

internal sealed class FrameBuilder : IFrameBuilder
{
private readonly OpAmpClientSettings settings;

private AgentToServer? currentMessage;
private ByteString instanceUid;
private ulong sequenceNum;

public FrameBuilder(OpAmpClientSettings settings)
{
this.settings = settings;
this.instanceUid = ByteString.CopyFrom(this.settings.InstanceUid.ToByteArray());
}

public IFrameBuilder StartBaseMessage()
{
if (this.currentMessage != null)
{
throw new InvalidOperationException("Message base is already initialized.");
}

var message = new AgentToServer()
{
InstanceUid = this.instanceUid,
SequenceNum = ++this.sequenceNum,
};

this.currentMessage = message;
return this;
}

AgentToServer IFrameBuilder.Build()
{
if (this.currentMessage == null)
{
throw new InvalidOperationException("Message base is not initialized.");
}

var message = this.currentMessage;
this.currentMessage = null; // Reset for the next message

return message;
}

public void Reset()
{
this.currentMessage = null;
}
}
60 changes: 60 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/FrameDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Transport;

namespace OpenTelemetry.OpAmp.Client;

internal sealed class FrameDispatcher : IDisposable
{
private readonly IOpAmpTransport transport;
private readonly FrameBuilder frameBuilder;
private readonly SemaphoreSlim syncRoot = new(1, 1);

public FrameDispatcher(IOpAmpTransport transport, OpAmpClientSettings settings)
{
Guard.ThrowIfNull(transport, nameof(transport));
Guard.ThrowIfNull(settings, nameof(settings));

this.transport = transport;
this.frameBuilder = new FrameBuilder(settings);
}

// TODO: May need to redesign to request only partials
// so any other message waiting to be sent can be included to optimize transport usage and locking time.
public async Task DispatchServerFrameAsync(CancellationToken token)
{
await this.syncRoot.WaitAsync(token)
.ConfigureAwait(false);

try
{
var message = this.frameBuilder
.StartBaseMessage()
.Build();

// TODO: change to proper logging
Console.WriteLine("Sending identification message.");

await this.transport.SendAsync(message, token)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// TODO: change to proper logging
Console.WriteLine($"[Error]: {ex.Message}");

this.frameBuilder.Reset(); // Reset the builder in case of failure
}
finally
{
this.syncRoot.Release();
}
}

public void Dispose()
{
this.syncRoot.Dispose();
}
}
123 changes: 123 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/FrameProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using OpAmp.Proto.V1;
using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Listeners;
using OpenTelemetry.OpAmp.Client.Listeners.Messages;

namespace OpenTelemetry.OpAmp.Client;

internal sealed class FrameProcessor
{
private readonly ConcurrentDictionary<Type, ImmutableList<IOpAmpListener>> listeners = [];

public void Subscribe<T>(IOpAmpListener<T> listener)
where T : IOpAmpMessage
{
Guard.ThrowIfNull(listener, nameof(listener));

// It is expected to be much more read-heavy than write-heavy, so we use ImmutableList for thread safety
this.listeners.AddOrUpdate(
typeof(T),
_ => [listener],
(_, list) => list.Add(listener));
}

public void Unsubscribe<T>(IOpAmpListener<T> listener)
where T : IOpAmpMessage
{
Guard.ThrowIfNull(listener, nameof(listener));

this.listeners.AddOrUpdate(
typeof(T),
_ => ImmutableList<IOpAmpListener>.Empty,
(_, list) =>
{
if (list.Count == 1 && list[0] == listener)
{
return ImmutableList<IOpAmpListener>.Empty;
}

return list.Remove(listener);
});
}

public void OnServerFrame(ReadOnlySequence<byte> sequence)
{
this.Deserialize(sequence);
}

private void Deserialize(ReadOnlySequence<byte> sequence)
{
var message = ServerToAgent.Parser.ParseFrom(sequence);

if (message.ErrorResponse != null)
{
this.Dispatch(new ErrorResponseMessage(message.ErrorResponse));
}

if (message.RemoteConfig != null)
{
this.Dispatch(new RemoteConfigMessage(message.RemoteConfig));
}

if (message.ConnectionSettings != null)
{
this.Dispatch(new ConnectionSettingsMessage(message.ConnectionSettings));
}

if (message.PackagesAvailable != null)
{
this.Dispatch(new PackagesAvailableMessage(message.PackagesAvailable));
}

if (message.Flags != 0)
{
this.Dispatch(new FlagsMessage((ServerToAgentFlags)message.Flags));
}

if (message.Capabilities != 0)
{
this.Dispatch(new CapabilitiesMessage((ServerCapabilities)message.Capabilities));
}

if (message.AgentIdentification != null)
{
this.Dispatch(new AgentIdentificationMessage(message.AgentIdentification));
}

if (message.Command != null)
{
this.Dispatch(new CommandMessage(message.Command));
}

if (message.CustomCapabilities != null)
{
this.Dispatch(new CustomCapabilitiesMessage(message.CustomCapabilities));
}

if (message.CustomMessage != null)
{
this.Dispatch(new CustomMessageMessage(message.CustomMessage));
}
}

private void Dispatch<T>(T message)
where T : IOpAmpMessage
{
if (this.listeners.TryGetValue(typeof(T), out var list))
{
foreach (var listener in list)
{
if (listener is IOpAmpListener<T> typedListener)
{
typedListener.HandleMessage(message);
}
}
}
}
}
11 changes: 11 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/IFrameBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client;

internal interface IFrameBuilder
{
AgentToServer Build();
}
14 changes: 14 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Listeners/IOpAmpListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.OpAmp.Client.Listeners;

internal interface IOpAmpListener
{
}

internal interface IOpAmpListener<TMessage> : IOpAmpListener
where TMessage : IOpAmpMessage
{
void HandleMessage(TMessage message);
}
8 changes: 8 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Listeners/IOpAmpMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.OpAmp.Client.Listeners;

internal interface IOpAmpMessage
{
}
Comment on lines +6 to +8
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just intended to be a marker interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, due some partials in ServerToAgent aren't IMessages, just flags.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class AgentIdentificationMessage : IOpAmpMessage
{
public AgentIdentificationMessage(AgentIdentification agentIdentification)
{
this.AgentIdentification = agentIdentification;
}

public AgentIdentification AgentIdentification { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class CapabilitiesMessage : IOpAmpMessage
{
public CapabilitiesMessage(ServerCapabilities capabilities)
{
this.Capabilities = capabilities;
}

public ServerCapabilities Capabilities { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class CommandMessage : IOpAmpMessage
{
public CommandMessage(ServerToAgentCommand command)
{
this.Command = command;
}

public ServerToAgentCommand Command { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class ConnectionSettingsMessage : IOpAmpMessage
{
public ConnectionSettingsMessage(ConnectionSettingsOffers connectionSettingsOffers)
{
this.ConnectionSettings = connectionSettingsOffers;
}

public ConnectionSettingsOffers ConnectionSettings { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class CustomCapabilitiesMessage : IOpAmpMessage
{
public CustomCapabilitiesMessage(CustomCapabilities customCapabilities)
{
this.CustomCapabilities = customCapabilities;
}

public CustomCapabilities CustomCapabilities { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpAmp.Proto.V1;

namespace OpenTelemetry.OpAmp.Client.Listeners.Messages;

internal class CustomMessageMessage : IOpAmpMessage
{
public CustomMessageMessage(CustomMessage customMessage)
{
this.CustomMessage = customMessage;
}

public CustomMessage CustomMessage { get; set; }
}
Loading
Loading