functional event-sourcing – let’s playback with fun

If you had any exposure to the basics of functional programming before you will at once see folds when you look at the playback-side of the event-sourcing pattern. In this post I will explore this idea a bit.

Image by 7263255

Image by 7263255

being basic

Let’s think about what the most basic event-recorder should do for us on the playback side.

Obvious it should play back the events. But what does that mean? Well on first approximation you could say it should give you a list or a sequence (or whatever) of all the events it recorded.


remark

Aside from the record side I will ignore here all the aggregate/event-ID, metadata and event-version stuff around event-sourcing. You can always add it!

For example you can add a Event-Store abstraction. You than ask this store to give you back a playback for a given aggregate-ID in a given event-version range and you are right back in our discussion.


Of course our playback should be type-safe and only return events of a given type, so the first draft for an interface could look like this:

type IPlayback<'ev> =
    abstract Playback : unit -> 'ev seq

Now this perfectly fine. Yeah it’s surely not pure (well you could add a real-world parameter or wrap it into a monad if you like) but I think our stores will never be – so let’s forget about this at the moment.

What I don’t like to much is that it forces the sequence on us – let’s abstract it away.

abstracting on need

Now a sequence is one of the most basic collection you can have and I don’t want to abstract the collection – I want to abstract based on the things we are going to do with the events once we got them.

Let’s think about this a bit.

When you look at the use cases you will always end up aggregating the events in some way. You might look for a certain event, sum up values or whatever. If you saw examples in modern C# then chances are high that you used some kind of LINQ query or operators (extension methods to IEnumerable<T>) to do it – if not then there will surely be a for/foreach loop around.

The idea is this: if you loop over the events anyway than why repeat this stuff every time? Just let the playback do this for us. We just have to find the common pattern.

folding

photo by Jim Bauer

image by Jim Bauer


I’ll assume that you never heard of folds (other names might be aggregate, reduce, catamorphisms, …) before and I’ll try to give you the idea here. If you are comfortable with this just skip this section.

It’s easier to see if we are using Lists, so let’s look at a few patterns:

Let’s say you want to sum up a list of integers

let rec sum xs =
    match xs with
    | []      -> 0
    | (x::xs) -> x + sum xs

or you want to map a list of values using f:

let rec map f xs =
    match xs with
    | []      -> []
    | (x::xs) -> f x :: map f xs

or you want to find the last value having some property p:

let rec find p xs =
    match xs with
    | []      -> None
    | (x::xs) -> if p x then Some x else find p xs

Now we can already see some kind of pattern: the same match is everywhere. The empty case always returns some value and the non-empty cases seem all to be connected too somehow. Let’s refactor the last a bit:

let rec sum xs =
    ...
    | (x::xs) -> let sum' = sum xs
                 x + sum'

let rec map f xs =
    ... 
    | (x::xs) -> let map' = map f xs
                 f x :: map'
let rec find p xs =
    ... 
    | (x::xs) -> let find' = find p x
                 if p x then Some x else find'

Ok seems we are always using the recursive all somewhere (the last example might give you some doubt: obvious this is now worse than before – but let’s not concentrate on performance for now). Let’s see what we can do with the last part – can we abstract this somehow?

let rec sum xs =
    ...
    | (x::xs) -> let sum' = sum xs
                 (fun y s -> y + s) x sum'

let rec map f xs =
    ... 
    | (x::xs) -> let map' = map f xs
                 (fun y ys -> (f y) :: ys) x xs
let rec find p xs =
    ... 
    | (x::xs) -> 
        let find' = find p x
        (fun y o -> if p y then Some y else o) x find'

Seems like we can use a function that takes x (the current element) and the recursive rest every single time. So let’s give the final form (remembering the empty case):

let rec foldR f e xs = 
    match xs with
    | []      -> e
    | (x::xs) -> f x (foldR f e xs)

let sum xs = 
    foldR (+) 0 xs

let map f xs = 
    foldR (fun y ys -> f y::ys) [] xs

let find p xs = 
    foldR (fun y o -> if p y then Some y else o) None xs

We just re-discovered the right-fold.

remark

Just in case you are wondering – we can get just the list back too if we want (might come handy if you need the events themselves):

let items xs =
    foldR (fun y ys -> y::ys) [] xs

