Testing
The Ratatoskr.Testing package provides pipeline observability, parallel test isolation via W3C trace IDs, and ergonomic assertion APIs. It has zero overhead in production — the observer collection is empty when not registered.
Setup
Install the package and register it in your test setup:
dotnet add package Ratatoskr.Testing
services.AddRatatoskrTesting();
This registers a MessageTracker singleton that observes all message activities flowing through the pipeline.
Session-Based API
The primary API creates a MessageTrackingSession per test. Each session generates a unique W3C trace ID that correlates all messages published within its scope — enabling parallel test isolation.
[Test]
public async Task OrderPlaced_HandlerProcessesSuccessfully()
{
await using var session = Services.CreateTrackingSession();
await InScopeAsync(async ctx =>
{
var bus = ctx.ServiceProvider.GetRequiredService<IRatatoskr>();
await bus.PublishDirectAsync(new OrderPlaced(Guid.NewGuid(), "test@example.com", 99.99m));
});
var dispatched = await session.WaitForDispatched<OrderPlaced>();
dispatched.GetMessage<OrderPlaced>().CustomerEmail.Should().Be("test@example.com");
dispatched.Result.Should().Be(DispatchResult.Success);
}
Creating a Session
// From IServiceProvider
await using var session = services.CreateTrackingSession();
// From IHost
await using var session = host.CreateTrackingSession();
// With custom default timeout
await using var session = services.CreateTrackingSession(
defaultTimeout: TimeSpan.FromSeconds(30));
Waiting for Messages
Each wait method blocks until a matching message arrives or the timeout expires:
var published = await session.WaitForPublished<OrderPlaced>();
var sent = await session.WaitForSent<OrderPlaced>();
var received = await session.WaitForReceived<OrderPlaced>();
var dispatched = await session.WaitForDispatched<OrderPlaced>();
// With custom timeout
var dispatched = await session.WaitForDispatched<OrderPlaced>(
TimeSpan.FromSeconds(15));
// With predicate
var dispatched = await session.WaitForDispatched<OrderPlaced>(
predicate: m => m.GetMessage<OrderPlaced>().OrderId == orderId);
Tip
Prefer using the return value from WaitFor* directly over querying the collection afterward — this avoids race conditions where a later stage completes before an earlier stage's observer notification is recorded.
Pipeline Stages
The tracker captures messages at every stage:
| Stage | Description |
|---|---|
Published |
After each send attempt during PublishDirectAsync (includes TransportName and Exception) |
Sent |
After bytes are sent to the transport |
Received |
When the consumer receives a message from the transport |
Dispatched |
After handler invocation completes (includes result and exception) |
OutboxStaged |
When a message is serialized into an outbox entity during SaveChanges |
OutboxSent |
When the outbox processor sends a message to the transport |
InboxQueued |
When a message is accepted into the inbox |
InboxDispatched |
When an inbox handler delivery is attempted (success or failure) |
InboxPoisoned |
When an inbox handler status exceeds max retries |
Querying Messages
Each stage has a MessageCollection for querying:
var msg = session.Dispatched.Single<OrderPlaced>();
var msg = session.Dispatched.First<OrderPlaced>();
IReadOnlyList<TrackedMessage> all = session.Dispatched.All<OrderPlaced>();
int count = session.Published.Count;
Assertion Helpers
var msg = session.Dispatched.ShouldHaveMessage<OrderPlaced>();
session.Published.ShouldHaveNoMessage<OrderCancelled>();
TrackedMessage Properties
var msg = session.Dispatched.Single<OrderPlaced>();
OrderPlaced order = msg.GetMessage<OrderPlaced>(); // Deserialized message
msg.Properties.Type // "order.placed"
msg.Properties.Id // CloudEvents message ID
msg.Result // DispatchResult.Success (Dispatched stage only)
msg.Exception // null on success
byte[] raw = msg.RawBody; // Serialized bytes
msg.Stage // MessageStage.Dispatched
msg.TraceId // W3C trace ID
Action-Based API
For simpler scenarios, the action-based API wraps session creation and waiting:
[Test]
public async Task CreateOrder_PublishesEvent()
{
await using var session = await Services
.TrackActivity()
.Timeout(TimeSpan.FromSeconds(10))
.WaitForMessage<OrderPlaced>(MessageStage.Dispatched)
.ExecuteAndWaitAsync(async () =>
{
using var scope = Services.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IRatatoskr>();
await bus.PublishDirectAsync(new OrderPlaced(Guid.NewGuid(), "test@example.com", 99.99m));
});
session.Dispatched.Single<OrderPlaced>()
.Result.Should().Be(DispatchResult.Success);
}
PublishAndWaitAsync
A convenience method that resolves IRatatoskr and publishes for you (always waits for Dispatched):
await using var session = await Services
.TrackActivity()
.PublishAndWaitAsync(new OrderPlaced(Guid.NewGuid(), "test@example.com", 99.99m));
session.Dispatched.ShouldHaveMessage<OrderPlaced>();
Note
PublishAndWaitAsync does not support custom WaitForMessage conditions. Use ExecuteAndWaitAsync when you need custom wait conditions.
Transport Wire Format Assertions
The TransportMessage property captures the transport-level wire representation at the Sent and Received stages:
var sent = await session.WaitForSent<OrderPlaced>();
var transport = sent.TransportMessage!;
// AMQP properties
transport.Headers["content-type"].Should().Be("application/json");
transport.Headers["type"].Should().Be("order.placed");
// CloudEvents AMQP headers (binary mode)
transport.Headers["cloudEvents_specversion"].Should().Be("1.0");
transport.Headers["cloudEvents_type"].Should().Be("order.placed");
// Raw wire body
var wireBody = Encoding.UTF8.GetString(transport.Body);
wireBody.Should().Contain("test@example.com");
// Routing metadata
transport.Metadata["exchange"].Should().Be("orders.events");
transport.Metadata["routing-key"].Should().Be("order.placed");
Outbox Testing
Verify the full outbox flow:
await using var session = Services.CreateTrackingSession();
await InScopeAsync(async ctx =>
{
var dbContext = ctx.ServiceProvider.GetRequiredService<OrderDbContext>();
dbContext.OutboxMessages.Add(new OrderPlaced(Guid.NewGuid(), "test@example.com", 99.99m));
await dbContext.SaveChangesAsync();
});
var dispatched = await session.WaitForDispatched<OrderPlaced>(
TimeSpan.FromSeconds(10));
dispatched.Result.Should().Be(DispatchResult.Success);
session.OutboxStaged.ShouldHaveMessage<OrderPlaced>();
Inbox Testing
Use WithoutBackgroundProcessing() for deterministic inbox testing:
services.AddRatatoskr(bus =>
{
bus.AddEfCoreDurability<OrderDbContext>(d =>
d.UseInbox(inbox => inbox.WithoutBackgroundProcessing()));
// ... channel configuration ...
});
// In the test body, trigger processing manually (use true to match production / recover stuck rows):
var processor = Services.GetRequiredService<InboxMessageProcessor<OrderDbContext>>();
await processor.ProcessBatchAsync(
includeStuckMessageDetection: true,
CancellationToken.None);
This gives you full control over when inbox processing runs, so you can inspect database state between acceptance and delivery. With includeStuckMessageDetection: false, only rows that have never been claimed (ProcessingStartedAt == null) are eligible; this avoids concurrent processors picking the same row while another worker is sending, but it also skips stuck recovery until you pass true or advance time.
Note
WithoutBackgroundProcessing() also disables the cleanup service.
Parallel Test Isolation
Each session creates a unique W3C trace ID. Messages published within a session inherit this trace ID through Activity.Current. Sessions filter by matching trace ID, so parallel tests never interfere with each other:
await using var session1 = Services.CreateTrackingSession();
// ... publish message A in session1's context ...
await using var session2 = Services.CreateTrackingSession();
// ... publish message B in session2's context ...
session1.Dispatched.Single<OrderPlaced>().GetMessage<OrderPlaced>().OrderId.Should().Be(idA);
session2.Dispatched.Single<OrderPlaced>().GetMessage<OrderPlaced>().OrderId.Should().Be(idB);
What's Next
- Observability — OpenTelemetry tracing and metrics setup
- Inbox — Understanding inbox processing for test scenarios
- Outbox — Understanding outbox processing for test scenarios