[Proposal] Event Sourcing V2 #9951

@willg1983

Motivation

There have been various discussions about improving Orleans event sourcing support (see Related Issues below). Orleans is a natural fit for event-sourced aggregates because grains provide a strong consistency boundary, single-activation guarantees, in-memory state, sequential message processing, and single-threaded execution, which together make safe state mutation simpler.

Event sourcing is often combined with the CQRS architectural pattern: event sourced aggregates provide the 'Command' portion, and projections provide the 'Query' capability through event sourced read-models.

The current Orleans event sourcing support is quite limited: it lacks credible in-the-box storage providers and does not support event sourcing's superpower, projections.

This proposal adds:

  • A new event sourcing provider interface which is simple to implement.
  • An approach for grain event sourced state(s) that can be constructor injected into an event sourced grain, plus extension methods for writing events.
  • A new component for grain-based projections and side effects, allowing grains to aggregate events in an eventually consistent way.

Event Storage Provider Interface

A new interface for event storage providers that supports Read/Write/Truncate. Reads and writes include new standard event properties such as EventId, EventNumber, GlobalLogPosition and a Metadata dictionary alongside the event data payload. Truncation supports requirements around compaction and removal of event streams. This replaces the need to implement ICustomStorageInterface or ILogViewAdaptor.

It drops the concept of the provider monitoring and notifying of changes directly, instead relying on optimistic concurrency to detect changes on write. This suits the use case of a grain having its own dedicated event stream and effectively being the single writer (transient duplicate grain activations are handled by optimistic concurrency). If there are multiple writers, then a projection that notifies the grain of externally written changes is a better approach in my view.

There is no support for writing events unconditionally (without an optimistic concurrency check); support could be added via an additional method on the interface.

public interface IGrainEventStorageProvider
{
    /// <summary>
    /// Reads all events for the given grain starting from the given event number, in ascending order.
    /// The event number is used as the starting point for reads, but does not guarantee the first event
    /// will be contiguous, as previously truncated events may mean the first event is greater than the given event number.
    /// The storage provider should return events with event numbers greater than the given event number.
    /// </summary>
    /// <param name="grainId">grain identity</param>
    /// <param name="exclusiveFrom">the event number from which to start reading (exclusive)</param>
    /// <param name="cancellationToken">token to cancel the read</param>
    public IAsyncEnumerable<EventRead> ReadEvents(GrainId grainId, GrainEventNumber exclusiveFrom, CancellationToken cancellationToken);

    /// <summary>
    /// Write one or more events to the grain's event stream atomically with optimistic concurrency control based on <paramref name="expectedEventNumber"/>.
    /// The write will only succeed if the current event number in storage matches the given event number, which represents the expected state of the stream at the time of write.
    /// If the write is successful, the storage provider will return the global position of the last written event, which can be used for reading from the stream or for projections.
    /// If the event number doesn't match, the storage provider should throw an InconsistentStateException to indicate a concurrency conflict.
    /// </summary>
    /// <param name="grainId">grain identity</param>
    /// <param name="expectedEventNumber">the expected event number of the stream at the time of write</param>
    /// <param name="events">the events to write</param>
    /// <param name="cancellationToken">token to cancel the write</param>
    /// <returns>The written position of the last event</returns>
    /// <exception cref="InconsistentStateException">if the event number doesn't match the expected value in storage</exception>
    public ValueTask<GlobalEventLogPosition> WriteEvents(GrainId grainId, GrainEventNumber expectedEventNumber, ReadOnlyMemory<EventWrite> events, CancellationToken cancellationToken);

    /// <summary>
    /// Removes all events up to and including the given event number for the specified grain.
    /// This can be used for log compaction or truncation.
    /// If the given event number is greater than the current highest event number in storage, the storage provider should throw an ArgumentOutOfRangeException.
    /// </summary>
    /// <param name="grainId">grain identity</param>
    /// <param name="inclusiveUpTo">the event number up to which all events will be removed (inclusive)</param>
    /// <param name="cancellationToken">token to cancel the delete</param>
    /// <returns></returns>
    /// <exception cref="ArgumentOutOfRangeException">if the given event number is greater than the current highest event number in storage</exception>
    public ValueTask DeleteEvents(GrainId grainId, GrainEventNumber inclusiveUpTo, CancellationToken cancellationToken);
}

public readonly record struct EventRead(string EventType, // for deserialization
                                        BinaryData Data,
                                        GrainEventNumber EventNumber,
                                        GlobalEventLogPosition Sequence,
                                      //  DateTimeOffset WrittenTimestamp, // Bad idea: really just an infrastructure concern. If a date is relevant to the event it should be captured in the event data
                                        Guid EventId,
                                        IReadOnlyDictionary<string, object> Metadata
                                        // Could include correlationId / causationId, or leave that to metadata
                                        );

public readonly record struct EventWrite(IReadOnlyDictionary<string, object> Metadata,
                                         string EventType, // for deserialization
                                         BinaryData Data,
                                         Guid EventId);
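
To make the optimistic concurrency contract above more concrete, here is a minimal in-memory sketch. It is not the proposed provider: `InMemoryEventStore`, `StoredEvent` and `ConcurrencyConflictException` are hypothetical stand-ins, with `string` replacing `GrainId`, `long` replacing `GrainEventNumber` and `GlobalEventLogPosition`, and a local exception replacing `InconsistentStateException`, so that the example compiles without Orleans. It also assumes an empty stream is at event number 0, which the proposal does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for InconsistentStateException so the sketch compiles without Orleans.
public sealed class ConcurrencyConflictException : Exception
{
    public ConcurrencyConflictException(string message) : base(message) { }
}

// Hypothetical simplified event: payload plus per-stream number and global position.
public readonly record struct StoredEvent(string EventType, string Data, long EventNumber, long GlobalPosition);

public sealed class InMemoryEventStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<StoredEvent>> _streams = new();
    // Tracked separately from the stored events so truncation does not reset numbering.
    private readonly Dictionary<string, long> _lastEventNumber = new();
    private long _globalPosition;

    // Appends all events atomically, or throws if the stream moved since the caller last read it.
    public long WriteEvents(string grainId, long expectedEventNumber, IReadOnlyList<(string EventType, string Data)> events)
    {
        lock (_gate)
        {
            _lastEventNumber.TryGetValue(grainId, out var current); // 0 when the stream is new (assumption)
            if (current != expectedEventNumber)
                throw new ConcurrencyConflictException(
                    $"Expected event number {expectedEventNumber} but the stream is at {current}.");

            if (!_streams.TryGetValue(grainId, out var stream))
                _streams[grainId] = stream = new List<StoredEvent>();

            foreach (var (eventType, data) in events)
                stream.Add(new StoredEvent(eventType, data, ++current, ++_globalPosition));

            _lastEventNumber[grainId] = current;
            return _globalPosition; // global position of the last written event
        }
    }

    // Returns events with numbers strictly greater than exclusiveFrom, in ascending order.
    public IReadOnlyList<StoredEvent> ReadEvents(string grainId, long exclusiveFrom)
    {
        lock (_gate)
        {
            return _streams.TryGetValue(grainId, out var stream)
                ? stream.Where(e => e.EventNumber > exclusiveFrom).ToList()
                : new List<StoredEvent>();
        }
    }

    // Removes events up to and including inclusiveUpTo.
    public void DeleteEvents(string grainId, long inclusiveUpTo)
    {
        lock (_gate)
        {
            _lastEventNumber.TryGetValue(grainId, out var current);
            if (inclusiveUpTo > current)
                throw new ArgumentOutOfRangeException(nameof(inclusiveUpTo));
            if (_streams.TryGetValue(grainId, out var stream))
                stream.RemoveAll(e => e.EventNumber <= inclusiveUpTo);
        }
    }
}
```

A second writer that still expects event number 0 after two events have been written gets the conflict exception, which is how a transient duplicate activation would be detected under this scheme.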

To enable translation between TEventBase and EventWrite (used to persist events) or EventRead (used to read events), a new interface will be added:

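public interface IEventConverter
{
    // The class constraint matches the one declared on DecodedEvent<TEventBase>.
    public DecodedEvent<TEventBase> DecodeEvent<TEventBase>(EventRead read) where TEventBase : class;
    public EventWrite EncodeEvent<TEventBase>(TEventBase @event);
}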

public readonly record struct DecodedEvent<TEventBase>(TEventBase Event, GrainEventNumber EventNumber) where TEventBase : class;

A default implementation will use IGrainStorageSerializer for serialization. It will populate the metadata with activity tracing information, if available, which can then be used to correlate the activity in projections and side-effects. We may place an interface constraint on TEventBase requiring a Guid EventId property.

Constructor injected Grain Event Sourcing State(s)

Event sourced state read-models will be injected into the event sourced grain's constructor.

public sealed class MyEventSourcedGrain(IEventSourcedState<MyReadModel> eventSourcedState) : Grain,
    IMyEventSourcedAggregateGrain
{

    // Example of command pattern 
    public async ValueTask<EventSourcedCommandResult<TResult>> ExecuteCommand<TResult>(IEventSourcedGrainCommand<EventBase, TResult> command, CancellationToken token)
    {
        var result = command.Execute(eventSourcedState, this.GetPendingEventsWriter());
        var (globalLogPosition, eventNumber) = await this.CommitPendingEvents(token);
        return new(result, globalLogPosition, eventNumber);
    }

    // Query methods etc.
}

Each IEventSourcedState injected into the grain will be synchronously updated as events are written in the grain.

The IEventSourcedState interface will implement IAsyncEnumerable to provide observability of state changes.

This approach also allows a grain to host multiple state models (which could be useful for separating concerns, e.g. one model stores 'decider state' for command validation, one stores an audit history for the UI, one provides status). A built-in event capturing state model will, if used by the grain author, be able to dynamically create any state model on-demand and keep it updated.

public sealed class MyEventSourcedGrain(
    IEventSourcedState<MyReadModel> eventSourcedState,
    IEventSourcedState<MyAuditHistory> auditHistory,
    IEventSourcedState<MyOrderStatusReadModel> customerOrderStatus) : Grain,
    IMyEventSourcedAggregateGrain
{
    // Command and query methods etc.
}

Writing pending events, atomically committing all pending events to storage, and refreshing state from storage will be extension methods on IGrainBase.

It is expected that IEventSourcedState models will be updated when a pending event is written, before the async persistence operation, to ensure state consistency for re-entrant operations. The trade-off is that concurrent operations may see phantom reads if an event write fails, rather than seeing stale reads all the time. The IAsyncEnumerable state model updates will be yielded only after the write, to ensure phantom state does not escape non-reentrant grains.

The grainId will be used for the event sourcing stream name; we could provide a grain attribute to configure the formatting.

We may want a mechanism for event type names that decouples them from their .NET type names, perhaps by adding a new EventAlias attribute.

These new event sourcing components will opt in to grain migration if all the injected event sourced state models are serializable, allowing them and any pending events to be live migrated with the grain.

Event Sourced Projections and side-effects

A new interface that can be used to read grain events, allowing grains to be created that receive all matching historical catch-up events and live events from other grains.

The expectation is that this will be implemented using the mechanism in the underlying event persistence provider, which ensures projections receive events as written, in global event log order.

This proposal lays the groundwork upon which more sophisticated abstractions can be built, such as a projection grain that can host any event sourced read model in-memory or write to another storage mechanism.

This interface notifies of checkpoint opportunities.

/// <summary>
/// Provides a mechanism to subscribe to events emitted by grains
/// </summary>
public interface IGrainEventProvider 
{
    /// <summary>
    /// Subscribes to a stream of events for a given subscription, with the event payload included in <see cref="GrainEvent{TEventBase}"/>
    /// </summary>
    /// <typeparam name="TEventBase">The type of events to subscribe to.</typeparam>
    /// <param name="subscriber">The subscriber for observability purposes.</param>
    /// <param name="startingPosition">The position in the event stream to start from.</param>
    /// <param name="eventFilter">An array of event types to filter the subscription.</param>
    /// <param name="cancellationToken">A token to cancel the subscription.</param>
    /// <returns>An asynchronous stream of <see cref="GrainEvent{TEventBase}"/>, <see cref="CaughtUp"/>, <see cref="FallenBehind"/> or <see cref="Checkpoint"/> objects.</returns>
    IAsyncEnumerable<EventStreamUpdate> SubscribeToGrainEvents<TEventBase>(GrainId subscriber, GlobalEventLogPosition startingPosition, Type[] eventFilter, CancellationToken cancellationToken) where TEventBase : notnull;

    /// <summary>
    /// Subscribes to all events from <typeparamref name="TGrain"/> with the event payload included in <see cref="GrainEvent{TEventBase}"/>
    /// </summary>
    /// <typeparam name="TGrain">The type of the grain whose events to subscribe to.</typeparam>
    /// <typeparam name="TEventBase">The base type of events to subscribe to.</typeparam>
    /// <param name="subscriber">The subscriber for observability purposes.</param>
    /// <param name="startingPosition">The position in the event stream to start from.</param>
    /// <param name="cancellationToken">A token to cancel the subscription.</param>
    /// <returns>An asynchronous stream of <see cref="GrainEvent{TEventBase}"/>, <see cref="CaughtUp"/>, <see cref="FallenBehind"/> or <see cref="Checkpoint"/> objects.</returns>
    IAsyncEnumerable<EventStreamUpdate> SubscribeToGrainEvents<TGrain, TEventBase>(GrainId subscriber, GlobalEventLogPosition startingPosition, CancellationToken cancellationToken) where TGrain : IGrain where TEventBase : notnull;
}



/// <summary>
/// Represents an update in the event stream, such as a <see cref="GrainEvent{T}"/>, <see cref="Checkpoint"/>, <see cref="CaughtUp"/>, or <see cref="FallenBehind"/> notification.
/// This is the base type for all event stream update notifications returned by IGrainEventProvider 
/// </summary>
public abstract record EventStreamUpdate
{

    /// <summary>
    /// Indicates that the event stream subscription has caught up to the latest available event.
    /// Used to notify subscribers that no further historical events are pending.
    /// </summary>
    public sealed record CaughtUp : EventStreamUpdate
    {
        private CaughtUp() { }

        public static CaughtUp Instance { get; } = new();
    }

    /// <summary>
    /// Indicates that the event stream subscription has fallen behind and is no longer up-to-date.
    /// Used to notify subscribers that the stream is lagging.
    /// </summary>
    public sealed record FallenBehind : EventStreamUpdate
    {
        private FallenBehind() { }

        public static FallenBehind Instance { get; } = new();
    }

    /// <summary>
    /// Provides a regular checkpoint in the event stream when processing through events not relevant to the subscriber
    /// </summary>
    /// <param name="Position"></param>
    public record Checkpoint(GlobalEventLogPosition Position) : EventStreamUpdate;

    /// <summary>
    /// Provides a notification that a grain event occurred (without the payload)
    /// </summary>
    /// <param name="Position"></param>
    /// <param name="EventGrainId"></param>
    /// <param name="GrainEventNumber"></param>
    public record GrainEventNotification(GlobalEventLogPosition Position, GrainId EventGrainId, GrainEventNumber GrainEventNumber)
        : Checkpoint(Position);


    /// <summary>
    /// Represents an event emitted by a grain in the event stream.
    /// This record encapsulates the event data, its global position in the event log, the originating grain's identifier,
    /// and the version of the grain at the time the event was produced.
    /// </summary>
    /// <typeparam name="TEventBase">The base type of the event payload.</typeparam>
    /// <param name="Position">The global position of the event in the event log stream.</param>
    /// <param name="Event">The event payload emitted by the grain.</param>
    /// <param name="EventGrainId">The unique identifier of the grain that produced the event.</param>
    /// <param name="GrainEventNumber">The version of the grain at the time the event was generated.</param>
    public sealed record GrainEvent<TEventBase>(
        GlobalEventLogPosition Position,
        TEventBase Event,
        GrainId EventGrainId,
        GrainEventNumber GrainEventNumber) : GrainEventNotification(Position, EventGrainId, GrainEventNumber);
}
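
As an illustration of how a subscriber might consume such a stream, the sketch below folds updates into a simple count while tracking the last checkpoint and whether the subscription is live. The types are simplified stand-ins rather than the proposed ones: `StreamUpdate`, `OrderEvent`, `OrderCountProjection` and `ProjectionDemo` are hypothetical names, and `long` and `string` replace `GlobalEventLogPosition` and `GrainId`. The one structural detail carried over from the proposal is that events derive from `Checkpoint`, so every event doubles as a checkpoint opportunity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-ins for the proposed EventStreamUpdate hierarchy.
public abstract record StreamUpdate;
public sealed record CaughtUp : StreamUpdate;
public sealed record FallenBehind : StreamUpdate;
public record Checkpoint(long Position) : StreamUpdate;
public sealed record OrderEvent(long Position, string GrainId, string EventType) : Checkpoint(Position);

public sealed class OrderCountProjection
{
    public int OrdersPlaced { get; private set; }
    public long LastCheckpoint { get; private set; }
    public bool IsLive { get; private set; }

    public async Task RunAsync(IAsyncEnumerable<StreamUpdate> updates)
    {
        await foreach (var update in updates)
        {
            switch (update)
            {
                // Most derived type first: an event is also a checkpoint.
                case OrderEvent e:
                    if (e.EventType == "OrderPlaced") OrdersPlaced++;
                    LastCheckpoint = e.Position;
                    break;
                // A bare checkpoint lets the subscriber advance past irrelevant events.
                case Checkpoint c:
                    LastCheckpoint = c.Position;
                    break;
                case CaughtUp:
                    IsLive = true;   // historical catch-up is complete
                    break;
                case FallenBehind:
                    IsLive = false;  // the read model may be lagging
                    break;
            }
        }
    }
}

public static class ProjectionDemo
{
    // Hypothetical sample stream: two historical events, a checkpoint, then one live event.
    public static async IAsyncEnumerable<StreamUpdate> SampleUpdates()
    {
        await Task.Yield();
        yield return new OrderEvent(1, "order-1", "OrderPlaced");
        yield return new OrderEvent(2, "order-1", "OrderShipped");
        yield return new Checkpoint(4);
        yield return new CaughtUp();
        yield return new OrderEvent(5, "order-2", "OrderPlaced");
    }
}
```

A real projection would persist `LastCheckpoint` together with its state, so that after a restart it could resubscribe from that position instead of replaying the whole log.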

Improved developer experience

Polymorphic immutable state

Currently with JournaledGrain it is a pain to have a read-model that is immutable (each event applied produces a new read model) or state that is polymorphic (where an event can transition the state type), which allows a state machine to represent an aggregate's lifecycle, e.g. Basket -> Processing -> Shipped -> Delivered -> Archived. These new components will enable both scenarios, but neither is mandatory.
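
A rough sketch of what a polymorphic, immutable read model could look like, independent of any Orleans API: each applied event returns a new state instance, and some events change the state's type. All names here (`OrderState`, `Basket`, `Processing`, `ShippedOrder`, `OrderLifecycleEvent` and its subtypes, `OrderStateFolder`) are hypothetical and cover only the first three lifecycle stages.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical events for the first part of the lifecycle.
public abstract record OrderLifecycleEvent;
public sealed record ItemAdded(string Sku) : OrderLifecycleEvent;
public sealed record CheckedOut : OrderLifecycleEvent;
public sealed record Shipped(string TrackingNumber) : OrderLifecycleEvent;

// Polymorphic immutable state: applying an event returns a new instance, possibly of another type.
public abstract record OrderState
{
    public static OrderState Initial { get; } = new Basket(ImmutableList<string>.Empty);

    public OrderState Apply(OrderLifecycleEvent @event) => (this, @event) switch
    {
        (Basket b, ItemAdded e) => b with { Items = b.Items.Add(e.Sku) },
        (Basket b, CheckedOut) => new Processing(b.Items),
        (Processing p, Shipped e) => new ShippedOrder(p.Items, e.TrackingNumber),
        _ => this // events that are not valid in the current state are ignored in this sketch
    };
}

public sealed record Basket(ImmutableList<string> Items) : OrderState;
public sealed record Processing(ImmutableList<string> Items) : OrderState;
public sealed record ShippedOrder(ImmutableList<string> Items, string TrackingNumber) : OrderState;

public static class OrderStateFolder
{
    // Rebuilds the current state by folding the event history over the initial state.
    public static OrderState Replay(IEnumerable<OrderLifecycleEvent> history) =>
        history.Aggregate(OrderState.Initial, (state, e) => state.Apply(e));
}
```

Because each state type only exposes the data that is meaningful at that stage, command validation can pattern match on the current state type rather than checking status flags.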

Read-your-writes
Pending event write operations will return a GrainEventNumber and commit operations will return a GlobalEventLogPosition. The grain can return these to callers so they can ensure 'read-your-writes' consistency for projections shown in the UI.

Observability
The code will be instrumented and will support logging and activity tracing using the latest .NET best practices.

POCO Grains
POCO grains will be supported; there is no requirement to derive from JournaledGrain.

Built-in event sourcing event storage providers
It would be useful to have some supported event storage providers, in addition to an in-memory provider for testing.

Initially a provider for Kurrent would be implemented as a reference.
Ideally another provider would also be available to ensure the abstractions are flexible enough; community suggestions are welcome.

Relationship to other parts of Orleans

Orleans.EventSourcing
Given the need to make breaking changes, backward compatibility with the existing event sourcing support in Orleans will be limited to letting developers reuse their existing TLogView and TEventBase types (that they would have used with JournaledGrain) on the new event sourcing stack. There is no backward compatibility planned for ILogViewAdaptor implementations. This proposal would add the new approach side-by-side with the existing event sourcing support in the same package.

Orleans.Journaling
Orleans.Journaling is optimised for high performance large-state storage involving multiple state machines with incremental atomic writes.

Projections often aggregate many events into large models; basing them on Orleans.Journaling would provide efficient storage of projection and side-effect state.

Streams
Streams (at least v1) do not provide a complete replay of all events from the beginning of time, since they operate with the concept of a replay window, nor do they guarantee global ordering of received events. This makes them unsuitable for projections. It is not yet clear whether v2 addresses these limitations.

Related Issues

#8408
#486
#9734 (comment)
