chore: initial repository scaffold
- Add Hrynco.RabbitMq class library with RabbitMQ client abstractions - Add Hrynco.RabbitMq.Tests xunit project - ImplicitUsings disabled on all projects Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,148 @@
|
||||
namespace Hrynco.RabbitMq;
|
||||
|
||||
using System;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
/// <summary>
|
||||
/// Base class for RabbitMQ consumers. Handles connection management, manual ack,
|
||||
/// and retry with backoff before dead-lettering.
|
||||
/// Override <see cref="SettingsName"/> to use a named <see cref="RabbitMqSettings"/> instance.
|
||||
/// </summary>
|
||||
public abstract class RabbitMqConsumerBase<TMessage, TMessageData> : BackgroundService
|
||||
where TMessage : class, IRabbitMqMessage<TMessageData>
|
||||
{
|
||||
private readonly IOptionsMonitor<RabbitMqSettings> _options;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
private IConnection? _connection;
|
||||
private IChannel? _channel;
|
||||
|
||||
protected RabbitMqConsumerBase(IOptionsMonitor<RabbitMqSettings> options, ILogger logger)
|
||||
{
|
||||
_options = options;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected abstract string QueueName { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Name of the <see cref="RabbitMqSettings"/> instance to use.
|
||||
/// Override to use a named instance when multiple RabbitMQ connections are configured.
|
||||
/// Defaults to <see cref="Options.DefaultName"/> (the unnamed instance).
|
||||
/// </summary>
|
||||
protected virtual string SettingsName => Options.DefaultName;
|
||||
|
||||
protected virtual int MaxRetries => 3;
|
||||
protected virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5);
|
||||
|
||||
private RabbitMqSettings Settings => _options.Get(SettingsName);
|
||||
|
||||
protected abstract Task HandleMessageAsync(TMessage message, CancellationToken cancellationToken);
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await EnsureConnectionAsync(stoppingToken);
|
||||
|
||||
var consumer = new AsyncEventingBasicConsumer(_channel!);
|
||||
|
||||
consumer.ReceivedAsync += async (_, args) =>
|
||||
{
|
||||
await ProcessMessageAsync(args, stoppingToken);
|
||||
};
|
||||
|
||||
await _channel!.BasicConsumeAsync(queue: QueueName, autoAck: false, consumer: consumer,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
// Hold until cancellation — consumer events fire on the channel thread
|
||||
await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
|
||||
}
|
||||
|
||||
private async Task ProcessMessageAsync(BasicDeliverEventArgs args, CancellationToken cancellationToken)
|
||||
{
|
||||
TMessage? message = null;
|
||||
|
||||
try
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(args.Body.ToArray());
|
||||
message = JsonSerializer.Deserialize<TMessage>(json);
|
||||
|
||||
if (message is null)
|
||||
{
|
||||
_logger.LogWarning("Received null message on queue {Queue} — nacking without requeue", QueueName);
|
||||
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
|
||||
return;
|
||||
}
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to deserialize message on queue {Queue} — nacking without requeue", QueueName);
|
||||
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int attempt = 1; attempt <= MaxRetries; attempt++)
|
||||
{
|
||||
try
|
||||
{
|
||||
await HandleMessageAsync(message, cancellationToken);
|
||||
await _channel!.BasicAckAsync(args.DeliveryTag, multiple: false, cancellationToken: cancellationToken);
|
||||
return;
|
||||
}
|
||||
catch (Exception ex) when (attempt < MaxRetries)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Attempt {Attempt}/{Max} failed for message on queue {Queue} [CorrelationId={CorrelationId}] — retrying in {Delay}s",
|
||||
attempt, MaxRetries, QueueName, message.CorrelationContext?.CorrelationId, RetryDelay.TotalSeconds);
|
||||
|
||||
await Task.Delay(RetryDelay, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex,
|
||||
"All {Max} attempts exhausted for message on queue {Queue} [CorrelationId={CorrelationId}] — nacking without requeue",
|
||||
MaxRetries, QueueName, message.CorrelationContext?.CorrelationId);
|
||||
|
||||
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var s = Settings;
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
HostName = s.Host,
|
||||
Port = s.Port,
|
||||
UserName = s.User,
|
||||
Password = s.Password,
|
||||
VirtualHost = s.VirtualHost
|
||||
};
|
||||
|
||||
_connection = await factory.CreateConnectionAsync(cancellationToken);
|
||||
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
|
||||
await _channel.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false,
|
||||
cancellationToken: cancellationToken);
|
||||
await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
_logger.LogInformation("RabbitMQ consumer connected to queue {Queue} (settings: {SettingsName})",
|
||||
QueueName, SettingsName);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_channel?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_connection?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user