Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace RabbitMQ.AMQP.Client;


public enum StreamOffsetSpecification
{
First,
Expand All @@ -18,7 +17,7 @@ public interface IConsumerBuilder

IStreamOptions Stream();

IConsumer Build();
Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default);

public interface IStreamOptions
{
Expand All @@ -36,6 +35,4 @@ public interface IStreamOptions

IConsumerBuilder Builder();
}


}
8 changes: 4 additions & 4 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface IEntityInfo
/// <typeparam name="T"></typeparam>
public interface IEntityInfoDeclaration<T> where T : IEntityInfo
{
// TODO this really should be named DeclareAsync
Task<T> Declare();
}

Expand All @@ -18,6 +19,7 @@ public interface IEntityInfoDeclaration<T> where T : IEntityInfo
/// </summary>
public interface IEntityDeclaration
{
// TODO this really should be named DeclareAsync
Task Declare();
}

Expand Down Expand Up @@ -49,7 +51,6 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
IQueueSpecification Type(QueueType type);
public QueueType Type();


IQueueSpecification DeadLetterExchange(string dlx);

IQueueSpecification DeadLetterRoutingKey(string dlrk);
Expand All @@ -60,7 +61,6 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>

IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);


IQueueSpecification Expires(TimeSpan expiration);

IStreamSpecification Stream();
Expand All @@ -69,10 +69,8 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>

IClassicQueueSpecification Classic();


IQueueSpecification MaxLength(long maxLength);


IQueueSpecification MessageTtl(TimeSpan ttl);
}

Expand Down Expand Up @@ -135,6 +133,7 @@ public interface IClassicQueueSpecification
public interface IQueueDeletion
{
// TODO consider returning a QueueStatus object with some info after deletion
// TODO should be named DeleteAsync and take CancellationToken
Task<IEntityInfo> Delete(string name);
}

Expand All @@ -154,6 +153,7 @@ public interface IExchangeSpecification : IEntityDeclaration
public interface IExchangeDeletion
{
// TODO consider returning a ExchangeStatus object with some info after deletion
// TODO should be named DeleteAsync and take CancellationToken
Task Delete(string name);
}

Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.AMQP.Client/IEntitiesInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum QueueType

public interface IQueueInfo : IEntityInfo
{
// TODO these should be properties, not methods
string Name();

bool Durable();
Expand All @@ -19,10 +20,12 @@ public interface IQueueInfo : IEntityInfo

QueueType Type();

// TODO IDictionary
Dictionary<string, object> Arguments();

string Leader();

// TODO IEnumerable? ICollection?
List<string> Replicas();

ulong MessageCount();
Expand All @@ -38,4 +41,3 @@ public enum ExchangeType
TOPIC,
HEADERS
}

2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/ILifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public override string ToString()

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

public interface ILifeCycle
public interface ILifeCycle : IDisposable
{
Task CloseAsync();

Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public interface IManagement : ILifeCycle
IQueueSpecification Queue();
IQueueSpecification Queue(string name);

Task<IQueueInfo> GetQueueInfoAsync(string queueName,
CancellationToken cancellationToken = default);

IQueueDeletion QueueDeletion();

IExchangeSpecification Exchange();
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat

public interface IPublisher : ILifeCycle
{
// TODO this should be named PublishAsync
Task Publish(IMessage message,
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
}
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/IPublisherBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public interface IPublisherBuilder : IAddressBuilder<IPublisherBuilder>
IPublisherBuilder PublishTimeout(TimeSpan timeout);

IPublisherBuilder MaxInflightMessages(int maxInFlight);
IPublisher Build();

Task<IPublisher> BuildAsync(CancellationToken cancellationToken = default);
}
39 changes: 35 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ public class AmqpNotOpenException(string message) : Exception(message);

public abstract class AbstractLifeCycle : ILifeCycle
{
protected virtual Task OpenAsync()
private bool _disposedValue;

public virtual Task OpenAsync()
{
OnNewStatus(State.Open, null);
return Task.CompletedTask;
Expand All @@ -16,6 +18,15 @@ protected virtual Task OpenAsync()

public State State { get; internal set; } = State.Closed;

public event LifeCycleCallBack? ChangeState;

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

protected void ThrowIfClosed()
{
switch (State)
Expand Down Expand Up @@ -49,7 +60,27 @@ protected void OnNewStatus(State newState, Error? error)
ChangeState?.Invoke(this, oldStatus, newState, error);
}

public event LifeCycleCallBack? ChangeState;
protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects)
}

// TODO: free unmanaged resources (unmanaged objects) and override finalizer
// TODO: set large fields to null
_disposedValue = true;
}
}

// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
// ~AbstractLifeCycle()
// {
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
// Dispose(disposing: false);
// }
}

public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle
Expand All @@ -61,7 +92,7 @@ internal void ChangeStatus(State newState, Error? error)
OnNewStatus(newState, error);
}

internal async Task Reconnect()
internal async Task ReconnectAsync()
{
try
{
Expand All @@ -84,7 +115,7 @@ internal async Task Reconnect()
await Task.Delay(delay).ConfigureAwait(false);
if (_backOffDelayPolicy.IsActive())
{
await Reconnect().ConfigureAwait(false);
await ReconnectAsync().ConfigureAwait(false);
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task Bind()
{ ToQueue ? "destination_queue" : "destination_exchange", Destination }
};

await Management.Request(kv, $"/{Consts.Bindings}",
await Management.RequestAsync(kv, $"/{Consts.Bindings}",
AmqpManagement.Post,
[
AmqpManagement.Code204,
Expand All @@ -54,7 +54,7 @@ public async Task Unbind()
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";

await Management.Request(
await Management.RequestAsync(
null, target,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
Expand All @@ -65,7 +65,7 @@ await Management.Request(
string? uri = MatchBinding(bindings, RoutingKey, ArgsToMap());
if (uri != null)
{
await Management.Request(
await Management.RequestAsync(
null, uri,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
Expand Down Expand Up @@ -125,7 +125,7 @@ private string BindingsTarget(

private async Task<List<Map>> GetBindings(string path)
{
var result = await Management.Request(
Amqp.Message result = await Management.RequestAsync(
null, path,
AmqpManagement.Get, new[] { AmqpManagement.Code200 }).ConfigureAwait(false);

Expand All @@ -134,7 +134,6 @@ private async Task<List<Map>> GetBindings(string path)
return [];
}


var l = new List<Map>() { };
foreach (object o in list)
{
Expand Down
Loading