E2E Example
Step 1: App Starts, DI Wires Everything
// Program.cs
builder.Services.AddSingleton<RabbitMqConnection>(); // one TCP connection
builder.Services.AddSingleton<IMessageBus, RabbitMqMessageBus>(); // publisher
builder.Services.AddSingleton<IMessageHandler<OrderCreatedEvent>, OrderCreatedHandler>();
builder.Services.AddHostedService<OrderCreatedConsumer>(); // BackgroundService
Four registrations:
RabbitMqConnection → singleton, created once, lives for the lifetime of the app
RabbitMqMessageBus → singleton, owns its own channel for publishing
OrderCreatedHandler → singleton, holds the business logic for OrderCreatedEvent
OrderCreatedConsumer → BackgroundService, starts automatically on app startup
Step 2: RabbitMqConnection Is Created
The first time anything resolves it from DI, the singleton constructor runs:
using RabbitMQ.Client;
public class RabbitMqConnection : IDisposable
{
    private readonly IConnection _connection;
    public RabbitMqConnection(IConfiguration config)
    {
        // e.g. "amqp://guest:guest@rabbitmq:5672"
        var connectionString = config["RabbitMq:ConnectionString"]
            ?? throw new InvalidOperationException("RabbitMq:ConnectionString is not configured.");
        var factory = new ConnectionFactory
        {
            Uri = new Uri(connectionString),
            DispatchConsumersAsync = true // required for async handlers (AsyncEventingBasicConsumer)
        };
        // TCP handshake + AMQP negotiation happens here
        // This is the one persistent TCP connection for the entire app
        _connection = factory.CreateConnection();
    }
    // Cheap: just opens a virtual lane (channel) on the existing TCP connection
    public IModel CreateChannel() => _connection.CreateModel();
    // Closes the TCP connection when the DI container is disposed at shutdown
    public void Dispose() => _connection.Dispose();
}
Once this constructor returns, one persistent TCP connection to RabbitMQ exists. Nothing else opens a TCP connection; every channel is created on this one.
Step 3: RabbitMqMessageBus Is Created (Publisher)
Also a singleton, so its constructor runs once:
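public class RabbitMqMessageBus : IMessageBus, IDisposable
{
    private readonly IModel _channel;
    private readonly ILogger<RabbitMqMessageBus> _logger;
    public RabbitMqMessageBus(RabbitMqConnection connection, ILogger<RabbitMqMessageBus> logger)
    {
        _logger = logger;
        // Opens a channel: a virtual lane on the shared TCP connection
        _channel = connection.CreateChannel();
        // Declares both exchanges. Idempotent: a no-op if they already exist with the same settings
        _channel.ExchangeDeclare("domain.events", ExchangeType.Topic, durable: true);
        _channel.ExchangeDeclare("domain.commands", ExchangeType.Direct, durable: true);
    }
    // PublishAsync and SendAsync are covered in Steps 6 and 9
    // Closes the publisher channel at shutdown
    public void Dispose() => _channel.Dispose();
}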
After this:
TCP connection: open
Publisher channel: open
domain.events exchange: declared
domain.commands exchange: declared
Step 4: OrderCreatedConsumer BackgroundService Starts
ExecuteAsync runs automatically on app startup:
protected override Task ExecuteAsync(CancellationToken ct)
{
    // Opens its OWN channel: a separate virtual lane from the publisher's
    _channel = _connection.CreateChannel();
    // Declares the DLX exchange (if retry or DLQ is enabled)
    _channel.ExchangeDeclare("dlx.exchange", ExchangeType.Direct, durable: true);
    // Declares the retry queue: no consumer, just a TTL, then dead-letters back to the main exchange.
    // exclusive/autoDelete are set explicitly because QueueDeclare defaults both to true.
    _channel.QueueDeclare(
        "demo.order-created.queue.retry",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: new Dictionary<string, object>
        {
            ["x-message-ttl"] = 30000, // 30 seconds
            ["x-dead-letter-exchange"] = "domain.events",
            ["x-dead-letter-routing-key"] = "OrderCreatedEvent"
        });
    // Declares the DLQ: a normal queue that acts as the graveyard
    _channel.QueueDeclare("demo.order-created.queue.dlq", durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind("demo.order-created.queue.dlq", "dlx.exchange", "demo.order-created.queue.dlq");
    // Declares the main queue and binds it to the exchange
    _channel.QueueDeclare("demo.order-created.queue", durable: true, exclusive: false, autoDelete: false);
    _channel.QueueBind("demo.order-created.queue", "domain.events", "OrderCreatedEvent");
    // QoS: one unacknowledged message at a time, fair dispatch
    _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    // Registers the consumer with the broker, which now knows this consumer exists
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (_, ea) => { ... }; // handler body is shown in Step 8
    _channel.BasicConsume("demo.order-created.queue", autoAck: false, consumer);
    return Task.CompletedTask; // returns immediately, the consumer keeps running in the background
}
After this:
TCP connection: open (shared)
Publisher channel: open (owned by MessageBus)
Consumer channel: open (owned by OrderCreatedConsumer)
domain.events exchange: declared
domain.commands exchange: declared
dlx.exchange: declared
demo.order-created.queue: declared, bound to domain.events/OrderCreatedEvent
demo.order-created.queue.retry: declared, TTL 30s, dead letters back to domain.events
demo.order-created.queue.dlq: declared, bound to dlx.exchange
Consumer registered with broker: YES — broker will push messages to it
Step 5: HTTP Request Hits The Controller
[HttpPost("create")]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest req)
{
var orderId = Guid.NewGuid();
await _bus.PublishAsync(new OrderCreatedEvent(orderId, req.CustomerName, req.Total));
await _bus.SendAsync(new ProcessInvoiceCommand(orderId, req.Total, req.CustomerName), "invoice-service");
return Ok(new { OrderId = orderId });
}
Two calls. Let's trace each.
Step 6: PublishAsync (Event)
public Task PublishAsync<T>(T message) where T : IntegrationEvent
{
    var routingKey = typeof(T).Name; // "OrderCreatedEvent"
    var envelope = new MessageEnvelope
    {
        MessageType = routingKey, // "OrderCreatedEvent"
        PayloadJson = JsonSerializer.Serialize(message),
        // { "orderId": "...", "customerName": "John", "total": 99.99 }
        Timestamp = DateTime.UtcNow,
        MessageId = Guid.NewGuid()
    };
    var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope));
    var props = _channel.CreateBasicProperties();
    props.Persistent = true; // stored on disk once it reaches a durable queue, survives a broker restart
    props.ContentType = "application/json";
    _channel.BasicPublish(
        exchange: "domain.events", // topic exchange
        routingKey: routingKey,    // type name, here "OrderCreatedEvent"
        basicProperties: props,
        body: body
    );
    return Task.CompletedTask; // BasicPublish is synchronous, so there is nothing to await
}
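The message has now been handed to the broker and the publisher is done. It has no idea who receives it. Without publisher confirms, BasicPublish does not wait for the broker to acknowledge the message.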
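The envelope wrapping used by PublishAsync can be tried out without a broker. The following is a minimal sketch, not the actual implementation: the property names come from the snippet above, but the property types, the OrderCreatedEvent record shape, and the EnvelopeCodec helper are assumptions made for illustration.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message shape: parameter types are assumed, not taken from the original.
public record OrderCreatedEvent(Guid OrderId, string CustomerName, decimal Total);

// Property names follow the walkthrough; the types are assumptions.
public class MessageEnvelope
{
    public string MessageType { get; set; } = "";
    public string PayloadJson { get; set; } = "";
    public DateTime Timestamp { get; set; }
    public Guid MessageId { get; set; }
}

// Hypothetical helper that isolates the serialize/deserialize steps.
public static class EnvelopeCodec
{
    // Wraps a message in an envelope and encodes it as UTF-8 JSON bytes.
    public static byte[] Encode<T>(T message)
    {
        var envelope = new MessageEnvelope
        {
            MessageType = typeof(T).Name,
            PayloadJson = JsonSerializer.Serialize(message),
            Timestamp = DateTime.UtcNow,
            MessageId = Guid.NewGuid()
        };
        return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope));
    }

    // Reverses Encode: bytes to envelope, then envelope payload to a typed message.
    public static (MessageEnvelope Envelope, T Payload) Decode<T>(byte[] body)
    {
        var envelope = JsonSerializer.Deserialize<MessageEnvelope>(Encoding.UTF8.GetString(body))
            ?? throw new InvalidOperationException("Body is not a valid envelope.");
        var payload = JsonSerializer.Deserialize<T>(envelope.PayloadJson)
            ?? throw new InvalidOperationException("Payload does not match the requested type.");
        return (envelope, payload);
    }
}
```

Calling EnvelopeCodec.Decode<OrderCreatedEvent>(EnvelopeCodec.Encode(evt)) should give back an equal record, which is the same round trip the publisher and consumer perform on either side of the broker.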
Step 7: Broker Routes The Message
domain.events exchange receives message
routing key: "OrderCreatedEvent"
Checks all bindings:
"OrderCreatedEvent" → demo.order-created.queue ✓ match
"PaymentProcessedEvent" → ... ✗ no match
Delivers copy to demo.order-created.queue
Message sits in queue waiting for consumer to pick it up
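The binding check can be illustrated in plain C#. This is a rough sketch of the matching rules only (a direct exchange needs an exact match; on a topic exchange `*` stands for exactly one dot-separated word and `#` for zero or more), not of how the broker implements them. The ExchangeRouting class and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that mimics exchange binding rules in memory.
public static class ExchangeRouting
{
    // Direct exchange: the binding key must equal the routing key exactly.
    public static bool DirectMatches(string bindingKey, string routingKey) =>
        string.Equals(bindingKey, routingKey, StringComparison.Ordinal);

    // Topic exchange: '*' matches exactly one word, '#' matches zero or more words.
    public static bool TopicMatches(string bindingKey, string routingKey) =>
        Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] pattern, int p, string[] key, int k)
    {
        if (p == pattern.Length) return k == key.Length;
        if (pattern[p] == "#")
        {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (var next = k; next <= key.Length; next++)
                if (Match(pattern, p + 1, key, next)) return true;
            return false;
        }
        if (k == key.Length) return false;
        return (pattern[p] == "*" || pattern[p] == key[k])
            && Match(pattern, p + 1, key, k + 1);
    }

    // Returns every queue whose topic binding matches the routing key.
    public static IReadOnlyList<string> Route(
        IEnumerable<(string BindingKey, string Queue)> bindings, string routingKey) =>
        bindings.Where(b => TopicMatches(b.BindingKey, routingKey))
                .Select(b => b.Queue)
                .ToList();
}
```

Because the bindings in this walkthrough use plain type names with no wildcards, the topic exchange behaves like an exact match here; wildcards only matter once binding keys such as `order.*` are introduced.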
Step 8: Consumer Receives It
The AsyncEventingBasicConsumer fires its Received event:
consumer.Received += async (_, ea) =>
{
    try
    {
        // 1. Deserialize envelope
        var json = Encoding.UTF8.GetString(ea.Body.ToArray());
        var envelope = JsonSerializer.Deserialize<MessageEnvelope>(json);
        //    envelope.MessageType = "OrderCreatedEvent"
        //    envelope.PayloadJson = "{ orderId, customerName, total }"
        // 2. Deserialize payload into a strongly typed message
        var message = JsonSerializer.Deserialize<OrderCreatedEvent>(envelope.PayloadJson);
        // 3. Call handler: pure business logic
        await _handler.HandleAsync(message);
        //    OrderCreatedHandler.HandleAsync runs
        //    logs: "🛒 Order received! ..."
        // 4. ACK: tell the broker the message was processed so it is removed from the queue
        _channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        var retryCount = GetRetryCount(ea.BasicProperties); // 0 first time
        // Build properties for the republished copy and record the attempt.
        // Assumption: GetRetryCount reads an "x-retry-count" header; the original does not show it.
        var props = _channel.CreateBasicProperties();
        props.Persistent = true;
        props.ContentType = ea.BasicProperties.ContentType;
        props.Headers = new Dictionary<string, object> { ["x-retry-count"] = retryCount + 1 };
        if (retryCount < 3) // MaxRetries
        {
            // publish to the retry queue (the waiting room) via the default exchange
            _channel.BasicPublish("", "demo.order-created.queue.retry", props, ea.Body);
        }
        else
        {
            // publish to the DLQ (the graveyard)
            _channel.BasicPublish("dlx.exchange", "demo.order-created.queue.dlq", props, ea.Body);
        }
        // ACK original: remove it from the main queue either way
        _channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
};
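The handler relies on GetRetryCount, which is not shown. One possible shape, assuming the attempt number travels in a message header, is sketched below. The header name `x-retry-count`, the RetryPolicy class, and the choice to work on a plain header dictionary (so the logic runs without a broker) are all assumptions, not the original implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: retry bookkeeping kept separate from the channel code.
public static class RetryPolicy
{
    // Assumed header name; the walkthrough does not say which header is used.
    public const string RetryHeader = "x-retry-count";

    // Reads the retry count from the headers; missing or unreadable values count as 0.
    public static int GetRetryCount(IDictionary<string, object>? headers)
    {
        if (headers is null || !headers.TryGetValue(RetryHeader, out var raw)) return 0;
        return raw switch
        {
            int i => i,
            long l => (int)l,
            // String header values can arrive as UTF-8 bytes.
            byte[] bytes when int.TryParse(Encoding.UTF8.GetString(bytes), out var n) => n,
            _ => 0
        };
    }

    // Copies the headers and increments the counter for the republished message.
    public static Dictionary<string, object> NextAttemptHeaders(IDictionary<string, object>? headers)
    {
        var next = headers is null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(headers);
        next[RetryHeader] = GetRetryCount(headers) + 1;
        return next;
    }

    // True while the message should go to the retry queue rather than the DLQ.
    public static bool ShouldRetry(IDictionary<string, object>? headers, int maxRetries = 3) =>
        GetRetryCount(headers) < maxRetries;
}
```

With a limit of 3, a message is retried while its counter is 0, 1, or 2 and is sent to the DLQ once the counter reaches 3, which matches the `retryCount < 3` check in the catch block.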
Step 9: SendAsync (Command)
Back in the controller, second call:
await _bus.SendAsync(
new ProcessInvoiceCommand(orderId, req.Total, req.CustomerName),
destination: "invoice-service"
);
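public Task SendAsync<T>(T message, string destination) where T : Command
{
    // Envelope, body and properties are built the same way as in PublishAsync
    var envelope = new MessageEnvelope
    {
        MessageType = typeof(T).Name, // "ProcessInvoiceCommand"
        PayloadJson = JsonSerializer.Serialize(message),
        Timestamp = DateTime.UtcNow,
        MessageId = Guid.NewGuid()
    };
    var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(envelope));
    var props = _channel.CreateBasicProperties();
    props.Persistent = true;
    // routing key = destination name, NOT type name
    _channel.BasicPublish(
        exchange: "domain.commands", // direct exchange
        routingKey: destination,     // "invoice-service": who should handle it
        basicProperties: props, body: body);
    return Task.CompletedTask;
}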
domain.commands (Direct) receives message
routing key: "invoice-service"
Exact match only:
"invoice-service" → invoice-service queue ✓
nothing else matches
Delivered to invoice-service queue
ProcessInvoiceConsumer picks it up
ProcessInvoiceHandler.HandleAsync runs
logs: "🧾 Invoice processing..."
The Complete Picture
App startup:
  DI creates RabbitMqConnection → one TCP connection established
  DI creates RabbitMqMessageBus → publisher channel opened, exchanges declared
  BackgroundService starts      → consumer channel opened
                                  queues declared and bound
                                  consumer registered with broker
HTTP request arrives:
  Controller calls PublishAsync → envelope built, BasicPublish to domain.events
  Broker routes                 → matched queues receive copies
  Controller calls SendAsync    → envelope built, BasicPublish to domain.commands
  Broker routes                 → exact match queue receives it
  Controller returns 200        → done, doesn't wait for consumers
Consumer side (independent):
  Broker pushes message         → Received event fires
  Deserialize envelope          → get MessageEnvelope
  Deserialize payload           → get OrderCreatedEvent
  Call handler                  → business logic runs
  Success                       → BasicAck → message deleted from queue
  Failure                       → retry count check
    < max                       → publish to retry queue → TTL → dead letter back → retry
    >= max                      → publish to DLQ → sits for inspection
    Always ACK original         → remove from main queue
What Lives Where
TCP connection → RabbitMqConnection (singleton)
Publisher channel → RabbitMqMessageBus (singleton, owns one channel)
Consumer channel → RabbitMqConsumer<T> (one per BackgroundService)
Exchange declaration → RabbitMqMessageBus constructor (publisher owns domain.events and domain.commands; the consumer declares dlx.exchange)
Queue declaration → RabbitMqConsumer<T>.ExecuteAsync (consumer owns queues)
Business logic → IMessageHandler<T> implementation
Retry/DLQ routing → RabbitMqConsumer<T> catch block
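Everything has a single clear owner. That's the entire flow. One caveat: IModel channels are not thread-safe, so the singleton publisher channel needs a lock (or a channel per publish) if concurrent requests call PublishAsync or SendAsync.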