refactor: modularize email processing logic and improve service structure

- Extract email template handling, rendering, and sending code into `Worker.Services` project.
- Introduce `EmailTemplateService`, `EmailTemplateRenderingService`, and `SendEmailService`.
- Simplify consumer logic by delegating to scoped services.
- Update project dependencies and package references accordingly.
This commit is contained in:
Anatolii Grynchuk
2026-05-14 22:15:15 +03:00
parent 0861e18cec
commit 25fb48ccf0
18 changed files with 394 additions and 174 deletions
@@ -1,174 +1,32 @@
namespace HrynCo.NotificationService.Worker;
using System.Text;
using System.Text.Json;
using HrynCo.NotificationService.Contracts.Messages;
using HrynCo.NotificationService.DAL.Abstract.Providers;
using HrynCo.NotificationService.DAL.Abstract.Repositories;
using HrynCo.NotificationService.DAL.Abstract.Templates;
using HrynCo.NotificationService.Services.EmailChannels.Send;
using Hrynco.RabbitMq;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using HrynCo.NotificationService.Worker.Services.EmailProcessing;
public sealed class SendEmailConsumer(
IOptionsMonitor<RabbitMqSettings> options,
IEmailChannelRepository channelRepository,
IEmailTemplateRepository templateRepository,
IEmailChannelUsageRepository usageRepository,
IMediator mediator,
AppSettings appSettings,
ILogger<SendEmailConsumer> logger)
: RabbitMqConsumerBase<SendEmailMessage, SendEmailMessageData>(options, logger)
public sealed class SendEmailConsumer : RabbitMqConsumerBase<SendEmailMessage, SendEmailMessageData>
{
private readonly IServiceScopeFactory _scopeFactory;
public SendEmailConsumer(
IOptionsMonitor<RabbitMqSettings> options,
IServiceScopeFactory scopeFactory,
ILogger<SendEmailConsumer> logger)
: base(options, logger)
{
_scopeFactory = scopeFactory;
}
private const string IncomingQueue = "notification.send-email";
protected override string QueueName => IncomingQueue;
protected override async Task HandleMessageAsync(SendEmailMessage message, CancellationToken cancellationToken)
{
var data = message.Data;
logger.LogInformation(
"Processing SendEmail for service={Service} template={Template} recipient={Recipient} [CorrelationId={CorrelationId}]",
data.ServiceName, data.TemplateKey, data.RecipientEmail, message.CorrelationContext?.CorrelationId);
var channel = await ResolveChannelAsync(data.ServiceName, cancellationToken);
var template = await ResolveTemplateAsync(data.ServiceName, data.TemplateKey, data.LanguageCode, cancellationToken);
await EnforceLimitsAsync(channel, cancellationToken);
var rendered = RenderTemplate(template, data);
var sendResult = await mediator.Send(
new SendEmailCommand(channel.Id, data.RecipientEmail, data.RecipientName,
rendered.Subject, rendered.HtmlBody, rendered.TextBody),
cancellationToken);
if (!sendResult.IsSuccess)
throw new InvalidOperationException(sendResult.Error?.Message ?? "Send failed.");
logger.LogInformation(
"Email sent successfully service={Service} template={Template} recipient={Recipient}",
data.ServiceName, data.TemplateKey, data.RecipientEmail);
await PublishResultAsync(message.CorrelationContext, data, errorMessage: null, cancellationToken);
}
private async Task<EmailChannel> ResolveChannelAsync(string serviceName, CancellationToken ct)
{
var channels = await channelRepository.GetByServiceAsync(serviceName, ct);
return channels
.Where(c => c.IsActive)
.OrderBy(c => c.Priority)
.FirstOrDefault()
?? throw new InvalidOperationException(
$"No active email channel found for service '{serviceName}'.");
}
private async Task<EmailTemplate> ResolveTemplateAsync(
string serviceName, string templateKey, string? languageCode, CancellationToken ct)
{
var lang = string.IsNullOrWhiteSpace(languageCode) ? "en" : languageCode;
var template = await templateRepository.GetAsync(serviceName, templateKey, lang, ct);
if (template is null && lang != "en")
template = await templateRepository.GetAsync(serviceName, templateKey, "en", ct);
return template
?? throw new InvalidOperationException(
$"Template not found: service='{serviceName}' key='{templateKey}' language='{lang}'.");
}
private async Task EnforceLimitsAsync(EmailChannel channel, CancellationToken ct)
{
var today = DateOnly.FromDateTime(DateTime.UtcNow);
if (channel.DailyLimit.HasValue)
{
var daily = await usageRepository.GetDailyCountAsync(channel.Id, today, ct);
if (daily >= channel.DailyLimit.Value)
throw new InvalidOperationException(
$"Channel '{channel.Id}' daily limit of {channel.DailyLimit.Value} reached ({daily} sent today).");
}
if (channel.MonthlyLimit.HasValue)
{
var monthly = await usageRepository.GetMonthlyCountAsync(channel.Id, today.Year, today.Month, ct);
if (monthly >= channel.MonthlyLimit.Value)
throw new InvalidOperationException(
$"Channel '{channel.Id}' monthly limit of {channel.MonthlyLimit.Value} reached ({monthly} sent this month).");
}
}
private static RenderedEmail RenderTemplate(EmailTemplate template, SendEmailMessageData data)
{
return new RenderedEmail(
Interpolate(template.Subject, data.Variables),
Interpolate(template.HtmlBody, data.Variables),
Interpolate(template.TextBody, data.Variables));
}
private static string Interpolate(string text, IReadOnlyDictionary<string, string> variables)
{
var sb = new StringBuilder(text);
foreach (var (key, value) in variables)
sb.Replace($"{{{{{key}}}}}", value);
return sb.ToString();
}
private async Task PublishResultAsync(
CorrelationContext? correlationContext,
SendEmailMessageData data,
string? errorMessage,
CancellationToken ct)
{
var replyTo = correlationContext?.ReplyTo;
if (string.IsNullOrWhiteSpace(replyTo))
return;
try
{
var result = new NotificationResultMessage
{
CorrelationContext = (correlationContext ?? new CorrelationContext { CorrelationId = Guid.NewGuid().ToString() }) with { ReplyTo = null },
Data = new NotificationResultData
{
ServiceName = data.ServiceName,
RecipientEmail = data.RecipientEmail,
TemplateKey = data.TemplateKey,
Timestamp = DateTimeOffset.UtcNow,
ErrorMessage = errorMessage
}
};
byte[] body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(result));
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitMq.Host,
Port = appSettings.RabbitMq.Port,
UserName = appSettings.RabbitMq.User,
Password = appSettings.RabbitMq.Password,
VirtualHost = appSettings.RabbitMq.VirtualHost
};
await using var conn = await factory.CreateConnectionAsync(ct);
await using var ch = await conn.CreateChannelAsync(cancellationToken: ct);
await ch.QueueDeclareAsync(replyTo, durable: true, exclusive: false, autoDelete: false,
cancellationToken: ct);
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: replyTo, body: body,
cancellationToken: ct);
logger.LogDebug("Result published to reply queue '{Queue}' [CorrelationId={CorrelationId}]",
replyTo, correlationContext?.CorrelationId);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to publish notification result to reply queue '{Queue}'", replyTo);
}
using var scope = _scopeFactory.CreateScope();
var service = scope.ServiceProvider.GetRequiredService<ISendEmailService>();
await service.ProcessAsync(message, cancellationToken);
}
}