From 5b963b25a73a0e1d6b4c12bc4f12c49e63a27a71 Mon Sep 17 00:00:00 2001 From: Anatolii Grynchuk Date: Fri, 1 May 2026 12:25:42 +0300 Subject: [PATCH] chore: initial repository scaffold - Add Hrynco.RabbitMq class library with RabbitMQ client abstractions - Add Hrynco.RabbitMq.Tests xunit project - ImplicitUsings disabled on all projects Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .gitignore | 6 + .../Hrynco.RabbitMq.Tests.csproj | 25 +++ Hrynco.RabbitMq.slnx | 4 + Hrynco.RabbitMq/CorrelationContext.cs | 6 + Hrynco.RabbitMq/Hrynco.RabbitMq.csproj | 27 ++++ Hrynco.RabbitMq/IRabbitMqMessage.cs | 7 + Hrynco.RabbitMq/IRabbitMqPublisher.cs | 9 ++ Hrynco.RabbitMq/README.md | 15 ++ Hrynco.RabbitMq/RabbitMqConsumerBase.cs | 148 ++++++++++++++++++ Hrynco.RabbitMq/RabbitMqPublisher.cs | 68 ++++++++ Hrynco.RabbitMq/RabbitMqSettings.cs | 10 ++ 11 files changed, 325 insertions(+) create mode 100644 .gitignore create mode 100644 Hrynco.RabbitMq.Tests/Hrynco.RabbitMq.Tests.csproj create mode 100644 Hrynco.RabbitMq.slnx create mode 100644 Hrynco.RabbitMq/CorrelationContext.cs create mode 100644 Hrynco.RabbitMq/Hrynco.RabbitMq.csproj create mode 100644 Hrynco.RabbitMq/IRabbitMqMessage.cs create mode 100644 Hrynco.RabbitMq/IRabbitMqPublisher.cs create mode 100644 Hrynco.RabbitMq/README.md create mode 100644 Hrynco.RabbitMq/RabbitMqConsumerBase.cs create mode 100644 Hrynco.RabbitMq/RabbitMqPublisher.cs create mode 100644 Hrynco.RabbitMq/RabbitMqSettings.cs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b188456 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +bin/ +obj/ +artifacts/ +TestResults/ +.idea/ +*.DotSettings.user diff --git a/Hrynco.RabbitMq.Tests/Hrynco.RabbitMq.Tests.csproj b/Hrynco.RabbitMq.Tests/Hrynco.RabbitMq.Tests.csproj new file mode 100644 index 0000000..7dfd582 --- /dev/null +++ b/Hrynco.RabbitMq.Tests/Hrynco.RabbitMq.Tests.csproj @@ -0,0 +1,25 @@ + + + + net10.0 + disable + enable + false + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + \ No newline at end of file diff --git a/Hrynco.RabbitMq.slnx b/Hrynco.RabbitMq.slnx new file mode 100644 index 0000000..08f2a98 --- /dev/null +++ b/Hrynco.RabbitMq.slnx @@ -0,0 +1,4 @@ + + + + diff --git a/Hrynco.RabbitMq/CorrelationContext.cs b/Hrynco.RabbitMq/CorrelationContext.cs new file mode 100644 index 0000000..4745829 --- /dev/null +++ b/Hrynco.RabbitMq/CorrelationContext.cs @@ -0,0 +1,6 @@ +namespace Hrynco.RabbitMq; + +public record CorrelationContext +{ + public required string CorrelationId { get; init; } +} diff --git a/Hrynco.RabbitMq/Hrynco.RabbitMq.csproj b/Hrynco.RabbitMq/Hrynco.RabbitMq.csproj new file mode 100644 index 0000000..6c3cb30 --- /dev/null +++ b/Hrynco.RabbitMq/Hrynco.RabbitMq.csproj @@ -0,0 +1,27 @@ + + + + net10.0 + disable + enable + Hrynco.RabbitMq + HrynCo.RabbitMq + HrynCo + RabbitMQ publisher and consumer base for HrynCo applications. + hrynco rabbitmq messaging + git + https://gitea.grynco.com.ua/hrynco/hrynco-rabbitmq.git + README.md + + + + + + + + + + + + + diff --git a/Hrynco.RabbitMq/IRabbitMqMessage.cs b/Hrynco.RabbitMq/IRabbitMqMessage.cs new file mode 100644 index 0000000..a07760d --- /dev/null +++ b/Hrynco.RabbitMq/IRabbitMqMessage.cs @@ -0,0 +1,7 @@ +namespace Hrynco.RabbitMq; + +public interface IRabbitMqMessage +{ + CorrelationContext CorrelationContext { get; set; } + TMessageData Data { get; set; } +} diff --git a/Hrynco.RabbitMq/IRabbitMqPublisher.cs b/Hrynco.RabbitMq/IRabbitMqPublisher.cs new file mode 100644 index 0000000..d9542bb --- /dev/null +++ b/Hrynco.RabbitMq/IRabbitMqPublisher.cs @@ -0,0 +1,9 @@ +namespace Hrynco.RabbitMq; + +using System.Threading; +using System.Threading.Tasks; + +public interface IRabbitMqPublisher +{ + Task PublishAsync(string queue, IRabbitMqMessage message, CancellationToken cancellationToken = default); +} diff --git a/Hrynco.RabbitMq/README.md b/Hrynco.RabbitMq/README.md new file mode 100644 index 0000000..25da421 --- /dev/null +++ b/Hrynco.RabbitMq/README.md @@ -0,0 +1,15 @@ +# HrynCo.RabbitMq + +RabbitMQ publisher and consumer base for HrynCo applications. + +## Contents + +- `RabbitMqSettings` — connection settings record (host, port, user, password, virtual host) +- `IRabbitMqPublisher` / `RabbitMqPublisher` — publishes JSON-serialized messages to a named queue +- `RabbitMqConsumerBase` — abstract background service base for consumers, with retry + dead-letter support +- `IRabbitMqMessage` — message contract interface +- `CorrelationContext` — correlation ID carrier + +## Packaging + +This package is intended for reuse through NuGet. The test project is excluded from packing. diff --git a/Hrynco.RabbitMq/RabbitMqConsumerBase.cs b/Hrynco.RabbitMq/RabbitMqConsumerBase.cs new file mode 100644 index 0000000..6b6e90e --- /dev/null +++ b/Hrynco.RabbitMq/RabbitMqConsumerBase.cs @@ -0,0 +1,148 @@ +namespace Hrynco.RabbitMq; + +using System; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +/// +/// Base class for RabbitMQ consumers. Handles connection management, manual ack, +/// and retry with backoff before dead-lettering. +/// Override to use a named instance. +/// +public abstract class RabbitMqConsumerBase : BackgroundService + where TMessage : class, IRabbitMqMessage +{ + private readonly IOptionsMonitor _options; + private readonly ILogger _logger; + + private IConnection? _connection; + private IChannel? _channel; + + protected RabbitMqConsumerBase(IOptionsMonitor options, ILogger logger) + { + _options = options; + _logger = logger; + } + + protected abstract string QueueName { get; } + + /// + /// Name of the instance to use. + /// Override to use a named instance when multiple RabbitMQ connections are configured. + /// Defaults to (the unnamed instance). + /// + protected virtual string SettingsName => Options.DefaultName; + + protected virtual int MaxRetries => 3; + protected virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5); + + private RabbitMqSettings Settings => _options.Get(SettingsName); + + protected abstract Task HandleMessageAsync(TMessage message, CancellationToken cancellationToken); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await EnsureConnectionAsync(stoppingToken); + + var consumer = new AsyncEventingBasicConsumer(_channel!); + + consumer.ReceivedAsync += async (_, args) => + { + await ProcessMessageAsync(args, stoppingToken); + }; + + await _channel!.BasicConsumeAsync(queue: QueueName, autoAck: false, consumer: consumer, + cancellationToken: stoppingToken); + + // Hold until cancellation — consumer events fire on the channel thread + await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + } + + private async Task ProcessMessageAsync(BasicDeliverEventArgs args, CancellationToken cancellationToken) + { + TMessage? message = null; + + try + { + var json = Encoding.UTF8.GetString(args.Body.ToArray()); + message = JsonSerializer.Deserialize(json); + + if (message is null) + { + _logger.LogWarning("Received null message on queue {Queue} — nacking without requeue", QueueName); + await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken); + return; + } + } + catch (JsonException ex) + { + _logger.LogError(ex, "Failed to deserialize message on queue {Queue} — nacking without requeue", QueueName); + await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken); + return; + } + + for (int attempt = 1; attempt <= MaxRetries; attempt++) + { + try + { + await HandleMessageAsync(message, cancellationToken); + await _channel!.BasicAckAsync(args.DeliveryTag, multiple: false, cancellationToken: cancellationToken); + return; + } + catch (Exception ex) when (attempt < MaxRetries) + { + _logger.LogWarning(ex, + "Attempt {Attempt}/{Max} failed for message on queue {Queue} [CorrelationId={CorrelationId}] — retrying in {Delay}s", + attempt, MaxRetries, QueueName, message.CorrelationContext?.CorrelationId, RetryDelay.TotalSeconds); + + await Task.Delay(RetryDelay, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, + "All {Max} attempts exhausted for message on queue {Queue} [CorrelationId={CorrelationId}] — nacking without requeue", + MaxRetries, QueueName, message.CorrelationContext?.CorrelationId); + + await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken); + } + } + } + + private async Task EnsureConnectionAsync(CancellationToken cancellationToken) + { + var s = Settings; + var factory = new ConnectionFactory + { + HostName = s.Host, + Port = s.Port, + UserName = s.User, + Password = s.Password, + VirtualHost = s.VirtualHost + }; + + _connection = await factory.CreateConnectionAsync(cancellationToken); + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + await _channel.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false, + cancellationToken: cancellationToken); + await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, + cancellationToken: cancellationToken); + + _logger.LogInformation("RabbitMQ consumer connected to queue {Queue} (settings: {SettingsName})", + QueueName, SettingsName); + } + + public override void Dispose() + { + _channel?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + _connection?.DisposeAsync().AsTask().GetAwaiter().GetResult(); + base.Dispose(); + } +} diff --git a/Hrynco.RabbitMq/RabbitMqPublisher.cs b/Hrynco.RabbitMq/RabbitMqPublisher.cs new file mode 100644 index 0000000..8d93b30 --- /dev/null +++ b/Hrynco.RabbitMq/RabbitMqPublisher.cs @@ -0,0 +1,68 @@ +namespace Hrynco.RabbitMq; + +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; + +public sealed class RabbitMqPublisher : IRabbitMqPublisher +{ + private readonly IOptionsMonitor _options; + private readonly string _settingsName; + private readonly ILogger _logger; + + /// Named options monitor for . + /// Logger. + /// + /// Name of the instance to use. + /// Defaults to (i.e. the unnamed instance). + /// + public RabbitMqPublisher( + IOptionsMonitor options, + ILogger logger, + string? settingsName = null) + { + _options = options; + _logger = logger; + _settingsName = settingsName ?? Options.DefaultName; + } + + private RabbitMqSettings Settings => _options.Get(_settingsName); + + public async Task PublishAsync(string queue, IRabbitMqMessage message, CancellationToken cancellationToken = default) + { + var factory = BuildFactory(Settings); + + await using var connection = await factory.CreateConnectionAsync(cancellationToken); + await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + + await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false, + cancellationToken: cancellationToken); + + var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)); + + var props = new BasicProperties + { + Persistent = true, + ContentType = "application/json" + }; + + await channel.BasicPublishAsync(exchange: "", routingKey: queue, mandatory: false, + basicProperties: props, body: body, cancellationToken: cancellationToken); + + _logger.LogDebug("Published to queue {Queue} [CorrelationId={CorrelationId}]", + queue, message.CorrelationContext?.CorrelationId); + } + + private static ConnectionFactory BuildFactory(RabbitMqSettings s) => new() + { + HostName = s.Host, + Port = s.Port, + UserName = s.User, + Password = s.Password, + VirtualHost = s.VirtualHost + }; +} diff --git a/Hrynco.RabbitMq/RabbitMqSettings.cs b/Hrynco.RabbitMq/RabbitMqSettings.cs new file mode 100644 index 0000000..49efa6b --- /dev/null +++ b/Hrynco.RabbitMq/RabbitMqSettings.cs @@ -0,0 +1,10 @@ +namespace Hrynco.RabbitMq; + +public record RabbitMqSettings +{ + public required string Host { get; init; } + public required string User { get; init; } + public required string Password { get; init; } + public int Port { get; init; } = 5672; + public string VirtualHost { get; init; } = "/"; +}