namespace HrynCo.NotificationService.Worker;

using System.Text;
using System.Text.Json;
using HrynCo.NotificationService.Contracts.Messages;
using HrynCo.NotificationService.DAL.Abstract.Providers;
using HrynCo.NotificationService.DAL.Abstract.Repositories;
using HrynCo.NotificationService.DAL.Abstract.Templates;
using HrynCo.NotificationService.Services.EmailChannels.Send;
using Hrynco.RabbitMq;
using MediatR;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

/// <summary>
/// Consumes messages from the <c>notification.send-email</c> queue. For each message it picks the
/// active email channel of the requesting service, resolves and renders the template, checks the
/// channel's daily/monthly limits, dispatches a <c>SendEmailCommand</c> through MediatR and, when the
/// message carries a reply queue, publishes a result message to it.
/// </summary>
// NOTE(review): the generic type arguments of IOptionsMonitor, ILogger and RabbitMqConsumerBase are
// not visible in this view of the file; the header below is reproduced as-is — confirm against the
// real declaration before compiling.
public sealed class SendEmailConsumer(
    IOptionsMonitor options,
    IEmailChannelRepository channelRepository,
    IEmailTemplateRepository templateRepository,
    IEmailChannelUsageRepository usageRepository,
    IMediator mediator,
    AppSettings appSettings,
    ILogger logger)
    : RabbitMqConsumerBase(options, logger)
{
    private const string IncomingQueue = "notification.send-email";

    /// <summary>Name of the queue this consumer reads from.</summary>
    protected override string QueueName => IncomingQueue;

    /// <summary>
    /// Processes one send-email request end to end.
    /// </summary>
    /// <param name="message">The incoming request; its <c>Data</c> describes service, template and recipient.</param>
    /// <param name="cancellationToken">Token flowed to every repository, mediator and broker call.</param>
    /// <exception cref="InvalidOperationException">
    /// No active channel or template was found, a channel limit is reached, or the send command
    /// reported failure.
    /// </exception>
    // NOTE(review): every failure path throws before PublishResultAsync is reached, so a requester
    // waiting on ReplyTo only ever receives success results (errorMessage is always null here).
    // Confirm whether failures are reported elsewhere, e.g. by the base consumer.
    protected override async Task HandleMessageAsync(SendEmailMessage message, CancellationToken cancellationToken)
    {
        var data = message.Data;

        logger.LogInformation(
            "Processing SendEmail for service={Service} template={Template} recipient={Recipient} [CorrelationId={CorrelationId}]",
            data.ServiceName,
            data.TemplateKey,
            data.RecipientEmail,
            message.CorrelationContext?.CorrelationId);

        var channel = await ResolveChannelAsync(data.ServiceName, cancellationToken);
        var template = await ResolveTemplateAsync(data.ServiceName, data.TemplateKey, data.LanguageCode, cancellationToken);

        await EnforceLimitsAsync(channel, cancellationToken);

        var rendered = RenderTemplate(template, data);

        var sendResult = await mediator.Send(
            new SendEmailCommand(
                channel.Id,
                data.RecipientEmail,
                data.RecipientName,
                rendered.Subject,
                rendered.HtmlBody,
                rendered.TextBody),
            cancellationToken);

        if (!sendResult.IsSuccess)
        {
            throw new InvalidOperationException(sendResult.Error?.Message ?? "Send failed.");
        }

        logger.LogInformation(
            "Email sent successfully service={Service} template={Template} recipient={Recipient}",
            data.ServiceName,
            data.TemplateKey,
            data.RecipientEmail);

        await PublishResultAsync(message.CorrelationContext, data, errorMessage: null, cancellationToken);
    }

    /// <summary>
    /// Returns the active channel with the lowest <c>Priority</c> value among the channels
    /// registered for <paramref name="serviceName"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The service has no active channel.</exception>
    private async Task<EmailChannel> ResolveChannelAsync(string serviceName, CancellationToken ct)
    {
        var channels = await channelRepository.GetByServiceAsync(serviceName, ct);

        return channels
                   .Where(c => c.IsActive)
                   .OrderBy(c => c.Priority)
                   .FirstOrDefault()
               ?? throw new InvalidOperationException(
                   $"No active email channel found for service '{serviceName}'.");
    }

    /// <summary>
    /// Loads the template for the requested language, falling back to <c>"en"</c> when the
    /// language is blank or no template exists for it.
    /// </summary>
    /// <exception cref="InvalidOperationException">Neither the requested nor the English template exists.</exception>
    private async Task<EmailTemplate> ResolveTemplateAsync(
        string serviceName,
        string templateKey,
        string? languageCode,
        CancellationToken ct)
    {
        var lang = string.IsNullOrWhiteSpace(languageCode) ? "en" : languageCode;

        var template = await templateRepository.GetAsync(serviceName, templateKey, lang, ct);

        // Second lookup only when the first one was not already the English one.
        if (template is null && lang != "en")
        {
            template = await templateRepository.GetAsync(serviceName, templateKey, "en", ct);
        }

        return template
               ?? throw new InvalidOperationException(
                   $"Template not found: service='{serviceName}' key='{templateKey}' language='{lang}'.");
    }

    /// <summary>
    /// Throws when the channel has already reached its daily or monthly limit. A limit that has
    /// no value is not checked. Counts are looked up for the current UTC date.
    /// </summary>
    /// <exception cref="InvalidOperationException">A configured limit has been reached.</exception>
    private async Task EnforceLimitsAsync(EmailChannel channel, CancellationToken ct)
    {
        var today = DateOnly.FromDateTime(DateTime.UtcNow);

        if (channel.DailyLimit.HasValue)
        {
            var daily = await usageRepository.GetDailyCountAsync(channel.Id, today, ct);
            if (daily >= channel.DailyLimit.Value)
            {
                throw new InvalidOperationException(
                    $"Channel '{channel.Id}' daily limit of {channel.DailyLimit.Value} reached ({daily} sent today).");
            }
        }

        if (channel.MonthlyLimit.HasValue)
        {
            var monthly = await usageRepository.GetMonthlyCountAsync(channel.Id, today.Year, today.Month, ct);
            if (monthly >= channel.MonthlyLimit.Value)
            {
                throw new InvalidOperationException(
                    $"Channel '{channel.Id}' monthly limit of {channel.MonthlyLimit.Value} reached ({monthly} sent this month).");
            }
        }
    }

    /// <summary>
    /// Substitutes the message variables into the template's subject, HTML body and text body.
    /// </summary>
    // NOTE(review): values are inserted into the HTML body without HTML-encoding. If variables can
    // carry end-user input, confirm that encoding happens upstream.
    private static RenderedEmail RenderTemplate(EmailTemplate template, SendEmailMessageData data)
    {
        return new RenderedEmail(
            Interpolate(template.Subject, data.Variables),
            Interpolate(template.HtmlBody, data.Variables),
            Interpolate(template.TextBody, data.Variables));
    }

    /// <summary>
    /// Replaces every <c>{{key}}</c> placeholder in <paramref name="text"/> with the matching value
    /// from <paramref name="variables"/>. Placeholders without a matching key are left untouched.
    /// </summary>
    /// <param name="text">Template text; <c>null</c> is treated as empty.</param>
    /// <param name="variables">Placeholder values; <c>null</c> or empty means nothing to substitute.</param>
    /// <returns>The text with placeholders substituted; never <c>null</c>.</returns>
    private static string Interpolate(string? text, IReadOnlyDictionary<string, string>? variables)
    {
        // A message without a variables map must render the template as-is instead of failing
        // with a NullReferenceException in the loop below.
        if (string.IsNullOrEmpty(text) || variables is null || variables.Count == 0)
        {
            return text ?? string.Empty;
        }

        var sb = new StringBuilder(text);

        foreach (var (key, value) in variables)
        {
            // Five braces on each side: "{{" and "}}" are escaped literal braces, the innermost
            // pair is the interpolation hole, producing the literal token {{key}}.
            sb.Replace($"{{{{{key}}}}}", value);
        }

        return sb.ToString();
    }

    /// <summary>
    /// Publishes a <c>NotificationResultMessage</c> to the queue named by the correlation context's
    /// <c>ReplyTo</c>. Does nothing when no reply queue is given. Failures are logged as warnings
    /// and never propagated to the caller.
    /// </summary>
    /// <param name="correlationContext">Context of the incoming message; supplies the reply queue.</param>
    /// <param name="data">Payload of the incoming message, echoed into the result.</param>
    /// <param name="errorMessage">Error text to report, or <c>null</c> for success.</param>
    /// <param name="ct">Token flowed to the broker calls.</param>
    // NOTE(review): a new connection and channel are opened and disposed for every result;
    // consider reusing a connection if reply volume is significant.
    private async Task PublishResultAsync(
        CorrelationContext? correlationContext,
        SendEmailMessageData data,
        string? errorMessage,
        CancellationToken ct)
    {
        var replyTo = correlationContext?.ReplyTo;
        if (string.IsNullOrWhiteSpace(replyTo))
        {
            return;
        }

        try
        {
            var result = new NotificationResultMessage
            {
                // The fallback context cannot be reached at runtime: a null context yields a null
                // replyTo and returns above. ReplyTo is cleared on the outgoing copy.
                CorrelationContext = (correlationContext ?? new CorrelationContext
                {
                    CorrelationId = Guid.NewGuid().ToString()
                }) with { ReplyTo = null },
                Data = new NotificationResultData
                {
                    ServiceName = data.ServiceName,
                    RecipientEmail = data.RecipientEmail,
                    TemplateKey = data.TemplateKey,
                    Timestamp = DateTimeOffset.UtcNow,
                    ErrorMessage = errorMessage
                }
            };

            byte[] body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(result));

            var factory = new ConnectionFactory
            {
                HostName = appSettings.RabbitMq.Host,
                Port = appSettings.RabbitMq.Port,
                UserName = appSettings.RabbitMq.User,
                Password = appSettings.RabbitMq.Password,
                VirtualHost = appSettings.RabbitMq.VirtualHost
            };

            await using var conn = await factory.CreateConnectionAsync(ct);
            await using var ch = await conn.CreateChannelAsync(cancellationToken: ct);

            await ch.QueueDeclareAsync(replyTo, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct);

            // Empty exchange name: the message is routed by queue name via the routing key.
            await ch.BasicPublishAsync(exchange: string.Empty, routingKey: replyTo, body: body, cancellationToken: ct);

            logger.LogDebug(
                "Result published to reply queue '{Queue}' [CorrelationId={CorrelationId}]",
                replyTo,
                correlationContext?.CorrelationId);
        }
        catch (Exception ex)
        {
            // Best effort: the email itself has already been sent, so a failed reply must not
            // fail the message.
            logger.LogWarning(ex, "Failed to publish notification result to reply queue '{Queue}'", replyTo);
        }
    }
}