Indeed you can remember the inner workings of foldR like this:

foldR f e xs takes a list in the form xs = a::b::c:: .. ::[] and replaces [] with e and each :: with an inline ‘f’ (where a ‘f’ b = f a b).

Sadly you cannot really write this in F# – you have to use parentheses but for an operator it works:

foldR (+) 0 (1::2::3::[]) = 1 + 2 + 3 + 0

weaving it in

So instead of returning the events in some sort of collection we are going to let the playback to the fold for us – we don’t care how this is done. Therefore the interface could now look like this:

type IPlayback<'ev> =
    abstract Playback : ('ev -> 'a -> 'a) -> 'a -> 'a

This is basically foldR with a different name and without giving the list – the collection is hidden away from us.

While this is not so bad I think most of us would prefer it a bit more readable, so let’s wrap the parameters into a record named Projection:

type Projection<'ev, 'out> =
    { fold  : 'ev -> 'out -> 'out
    ; empty : 'out
    }

type IPlayback<'ev> =
    abstract Playback : Projection<'ev, 'out> -> 'out

wait … this should be a Functor

If you use this for a while you will probably see that you tend to map the final result with another function (let’s call it projection) – especially if you had complicated inner states threaded through (you will see this below with the visited projection for a cargo).
And of course it would really be nice if we could make a Projection into a Functor. Well if you take a high-level peek at a Projection<'ev, 'out> you might see that this is move or less something like a mapping from a collection of events to an output-value. And of course we already know how to make something like Func<'ev seq, 'out> into a functor – we just have to compose the outcome:

let fmap (f : 'a -> b') (arrow: 'source -> 'a) =
    fun (s : 'source) -> arrow s |> f

which is of course the same as

let fmap f arrow = arrow >> f

By the way the object-part of the functor maps of course 'source -> 'a into 'source -> 'b.

Now let’s to the same with our Projections. Ok – it’s not as easy, because we cannot just change the target type of the fold part and empty is only used as input.

The solution to both our issues is just to add a final projection and have internal state:

type Projection<'ev, 'state, 'out> =
    { fold       : 'ev -> 'state -> 'state
    ; projection : 'state -> 'out
    ; empty      : 'state
    }

Of course a IPlayback implementation now has to handle the final projection as well (just as we had to do ourselves before).
Now we can just use the projection component just as we did the arrow above and introduce a functor-map (let’s just call it map):

let map (f : 'outA -> 'outB) 
        (p : Projection<'ev, _, 'outA>) 
        : Projection<'ev, _, 'outB> =
    { fold       = p.foldState 
    ; projection = p.projection >> f
    ; empty      = p.empty
    }
let (<%>) = map

As you can see it’s really no big deal just make sure that the event-type of the projections match, the internal states we don’t care (and luckily don’t have to – just use _).

Aside from this I added an operator (<%>) to make things easier (you cannot define <$> in F# so there is hope that one day we can have inbuild functor support!).

Before we have a look at an example I want to introduce two more helper functions I gonna need:

  • aggregate to help us create basic projections only with the fold part
  • and latest to filter/map for the most-recent event having some information (you indicate this with an option)
let aggregate (init : 'state) 
              (ag : 'ev -> 'state -> 'state) 
              : Projection<'ev, _, 'state> =
    { fold       = ag
    ; projection = id
    ; empty      = init
    }

let latest (f : 'ev -> 'a option) 
           : Projection<'ev,_,'a option> =
    aggregate None (fun ev lt -> (f ev <|> lt))

where (<|>) is the orElse-operator I use quite often:

let (<|>) (a : 'a option) (b : 'a option) =
    match (a,b) with
    | (None,   _) -> b
    | (Some _, _) -> a

By the way the latest operation suggest that the order in which the IPlayback should present the events to a projections fold should be from new to old (like [5;4;3;2;1]). This works nicely if you simulate an such things with an list, because recording of an event is just the cons operation.

an example

I first came upon event-sourcing in this article: Event Sourcing by Martin Fowler, so let’s just go with his example of tracking ships.

You can find the complete example in this Gist.

Let’s start with the models for the data:

type Country =
    | US
    | CANADA

type Cargo = Cargo of string
type Port  = Port of string * Country
type Ship  = Ship of string

let countryOfPort (Port (_, c)) = c

type Event =
    | Arrived  of Ship * Port
    | Departed of Ship
    | Loaded   of Ship * Cargo
    | Unloaded of Ship * Cargo

It’s just enough there to support the tests below but still – no need for the originals class explosion – which is a plus as far as I am concerned.

Here are the test I want to support – these are straight translations from tests found in the article:

let refact = Cargo "Refactoring"
let kr     = Ship "King Roy"
let sfo    = Port ("San Fransisco", US)
let la     = Port ("Los Angeles", US);
let yyv    = Port ("Vancouver", CANADA)

let ``Arrival marks Ships location``() =
    let playback =
        theShip kr
        |> arrivedIn sfo
        |> getPlayback
            
    let location = playback <| locationOf kr

    Assert.Equal(AtPort sfo, location)

let ``Departure puts Ship out to Sea``() =
    let playback =
        theShip kr
        |> arrivedIn la
        |> arrivedIn sfo
        |> departed
        |> getPlayback
            
    let location = playback <| locationOf kr

    Assert.Equal(AtSea, location)

let ``Visiting Canada marks Cargo``() =
    let playback =
        theShip kr
        |> loaded refact
        |> arrivedIn yyv
        |> departed
        |> arrivedIn sfo
        |> unloaded refact
        |> getPlayback

    let refactWasInCanada = 
        containerHasBeenIn refact CANADA 
        |> playback

    Assert.True refactWasInCanada

I left the helper functions out – you can find them in the gist.

What’s left is to define the projections for our playback, which is really easy for the location:

type Location =
    | AtPort of Port
    | AtSea
    | Unknown

let locationOf (ship : Ship) =
    (function None -> Unknown | Some l -> l) <%>
    latest (function
        | Arrived (s,p) when s = ship -> Some (AtPort p)
        | Departed s    when s = ship -> Some AtSea
        | _                           -> None)

As you can see it will look for the latest Arrival or Depature of the ship and returns Unknown (using map) if there is none such event for the ship.

To determine if a certain cargo was in a country we first filter out all visited countries:

let visited (cargo : Cargo) =
    fst <%> aggregate 
        (Set.empty, None) 
        (fun ev (vs, onShip) ->
            match ev with
            | Arrived (ship, port) 
                when Some ship = onShip ->
                    (Set.add port vs, onShip)
            | Loaded (ship, c)
                when c = cargo ->
                    (vs, Some ship)
            | Unloaded (_, c)
                when c = cargo ->
                    (vs, None)
            | _ -> (vs, onShip))

This keeps book of the set of visited ports and the ship the cargo is on (or None if it is currently unloaded) and finally returns just the visited ports.
With this it’s easy to see if a cargo has visited a country – just use Sets exit function:

let containerHasBeenIn (cargo : Cargo) (country : Country) =
    Set.exists (fun port -> countryOfPort port = country)
    <%> visited cargo

That’s all!

So if we want to have some fun and ask for a cargos location too:

let ``Arrival with cargo marks the cargos location``() =
    let playback =
        theShip kr
        |> loaded refact
        |> arrivedIn yyv
        |> getPlayback

    let location = 
        locationOfCargo refact
        |> playback

    Assert.Equal (yyv, location)

we can refactor the visited and containerHasBeenIn projections just a bit:

let portHistory (cargo : Cargo) =
    aggregate 
        ([], None) 
        (fun ev (vs, onShip) ->
            match ev with
            | Arrived (ship, port) 
                when Some ship = onShip ->
                    (port::vs, onShip)
            | Loaded (ship, c)
                when c = cargo ->
                    (vs, Some ship)
            | Unloaded (_, c)
                when c = cargo ->
                    (vs, None)
            | _ -> (vs, onShip))
    |> map fst

let containerHasBeenIn (cargo : Cargo) (country : Country) =
    List.exists (fun port -> countryOfPort port = country)
    <%> portHistory cargo

As you can see I just renamed the function and switched the set for a list.

Now you get it for free:

let locationOfCargo (cargo : Cargo) =
    List.head
    <%> portHistory cargo

Quite nice I think.

Try to do this in the OOP version without copy&pasting 😉

conclusion

That’s it for today.

In the next installments of this small series I want to give you an idea of how we could turn Projections into small applicative building blocks and how we could try to be more lazy with stuff.