Skip to content

E2E Example

Step 1 — App Starts, DI Wires Everything

// Program.cs
// Messaging wiring. Each call returns the same IServiceCollection, so the
// registrations are chained in the order they are needed at runtime.
builder.Services
    // One shared TCP connection to the broker.
    .AddSingleton<RabbitMqConnection>()
    // Publisher side of the bus.
    .AddSingleton<IMessageBus, RabbitMqMessageBus>()
    // Business-logic handler invoked by the consumer for each OrderCreatedEvent.
    .AddSingleton<IMessageHandler<OrderCreatedEvent>, OrderCreatedHandler>()
    // BackgroundService that consumes OrderCreatedEvent messages.
    .AddHostedService<OrderCreatedConsumer>();

Four things registered:

RabbitMqConnection     → singleton, created once, lives forever
RabbitMqMessageBus     → singleton, owns its own channel for publishing
OrderCreatedHandler    → singleton, the IMessageHandler<OrderCreatedEvent> implementation the consumer calls
OrderCreatedConsumer   → BackgroundService, starts automatically on app startup

Step 2 — RabbitMqConnection Is Created

First time anything asks for it from DI — singleton constructor runs:

/// <summary>
/// Owns the application's single AMQP connection and hands out channels on it.
/// </summary>
public class RabbitMqConnection : IDisposable
{
    private readonly IConnection _connection;

    /// <summary>
    /// Reads <c>RabbitMq:ConnectionString</c> from configuration and opens the connection.
    /// </summary>
    /// <param name="config">Application configuration containing the broker URI.</param>
    /// <exception cref="InvalidOperationException">
    /// The <c>RabbitMq:ConnectionString</c> setting is missing or blank.
    /// </exception>
    public RabbitMqConnection(IConfiguration config)
    {
        // e.g. "amqp://guest:guest@rabbitmq:5672"
        var connectionString = config["RabbitMq:ConnectionString"];
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            // Fail with a message that names the missing setting instead of the
            // ArgumentNullException that new Uri(null) would raise.
            throw new InvalidOperationException(
                "Configuration value 'RabbitMq:ConnectionString' is missing or empty.");
        }

        var factory = new ConnectionFactory
        {
            Uri = new Uri(connectionString),
            DispatchConsumersAsync = true   // required for async handlers
        };

        // TCP handshake + AMQP negotiation happens here.
        // This is the one persistent TCP connection for the entire app.
        _connection = factory.CreateConnection();
    }

    /// <summary>
    /// Opens a new channel on the shared connection.
    /// </summary>
    /// <returns>A new channel; the caller owns it and is responsible for disposing it.</returns>
    public IModel CreateChannel() => _connection.CreateModel();

    /// <summary>
    /// Releases the underlying connection. The class declares <see cref="IDisposable"/>,
    /// so the DI container calls this when the singleton is torn down at shutdown.
    /// </summary>
    public void Dispose() => _connection.Dispose();
}

Once this constructor returns, one persistent TCP connection to RabbitMQ exists. Nothing else opens a TCP connection; every channel created later is a virtual lane on this one.


Step 3 — RabbitMqMessageBus Is Created (Publisher)

Also singleton — constructor runs once:

/// <summary>
/// Publisher side of the message bus. Opens one channel on the shared connection
/// and declares the exchanges it publishes to.
/// </summary>
public class RabbitMqMessageBus : IMessageBus, IDisposable
{
    private readonly IModel _channel;

    /// <summary>
    /// Opens the publisher channel and declares both exchanges.
    /// </summary>
    /// <param name="connection">Shared connection wrapper used to open the channel.</param>
    /// <param name="logger">Logger for this type (not used by the code shown here).</param>
    public RabbitMqMessageBus(RabbitMqConnection connection, ILogger<RabbitMqMessageBus> logger)
    {
        // Opens a channel: a virtual lane on the shared TCP connection.
        _channel = connection.CreateChannel();

        // Declaring an exchange is idempotent: a no-op if it already exists
        // with the same type and durability.
        _channel.ExchangeDeclare("domain.events",   ExchangeType.Topic,  durable: true);
        _channel.ExchangeDeclare("domain.commands", ExchangeType.Direct, durable: true);
    }

    // PublishAsync and SendAsync are shown separately in Steps 6 and 9.

    /// <summary>
    /// Releases the publisher channel. The class declares <see cref="IDisposable"/>,
    /// so this member is required for it to compile and for the channel to be closed.
    /// </summary>
    public void Dispose() => _channel.Dispose();
}

After this:

TCP connection: open
Publisher channel: open
domain.events exchange: declared
domain.commands exchange: declared

Step 4 — OrderCreatedConsumer BackgroundService Starts

ExecuteAsync runs automatically on app startup:

/// <summary>
/// Declares this consumer's topology (exchanges, retry queue, DLQ, main queue) on its own
/// channel and registers a push consumer on the main queue.
/// </summary>
/// <param name="ct">Host shutdown token; not observed by this method.</param>
/// <returns>
/// An already-completed task: the method only performs registration, and messages are
/// delivered afterwards through the consumer's Received event.
/// </returns>
protected override Task ExecuteAsync(CancellationToken ct)
{
    // Opens its OWN channel: a separate virtual lane from the publisher's.
    _channel = _connection.CreateChannel();

    // Re-declare the events exchange here as well (idempotent; same type and durability
    // as the publisher's declaration). The publisher singleton is only constructed when
    // something first resolves it, so without this the QueueBind below can fail with
    // 404 NOT_FOUND on a fresh broker.
    _channel.ExchangeDeclare("domain.events", ExchangeType.Topic, durable: true);

    // Declares DLX exchange (if retry or DLQ enabled).
    _channel.ExchangeDeclare("dlx.exchange", ExchangeType.Direct, durable: true);

    // Retry queue: no consumer, just a TTL and a dead-letter route back to the main exchange.
    // NOTE(review): dead-lettering to the topic exchange re-delivers the message to every
    // queue bound to "OrderCreatedEvent", not only this consumer's queue. Confirm that is intended.
    var retryArguments = new System.Collections.Generic.Dictionary<string, object>
    {
        ["x-message-ttl"]             = 30000,
        ["x-dead-letter-exchange"]    = "domain.events",
        ["x-dead-letter-routing-key"] = "OrderCreatedEvent"
    };

    // QueueDeclare defaults to exclusive: true and autoDelete: true, which would delete
    // these queues (and their messages) when the connection closes. Both are switched
    // off explicitly so the durable queues survive an app restart.
    _channel.QueueDeclare(
        "demo.order-created.queue.retry",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: retryArguments);

    // DLQ: a normal queue used as the graveyard. Durable, so failed messages are kept for inspection.
    _channel.QueueDeclare("demo.order-created.queue.dlq", durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind("demo.order-created.queue.dlq", "dlx.exchange", "demo.order-created.queue.dlq");

    // Main queue, bound to the events exchange by event type name.
    _channel.QueueDeclare("demo.order-created.queue", durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind("demo.order-created.queue", "domain.events", "OrderCreatedEvent");

    // QoS: at most one unacknowledged message at a time on this channel.
    _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    // Registers the consumer with the broker. The handler body is elided here and
    // walked through in Step 8.
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (_, ea) => { ... };
    _channel.BasicConsume("demo.order-created.queue", autoAck: false, consumer);

    return Task.CompletedTask;  // returns immediately, consumer runs in background
}

After this:

TCP connection:                    open (shared)
Publisher channel:                 open (owned by MessageBus)
Consumer channel:                  open (owned by OrderCreatedConsumer)
domain.events exchange:            declared
domain.commands exchange:          declared
dlx.exchange:                      declared
demo.order-created.queue:          declared, bound to domain.events/OrderCreatedEvent
demo.order-created.queue.retry:    declared, TTL 30s, dead letters back to domain.events
demo.order-created.queue.dlq:      declared, bound to dlx.exchange
Consumer registered with broker:   YES  broker will push messages to it

Step 5 — HTTP Request Hits The Controller

/// <summary>
/// Creates an order id, publishes an <c>OrderCreatedEvent</c>, sends a
/// <c>ProcessInvoiceCommand</c> to "invoice-service", and returns the new id.
/// </summary>
/// <param name="req">Request body carrying the customer name and order total.</param>
/// <returns>An OK result whose payload exposes the generated <c>OrderId</c>.</returns>
[HttpPost("create")]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest req)
{
    var newOrderId = Guid.NewGuid();

    // Event first: broadcast that the order exists.
    var createdEvent = new OrderCreatedEvent(newOrderId, req.CustomerName, req.Total);
    await _bus.PublishAsync(createdEvent);

    // Then the command, addressed to a single named destination.
    var invoiceCommand = new ProcessInvoiceCommand(newOrderId, req.Total, req.CustomerName);
    await _bus.SendAsync(invoiceCommand, "invoice-service");

    return Ok(new { OrderId = newOrderId });
}

Two calls. Let's trace each.


Step 6 — PublishAsync (Event)

/// <summary>
/// Wraps an integration event in a <c>MessageEnvelope</c> and publishes it to the
/// "domain.events" topic exchange, using the event's type name as the routing key.
/// </summary>
/// <typeparam name="T">Event type; its name is used as routing key and envelope message type.</typeparam>
/// <param name="message">The event to publish.</param>
/// <returns>A completed task; the publish call itself is synchronous.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
public Task PublishAsync<T>(T message) where T : IntegrationEvent
{
    if (message is null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    // e.g. "OrderCreatedEvent". Derived from T so the method works for every event type
    // rather than only the one used in this walkthrough.
    var routingKey = typeof(T).Name;

    var envelope = new MessageEnvelope
    {
        MessageType = routingKey,
        PayloadJson = JsonSerializer.Serialize(message),
        Timestamp   = DateTime.UtcNow,
        MessageId   = Guid.NewGuid()
    };

    var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope));

    // This bus is a singleton and its channel is shared by every request thread.
    // Channels must not be used concurrently, so publishing is serialized.
    lock (_channel)
    {
        var props = _channel.CreateBasicProperties();
        props.Persistent  = true;   // ask the broker to persist the message to disk
        props.ContentType = "application/json";

        _channel.BasicPublish(
            exchange:        "domain.events",   // topic exchange
            routingKey:      routingKey,        // event type name
            basicProperties: props,
            body:            body
        );
    }

    return Task.CompletedTask;
}

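The message has been handed off to the broker. Note that BasicPublish is fire-and-forget: this code does not enable publisher confirms, so the publisher gets no acknowledgement that the broker accepted or routed the message. The publisher is done and has no idea who receives it.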


Step 7 — Broker Routes The Message

domain.events exchange receives message
routing key: "OrderCreatedEvent"

Checks all bindings:
  "OrderCreatedEvent"      → demo.order-created.queue   match
  "PaymentProcessedEvent"  → ...                        no match

Delivers copy to demo.order-created.queue
Message sits in queue waiting for consumer to pick it up

Step 8 — Consumer Receives It

The AsyncEventingBasicConsumer fires its Received event:

// Per-message pipeline: deserialize, run the handler, ACK on success.
// On failure the message is re-published to the retry queue or the DLQ, and the
// original delivery is ACKed so it leaves the main queue.
consumer.Received += async (_, ea) =>
{
    try
    {
        // 1. Deserialize envelope
        var json     = Encoding.UTF8.GetString(ea.Body.ToArray());
        var envelope = JsonSerializer.Deserialize<MessageEnvelope>(json);
        // envelope.MessageType = "OrderCreatedEvent"
        // envelope.PayloadJson = serialized OrderCreatedEvent

        // 2. Deserialize payload into strongly typed message
        var message = JsonSerializer.Deserialize<OrderCreatedEvent>(envelope.PayloadJson);

        // 3. Call handler: pure business logic
        await _handler.HandleAsync(message);

        // 4. ACK: tell broker the message was processed, remove from queue
        _channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
    catch (Exception)
    {
        // NOTE(review): the exception is not recorded anywhere; log it once a logger
        // is available in this class.
        var retryCount = GetRetryCount(ea.BasicProperties);  // 0 first time

        // Re-publish with the properties of the failed delivery so persistence,
        // content type and headers travel with the message.
        // NOTE(review): GetRetryCount is not shown. This assumes it reads the
        // broker-maintained x-death header; if it reads a custom header, that
        // header must be incremented here. Confirm.
        var props = ea.BasicProperties;

        if (retryCount < 3)  // MaxRetries
        {
            // Default exchange routes by queue name: park in the retry queue (waiting room).
            _channel.BasicPublish("", "demo.order-created.queue.retry", props, ea.Body);
        }
        else
        {
            // Retries exhausted: publish to the DLQ (graveyard).
            _channel.BasicPublish("dlx.exchange", "demo.order-created.queue.dlq", props, ea.Body);
        }

        // ACK original: remove from main queue either way
        _channel.BasicAck(ea.DeliveryTag, false);
    }
};

Step 9 — SendAsync (Command)

Back in the controller, second call:

// Controller call site: build the command first, then address it to one named destination.
var invoiceCommand = new ProcessInvoiceCommand(orderId, req.Total, req.CustomerName);
await _bus.SendAsync(invoiceCommand, destination: "invoice-service");
/// <summary>
/// Wraps a command in a <c>MessageEnvelope</c> and publishes it to the "domain.commands"
/// direct exchange, routed by destination name rather than by type name.
/// </summary>
/// <typeparam name="T">Command type; its name is recorded as the envelope message type.</typeparam>
/// <param name="message">The command to send.</param>
/// <param name="destination">Routing key naming the recipient, e.g. "invoice-service".</param>
/// <returns>A completed task; the publish call itself is synchronous.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="destination"/> is null or blank.</exception>
public Task SendAsync<T>(T message, string destination) where T : Command
{
    if (message is null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    if (string.IsNullOrWhiteSpace(destination))
    {
        throw new ArgumentException("A destination routing key is required.", nameof(destination));
    }

    var envelope = new MessageEnvelope
    {
        MessageType = typeof(T).Name,
        PayloadJson = JsonSerializer.Serialize(message),
        Timestamp   = DateTime.UtcNow,
        MessageId   = Guid.NewGuid()
    };

    var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope));

    // The channel is shared by every request thread of this singleton and must not
    // be used concurrently, so publishing is serialized.
    lock (_channel)
    {
        var props = _channel.CreateBasicProperties();
        props.Persistent  = true;
        props.ContentType = "application/json";

        // routing key = destination name, NOT type name
        _channel.BasicPublish(
            exchange:        "domain.commands",  // direct exchange
            routingKey:      destination,        // who should handle it
            basicProperties: props,
            body:            body
        );
    }

    return Task.CompletedTask;
}
domain.commands (Direct) receives message
routing key: "invoice-service"

Exact match only:
  "invoice-service" → invoice-service queue   (no other binding matches)

Delivered to invoice-service queue
ProcessInvoiceConsumer picks it up
ProcessInvoiceHandler.HandleAsync runs
logs: "🧾 Invoice processing..."

The Complete Picture

App startup:
  DI creates RabbitMqConnection     one TCP connection established
  DI creates RabbitMqMessageBus     publisher channel opened, exchanges declared
  BackgroundService starts          consumer channel opened
                                     queues declared and bound
                                     consumer registered with broker

HTTP request arrives:
  Controller calls PublishAsync     envelope built, BasicPublish to domain.events
  Broker routes                     matched queues receive copies
  Controller calls SendAsync        envelope built, BasicPublish to domain.commands
  Broker routes                     exact match queue receives it
  Controller returns 200            done, doesn't wait for consumers

Consumer side (independent):
  Broker pushes message             Received event fires
  Deserialize envelope              get MessageEnvelope
  Deserialize payload               get OrderCreatedEvent
  Call handler                      business logic runs
  Success → BasicAck                message deleted from queue
  Failure → retry count check
    < max  → publish to retry queue → TTL expires → dead-lettered back to domain.events → retried
    >= max → publish to DLQ         → sits for inspection
  Always ACK original               → removed from main queue

What Lives Where

TCP connection         RabbitMqConnection (singleton)
Publisher channel      RabbitMqMessageBus (singleton, owns one channel)
Consumer channel       RabbitMqConsumer<T> (one per BackgroundService)
Exchange declaration   RabbitMqMessageBus constructor (publisher owns exchanges)
Queue declaration      RabbitMqConsumer<T>.ExecuteAsync (consumer owns queues)
Business logic         IMessageHandler<T> implementation
Retry/DLQ routing      RabbitMqConsumer<T> catch block

Everything has a single clear owner. Nothing is shared that shouldn't be. That's the entire flow.