# Change Data Capture with Broadway

```elixir
Mix.install([
  {:tigerbeetlex, "~> 0.16.78"},
  {:broadway, "~> 1.2"},
  {:broadway_rabbitmq, "~> 0.7"},
  {:jason, "~> 1.2"}
])
```

## Introduction

Since version 0.16.43, TigerBeetle can [stream changes](https://docs.tigerbeetle.com/operating/cdc/) (transfers and balance updates) on RabbitMQ. TigerBeetlex provides structs and functions to help decode the raw JSON data of CDC events into well-defined struct.

This guide shows how to use [Broadway](https://elixir-broadway.org/) and its [RabbitMQ connector](https://github.com/dashbitco/broadway_rabbitmq) to easily create a data pipeline that consumes TigerBeetle CDC data. This can be used as a starting point to build your own pipeline. The guide can also be executed in Livebook.

## Requirements

### RabbitMQ

The guide assumes you run RabbitMQ locally on port 5672 with default credentials (`guest:guest`). You can run an instance of RabbitMQ using Docker with this command:

```sh
docker run -it --rm --name rabbitmq -d -p 5672:5672 -p 15672:15672 rabbitmq:4-management
```

This also exposes the RabbitMQ management dashboard on `http://localhost:15672`.

There's no need to declare any exchange or queue since you pipeline will be responsible for its own setup.

## Setup

In your `mix.exs` add TigerBeetlex, Broadway and Jason as dependencies

```elixir
[
  {:tigerbeetlex, "~> 0.16.78"},
  {:broadway, "~> 1.2"},
  {:broadway_rabbitmq, "~> 0.7"},
  {:jason, "~> 1.2"}
]
```

## Building the pipeline

Here's the basic Broadway pipeline to consume TigerBeetle CDC data

```elixir
defmodule MyApp.CDCPipeline do
  use Broadway

  alias TigerBeetlex.CDC.Event
  alias Broadway.Message

  @exchange "tigerbeetle"
  @queue "tigerbeetle_cdc_broadway"

  def start_link(_opts \\ []) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           connection: [
             host: "localhost",
             username: "guest",
             password: "guest",
           ],
           after_connect: &declare_rabbitmq_topology/1,
           queue: @queue,
           declare: [durable: true],
           bindings: [{@exchange, []}],
           on_failure: :reject_and_requeue}
      ],
      processors: [
        default: []
      ]
    )
  end

  defp declare_rabbitmq_topology(amqp_channel) do
    AMQP.Exchange.declare(amqp_channel, @exchange, :fanout, durable: true)
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
    |> Message.update_data(fn data ->
      # The message data is the JSON string representing the event.
      # We're going to decode it and then transform it in a `TigerBeetlex.CDC.Event` struct.
      data
      |> Jason.decode!()
      |> Event.cast!()
    end)
    |> process_message()
  end

  defp process_message(message) do
    # Here you would put your processing logic. We're just going to print the event.
    IO.inspect(message.data)

    # Note that `handle_message` must return the (possibly modified) message.
    message
  end
end
```

Here are some key highlights about the pipeline, feel free to consult the documentation of [`Broadway`](https://hexdocs.pm/broadway) and [`BroadwayRabbitMQ`](https://hexdocs.pm/broadway_rabbitmq) for more information:

* The `connection` key supports all options supported by [`AMQP.Connection.open`](https://hexdocs.pm/amqp/4.0.0/AMQP.Connection.html#open/2) to configure your RabbitMQ connection
* The pipeline declares a `tigerbeetle` fanout exchange and a `tigerbeetle_cdc_broadway` queue, bound to the exchange. This means that all messages sent to the exchange will be sent to the queue, regardless of the routing key
* Both the exchange and the queue are created with `durable: true`, meaning they survive RabbitMQ restarts. Declaring exchanges and queues is idempotent so it's ok for the pipeline to declare them again
* `on_failure` is set to `:reject_and_requeue`, which means that if a message fails to be processed it will be rejected and re-enqueued. The behavior you want to use here depends on your use case and your need to do strict in-order processing.

Once you have your pipeline defined, you can start it under your supervision tree. Here we're just going to start it manually.

```elixir
case MyApp.CDCPipeline.start_link() do
  {:ok, pid} -> {:ok, pid}
  # Since we're executing on Livebook, handle re-execution
  {:error, {:already_started, pid}} -> {:ok, pid}
  {:error, reason} -> {:error, reason}
end
```

## Start TigerBeetle CDC Job

To start TigerBeetle CDC job issue the command below.

> #### ⚠️ Big data ahead
>
> The first time that it's run, the CDC job will publish _all_ data in your cluster since the beginning of time.
>
> If you want to only start publising from the current date, you have to pass the `--timestamp-last` providing an appropriate epoch timestamp in nanoseconds.

```sh
./tigerbeetle amqp --addresses=127.0.0.1:3000 --cluster=0 \
    --host=127.0.0.1 \
    --vhost=/ \
    --user=guest --password=guest \
    --publish-exchange=tigerbeetle
```

This will connect to the TigerBeetle cluster pointed to by `addresses` and `cluster` and start publising CDC data on the `tigerbeetle` exchange. Check out the full documentation of the `tigerbeetle amqp` message [here](https://docs.tigerbeetle.com/operating/cdc/).

## Pushing some data

Let's create two accounts to transact with.

```elixir
alias TigerBeetlex.Account
alias TigerBeetlex.Connection
alias TigerBeetlex.ID

address = "127.0.0.1:3000"

{:ok, _pid} = Connection.start_link(name: :tb, cluster_id: <<0::128>>, addresses: [address])

account_1 =
  %Account{
    id: ID.from_int(42_000),
    ledger: 1,
    code: 2
  }

account_2 =
  %Account{
    id: ID.from_int(42_001),
    ledger: 1,
    code: 3
  }

{:ok, _account_errors} = Connection.create_accounts(:tb, [account_1, account_2])
# Error handling omitted
```

Now let's create a Transfer between the accounts.

```elixir
alias TigerBeetlex.Transfer

transfers = [
  %Transfer{
    id: ID.generate(),
    debit_account_id: ID.from_int(42_000),
    credit_account_id: ID.from_int(42_001),
    amount: 10_000,
    ledger: 1,
    code: 720
  }
]

{:ok, []} = Connection.create_transfers(:tb, transfers)
```

You should now see the print output from the pipeline (if you're executing this in Livebook, it's below the cell where you create the transfer).

And that's it, you are now consuming TigerBeetle CDC data! Be sure to read [`Broadway`](https://hexdocs.pm/broadway) and [`BroadwayRabbitMQ`](https://hexdocs.pm/broadway_rabbitmq) documentation and check out all their features (like, for example, [batching](https://hexdocs.pm/broadway/Broadway.html#module-batching)) to understand how to modify this basic pipeline to suit your usecase.
