Polecat is part of the Critter Stack ecosystem.

Dynamic Consistency Boundary (DCB)

The Dynamic Consistency Boundary (DCB) pattern lets you query events from multiple streams, and enforce consistency across them, using tags -- strongly typed identifiers attached to events at append time. This is useful when your consistency boundary doesn't align with a single event stream.

Concept

In traditional event sourcing, consistency is enforced per-stream using optimistic concurrency on the stream version. DCB extends this by letting you:

  1. Tag events with one or more strongly typed identifiers
  2. Query events across streams by those tags
  3. Aggregate tagged events into a view (like a live aggregation, but cross-stream)
  4. Enforce consistency at save time -- detecting if new matching events were appended since you last read

Polecat uses a single append strategy (direct INSERT with OUTPUT inserted.seq_id), so DCB tags are always persisted immediately after each event insert. There are no separate append modes to configure -- tags just work.
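
The first three steps can be illustrated with a small in-memory sketch. It does not use Polecat at all: `TaggedEvent`, `InMemoryLog`, `QueryByAnyTag`, and `ConceptDemo` are hypothetical names invented for this illustration, and a plain list stands in for the event and tag tables. It is only meant to show the shape of the idea, not how the library implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical strongly typed tags, mirroring the wrapper-record shape used on this page.
public record StudentId(Guid Value);
public record CourseId(Guid Value);

// One stored event: a global sequence number, the stream it was appended to,
// the event payload, and the tags attached at append time.
public record TaggedEvent(long Sequence, Guid StreamId, object Data, IReadOnlyList<object> Tags);

// A toy stand-in for an event store (assumption: a list instead of tables).
public class InMemoryLog
{
    private readonly List<TaggedEvent> _events = new();

    // Step 1: tag an event while appending it.
    public TaggedEvent Append(Guid streamId, object data, params object[] tags)
    {
        var stored = new TaggedEvent(_events.Count + 1, streamId, data, tags);
        _events.Add(stored);
        return stored;
    }

    // Step 2: query across streams; an event matches if it carries ANY of the tags (OR).
    public IReadOnlyList<TaggedEvent> QueryByAnyTag(params object[] tags) =>
        _events.Where(e => e.Tags.Any(t => tags.Contains(t)))
               .OrderBy(e => e.Sequence)
               .ToList();
}

public static class ConceptDemo
{
    // Step 3: fold the matching events into a view (here, just their payloads).
    public static List<string> Run()
    {
        var log = new InMemoryLog();
        var alice = new StudentId(Guid.NewGuid());
        var math = new CourseId(Guid.NewGuid());

        log.Append(Guid.NewGuid(), "enrolled", alice, math); // first stream
        log.Append(Guid.NewGuid(), "unrelated");             // second stream, no tags
        log.Append(Guid.NewGuid(), "submitted HW1", alice);  // third stream

        return log.QueryByAnyTag(alice).Select(e => (string)e.Data).ToList();
    }
}
```

`ConceptDemo.Run()` collects the two events tagged with the same student even though they live in different streams, and skips the untagged one. The fourth step, the save-time check, depends on remembering the sequence number seen at read time.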

Registering Tag Types

Tag types are strongly typed identifiers (typically record types wrapping a primitive). Register them during store configuration:

cs
public override async Task InitializeAsync()
{
    await StoreOptions(opts =>
    {
        // Register tag types -- each gets its own table (pc_event_tag_student, pc_event_tag_course)
        opts.Events.RegisterTagType<StudentId>("student")
            .ForAggregate<StudentCourseEnrollment>();
        opts.Events.RegisterTagType<CourseId>("course")
            .ForAggregate<StudentCourseEnrollment>();
    });
}

Each tag type gets its own table (pc_event_tag_student, pc_event_tag_course, etc.) with a composite primary key of (value, seq_id).

Tag Type Requirements

Tag types should be simple wrapper records around a primitive value:

cs
// Strong-typed tag identifiers
public record StudentId(Guid Value);
public record CourseId(Guid Value);

Supported inner value types: Guid, string, int, long, short.
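
As an illustration of why wrapper records are a convenient shape for tags, the sketch below relies only on general C# record behavior and says nothing about Polecat internals. `CourseCode` and `SeatNumber` are hypothetical tag types chosen to show `string` and `int` inner values; they do not appear elsewhere on this page.

```csharp
using System;

// Hypothetical tag types wrapping three of the supported inner value types.
public record StudentId(Guid Value);
public record CourseCode(string Value);
public record SeatNumber(int Value);

public static class TagEqualityDemo
{
    public static bool SameValueMeansSameTag()
    {
        var raw = Guid.NewGuid();

        // Two separately constructed records with the same inner value compare equal,
        // because C# records implement value-based equality.
        return new StudentId(raw) == new StudentId(raw)
            && new CourseCode("MATH-101") == new CourseCode("MATH-101")
            && new SeatNumber(12) != new SeatNumber(13);
    }
}
```

Because each tag is its own type, the compiler also rejects passing a `CourseCode` where a `StudentId` is expected, which a bare `Guid` or `string` would not catch.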

Tagging Events

Use BuildEvent and WithTag to attach tags before appending:

cs
var enrolled = theSession.Events.BuildEvent(new StudentEnrolled("Alice", "Math"));
enrolled.WithTag(studentId, courseId);
theSession.Events.Append(streamId, enrolled);
await theSession.SaveChangesAsync();

Events can have multiple tags of different types. Tags are persisted to their respective tag tables in the same transaction as the event.

Querying Events by Tags

Use EventTagQuery to build a query, then execute it with QueryByTagsAsync:

cs
var query = new EventTagQuery().Or<StudentId>(studentId);
var events = await session2.Events.QueryByTagsAsync(query);

Multiple Tags (OR)

cs
var query = new EventTagQuery()
    .Or<StudentId>(student1)
    .Or<StudentId>(student2);

var events = await session2.Events.QueryByTagsAsync(query);

Filtering by Event Type

cs
var query = new EventTagQuery()
    .Or<AssignmentSubmitted, StudentId>(studentId);

var events = await session2.Events.QueryByTagsAsync(query);

Events are always returned ordered by sequence number (global append order).

Aggregating by Tags

Build an aggregate from tagged events, similar to AggregateStreamAsync but across streams. First define an aggregate that applies the tagged events:

cs
// Aggregate for DCB
public class StudentCourseEnrollment
{
    public Guid Id { get; set; }
    public string StudentName { get; set; } = "";
    public string CourseName { get; set; } = "";
    public List<string> Assignments { get; set; } = new();
    public bool IsDropped { get; set; }

    public void Apply(StudentEnrolled e)
    {
        StudentName = e.StudentName;
        CourseName = e.CourseName;
    }

    public void Apply(AssignmentSubmitted e)
    {
        Assignments.Add(e.AssignmentName);
    }

    public void Apply(StudentDropped e)
    {
        IsDropped = true;
    }
}

Then aggregate across streams by tag query:

cs
var query = new EventTagQuery()
    .Or<StudentId>(studentId)
    .Or<CourseId>(courseId);

var aggregate = await session2.Events.AggregateByTagsAsync<StudentCourseEnrollment>(query);

AggregateByTagsAsync returns null if no matching events are found, so check the result before using it.

Fetch for Writing (Consistency Boundary)

FetchForWritingByTags loads the aggregate and establishes a consistency boundary. At SaveChangesAsync time, Polecat checks whether any new events matching the query have been appended since the read, throwing DcbConcurrencyException if so:

cs
await using var session2 = theStore.LightweightSession();
var query = new EventTagQuery().Or<StudentId>(studentId);
var boundary = await session2.Events.FetchForWritingByTags<StudentCourseEnrollment>(query);

// Read current state
var aggregate = boundary.Aggregate; // may be null if no events yet
var lastSequence = boundary.LastSeenSequence;

// Append via boundary
var assignment = session2.Events.BuildEvent(new AssignmentSubmitted("HW1", 95));
assignment.WithTag(studentId, courseId);
boundary.AppendOne(assignment);

// Save -- will throw DcbConcurrencyException if another session
// appended matching events after our read
await session2.SaveChangesAsync();

Handling Concurrency Violations

cs
try
{
    await session1.SaveChangesAsync();
}
catch (AggregateException ex) when (ex.InnerExceptions.OfType<DcbConcurrencyException>().Any())
{
    // Reload and retry -- the boundary's tag query had new matching events
    var violation = ex.InnerExceptions.OfType<DcbConcurrencyException>().First();
    // violation.Query -- the original tag query
    // violation.LastSeenSequence -- the sequence at time of read
}

TIP

The consistency check only detects events that match the same tag query. Events appended to unrelated tags or streams will not cause a violation.
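
A common response to a concurrency violation is to reload and retry. The helper below is a generic sketch of that loop and is not part of Polecat: `ConflictRetry`, `RunAsync`, and `IsOrWraps` are hypothetical names, and the default of three attempts is an arbitrary assumption.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ConflictRetry
{
    // Runs `attempt` until it succeeds or `maxAttempts` is reached.
    // `isConflict` decides which exceptions are worth retrying.
    // Returns the number of attempts it took.
    public static async Task<int> RunAsync(
        Func<Task> attempt, Func<Exception, bool> isConflict, int maxAttempts = 3)
    {
        for (var tries = 1; ; tries++)
        {
            try
            {
                await attempt();
                return tries;
            }
            catch (Exception ex) when (tries < maxAttempts && isConflict(ex))
            {
                // Swallow the conflict and loop, so the whole attempt runs again.
                // On the last attempt the filter is false and the exception propagates.
            }
        }
    }

    // Matches an exception of type T either directly or wrapped in an AggregateException.
    public static bool IsOrWraps<T>(Exception ex) where T : Exception =>
        ex is T || (ex is AggregateException agg && agg.InnerExceptions.OfType<T>().Any());
}
```

In a DCB workflow, each attempt would presumably open a fresh session, call FetchForWritingByTags again, re-run the business decision against the reloaded aggregate, append, and save, with `ConflictRetry.IsOrWraps<DcbConcurrencyException>` as the conflict test. Retrying only the save on the same session would reuse the stale read.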

Checking Event Existence

If you only need to know whether any events matching a tag query exist -- without loading or deserializing them -- use EventsExistAsync. This is a lightweight existence check that avoids the overhead of fetching and materializing event data:

cs
[Fact]
public async Task events_exist_returns_true_when_matching_events_found()
{
    var studentId = new StudentId(Guid.NewGuid());
    var courseId = new CourseId(Guid.NewGuid());
    var streamId = Guid.NewGuid();

    var enrolled = theSession.Events.BuildEvent(new StudentEnrolled("Alice", "Math"));
    enrolled.WithTag(studentId, courseId);
    theSession.Events.Append(streamId, enrolled);
    await theSession.SaveChangesAsync();

    // Check existence -- lightweight, no event loading
    await using var session2 = theStore.LightweightSession();
    var query = new EventTagQuery().Or<StudentId>(studentId);
    var exists = await session2.Events.EventsExistAsync(query);
    exists.ShouldBeTrue();
}

This is useful for guard clauses and validation logic in DCB workflows where you need to check preconditions before appending new events.

EventsExistAsync is also available in batch queries via batch.EventsExist(query).

How It Works

Storage

Each registered tag type creates a table:

sql
CREATE TABLE [dbo].[pc_event_tag_student] (
    value uniqueidentifier NOT NULL,
    seq_id bigint NOT NULL,
    CONSTRAINT pk_pc_event_tag_student PRIMARY KEY (value, seq_id),
    CONSTRAINT fk_pc_event_tag_student_events
        FOREIGN KEY (seq_id) REFERENCES [dbo].[pc_events](seq_id) ON DELETE CASCADE
);

Consistency Check

At SaveChangesAsync time, Polecat executes an EXISTS query checking for new events matching the tag query with seq_id > lastSeenSequence. This runs in the same transaction as the event appends, providing serializable consistency for the tagged boundary.
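
The condition being checked can be expressed in a few lines over an in-memory list. This is an illustrative equivalent of the check described above, not the query Polecat runs: `StoredEvent`, `BoundaryCheck`, and the string tag values are hypothetical, and tags are plain strings here to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A stored event reduced to what the check needs: its global sequence and its tags.
public record StoredEvent(long Sequence, HashSet<string> Tags);

public static class BoundaryCheck
{
    // True when an event matching ANY of the queried tags was appended after
    // `lastSeenSequence`; this is the condition that should abort the save.
    public static bool HasNewMatchingEvents(
        IEnumerable<StoredEvent> log, IReadOnlyCollection<string> queriedTags, long lastSeenSequence) =>
        log.Any(e => e.Sequence > lastSeenSequence && e.Tags.Overlaps(queriedTags));
}

public static class BoundaryCheckDemo
{
    public static (bool related, bool unrelated) Run()
    {
        var log = new List<StoredEvent>
        {
            new(1, new HashSet<string> { "student:alice", "course:math" }),
            new(2, new HashSet<string> { "student:bob" }),
        };
        var queried = new[] { "student:alice" };
        const long lastSeen = 1; // sequence observed when the boundary was read

        // Another writer appends an event for bob: outside the boundary, no violation.
        log.Add(new(3, new HashSet<string> { "student:bob" }));
        var unrelated = BoundaryCheck.HasNewMatchingEvents(log, queried, lastSeen);

        // Another writer appends an event for alice: inside the boundary, violation.
        log.Add(new(4, new HashSet<string> { "student:alice" }));
        var related = BoundaryCheck.HasNewMatchingEvents(log, queried, lastSeen);

        return (related, unrelated);
    }
}
```

The demo shows both outcomes: a later event with an unrelated tag leaves the boundary intact, while a later event with a queried tag trips the check.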

Tag Routing

Events appended via IEventBoundary.AppendOne() are automatically routed to streams based on their tags. Each tag value becomes the stream identity, so events with the same tag value end up in the same stream.

Released under the MIT License.