Command handling middleware for CQRS applications in Elixir.
Provides support for command registration and dispatch; delegation to aggregate roots; event handling; and long running process managers.
Use with eventstore and eventsourced as components that comprise a CQRS framework for Elixir.
MIT License
Getting started
If available in Hex, the package can be installed as:
- Add commanded to your list of dependencies in
mix.exs:
```elixir
def deps do
[{:commanded, "~> 0.1.0"}]
end
```
- Ensure commanded is started before your application:
```elixir
def application do
[applications: [:commanded]]
end
```
- Configure the
eventstorein each environment's mix config file (e.g.config/dev/exs), specifying usage of the JSON serializer:
```elixir
config :eventstore, EventStore.Storage,
serializer: Commanded.Event.JsonSerializer,
username: "postgres",
password: "postgres",
database: "eventstore_dev",
hostname: "localhost",
pool_size: 10
```
4. Create the `eventstore` database and tables using the `mix` task
```
mix event_store.create
```
Sample usage
Including commanded in the applications section of mix.exs will ensure it is started.
You may manually start the top level Supervisor process.
{:ok, _} = Commanded.Supervisor.start_link
Command handlers
Create a module per command, defining the fields with defstruct. A command must contain a field to uniquely identify the aggregate instance (e.g. account_number).
defmodule OpenAccount do defstruct [:account_number, :initial_balance] end
Implement the Commanded.Commands.Handler behaviour consisting of a single handle/2 function. It receives the aggregate root and the command to be handled. It must return the aggregate root.
defmodule OpenAccountHandler do @behaviour Commanded.Commands.Handler def handle(%BankAccount{} = aggregate, %OpenAccount{account_number: account_number, initial_balance: initial_balance}) do aggregate |> BankAccount.open_account(account_number, initial_balance) end end
Command dispatch and routing
Create a router to handle registration of each command to its associated handler. Configure each command, mapping it to its handler and aggregate root.
defmodule BankingRouter do use Commanded.Commands.Router dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number end
You can then dispatch a command using the router.
:ok = BankingRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000})
Event handlers
Create an event handler module which implements handle/1 for each event you are interested in.
Add a catch-all handle/1 function for all other events to ignore.
defmodule AccountBalanceHandler do def start_link do Agent.start_link(fn -> 0 end, name: __MODULE__) end def handle(%BankAccountOpened{initial_balance: initial_balance}) do Agent.update(__MODULE__, fn _ -> initial_balance end) end def handle(%MoneyDeposited{balance: balance}) do Agent.update(__MODULE__, fn _ -> balance end) end def handle(_) do # ignore all other events end def current_balance do Agent.get(__MODULE__, fn balance -> balance end) end end
Register the event handler with a given name. The name is used when subscribing to the event store to record the last seen event.
{:ok, _} = AccountBalanceHandler.start_link {:ok, _} = Commanded.Event.Handler.start_link("account_balance", AccountBalanceHandler)
Process managers
A process manager is responsible for communicating between one or more aggregate roots.
It handles events and may dispatch one or more commands in response. Process managers have state that can be used to track which aggregate roots are being coordinated.
A process manager must implement interested?/1 to indicate which events are used, and to route the event to an existing instance or start a new process. A handle/2 function must exist for each interested event. It receives the process manager's state and the event to be handled. It must return the state, including any commands that should be dispatched.
defmodule TransferMoneyProcessManager do defstruct commands: [], transfer_uuid: nil, source_account: nil, target_account: nil, amount: nil, status: nil def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}), do: {:start, transfer_uuid} def interested?(%MoneyWithdrawn{transfer_uuid: transfer_uuid}), do: {:continue, transfer_uuid} def interested?(%MoneyDeposited{transfer_uuid: transfer_uuid}), do: {:continue, transfer_uuid} def interested?(_event), do: false def new(process_uuid) do %TransferMoneyProcessManager{transfer_uuid: process_uuid} end def handle(%TransferMoneyProcessManager{transfer_uuid: transfer_uuid} = transfer, %MoneyTransferRequested{source_account: source_account, target_account: target_account, amount: amount}) do transfer = transfer |> dispatch(%WithdrawMoney{account_number: source_account, transfer_uuid: transfer_uuid, amount: amount}) %TransferMoneyProcessManager{transfer | source_account: source_account, target_account: target_account, amount: amount, status: :withdraw_money_from_source_account } end def handle(%TransferMoneyProcessManager{transfer_uuid: transfer_uuid} = transfer, %MoneyWithdrawn{} = _money_withdrawn) do transfer = transfer |> dispatch(%DepositMoney{account_number: transfer.target_account, transfer_uuid: transfer_uuid, amount: transfer.amount}) %TransferMoneyProcessManager{transfer | status: :deposit_money_in_target_account } end def handle(%TransferMoneyProcessManager{} = transfer, %MoneyDeposited{} = _money_deposited) do %TransferMoneyProcessManager{transfer | status: :transfer_complete } end def handle(_transfer, _event) do # ignore any other events end defp dispatch(%TransferMoneyProcessManager{commands: commands} = transfer, command) do %TransferMoneyProcessManager{transfer | commands: [command | commands] } end end
Register the process manager router, with a uniquely identified name. This is used when subscribing to events from the event store to track the last seen event and ensure they are only received once.
{:ok, _} = Commanded.ProcessManagers.Router.start_link("transfer_money_process_manager", TransferMoneyProcessManager)
Supervision
Use a supervisor to host your process managers and event handlers.
defmodule Bank.Supervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, :ok) end def init(:ok) do children = [ supervisor(Commanded.Supervisor, []), # process manager worker(Commanded.ProcessManagers.Router, ["TransferMoneyProcessManager", TransferMoneyProcessManager, BankingRouter], id: :transfer_money_process_manager), # event handler worker(Commanded.Event.Handler, ["AccountBalanceHandler", AccountBalanceHandler]) ] supervise(children, strategy: :one_for_one) end end
Your application should include the supervisor as a worker.
defmodule Bank do use Application def start(_type, _args) do import Supervisor.Spec, warn: false children = [ worker(BankApp.Supervisor, []) ] opts = [strategy: :one_for_one, name: __MODULE__] Supervisor.start_link(children, opts) end end