Change data capture with Postgres & Elixir
- Simon Thörnqvist
- 13th Dec 2022
- 23 min of reading time
Change data capture (CDC) is the process of identifying and capturing data changes from a database.
With CDC, changes to data can be tracked in near real-time, and that information can be used to support a variety of use cases, including auditing, replication, and synchronisation.
A good way to illustrate a use case for CDC is to consider an application that inserts a record into the database and then pushes an event to a message queue (write-twice).
Imagine you’re working on an e-commerce application and after an order is created and inserted into the database, you push an OrderCreated event to a message queue. The consumers of the event might do things such as create pick orders for the warehouse, schedule transports for delivery and send an order confirmation email to the customer.
But what if the application crashes after the order has been inserted into the database but before the event has been pushed to the message queue? This can happen because the insert and the push cannot be performed atomically in a single transaction, and when it does, the event is lost.
There are of course ways around this. A simple solution is to “outbox” the event: write it to an outbox table in the same transaction as the record, and then rely on a CDC process to capture the change to the outbox table and push the event to the message queue. The transaction is atomic, and the CDC process can ensure the event is delivered to the message queue at least once.
In order to capture changes, CDC typically uses one of two methods: log-based or trigger-based.
Log-based CDC involves reading the transaction logs of the database to identify changed data, which is the method we’ll use here by utilising Postgres Logical Replication.
There are two modes of replication in Postgres:
1. Physical replication – Every change on the primary is streamed to replicas via the WAL (Write Ahead Log). This replication is performed byte-by-byte with exact block addresses.
2. Logical replication – In logical replication the subscriber receives each individual transactional change (i.e. INSERT, UPDATE, or DELETE statements) to the database.
The WAL is still streamed but it encodes the logical operations so that they can be decoded by the subscriber without having to know Postgres internals.
One of the great things about logical replication is that it can be used to only replicate specific tables or rows, meaning that you have complete control over what is being replicated.
To enable logical replication, wal_level needs to be set:
-- determines how much information is written to the wal.
-- Each 'level' inherits the level below it; 'logical' is the highest level
ALTER SYSTEM SET wal_level=logical;
-- simultaneously running WAL sender processes
ALTER SYSTEM SET max_wal_senders='10';
-- simultaneously defined replication slots
ALTER SYSTEM SET max_replication_slots='10';
The changes require a restart of the Postgres instance.
After the system has been restarted the wal_level can be verified with:
SHOW wal_level;
wal_level
-----------
logical
(1 row)
In order to subscribe to changes, a publication must be created. A publication is a group of tables for which we would like to receive data changes.
Let’s create a simple table and define a publication for it:
CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
CREATE PUBLICATION articles_pub FOR TABLE articles;
To tell Postgres to retain WAL segments, we must create a replication slot.
A replication slot represents a stream of changes from one or more publications. Slots are crash-safe, which is what lets them prevent data loss in the event of a server failure.
To get a feel for the protocol and the messages being sent, we can use pg_recvlogical to start a replication subscriber:
# Start and use the publication defined above
# output is written to stdout
pg_recvlogical --start \
--host='localhost' \
--port='5432' \
--username='postgres' \
--dbname='postgres' \
--option=publication_names='articles_pub' \
--option=proto_version=1 \
--create-slot \
--if-not-exists \
--slot=articles_slot \
--plugin=pgoutput \
--file=-
Insert a record:
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');
Each row in the output corresponds to a replication message received through the subscription:
B(egin) - Begin transaction
R(elation) - Table, schema, columns and their types
I(nsert) - Data being inserted
C(ommit) - Commit transaction
B
Rarticlesdidtitledescriptionbody
It35tPostgres replicationtUsing logical replicationtFoo bar baz
C
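The output looks garbled because pgoutput is a binary protocol: the letters are message tags, and the unprintable bytes in between are ids and lengths. As a rough sketch (not how postgrex_pgoutput is implemented), the module below decodes three protocol version 1 messages with binary pattern matching. PgOutputSketch and its tuple shapes are made up for illustration, and the field layout follows the Postgres logical replication message format documentation as we understand it.

```elixir
defmodule PgOutputSketch do
  # Microseconds between the Unix epoch and the Postgres epoch (2000-01-01 UTC).
  @pg_epoch_us 946_684_800_000_000

  # Begin: final LSN of the transaction, commit timestamp, transaction id.
  def decode(<<"B", final_lsn::64, ts::signed-64, xid::32>>) do
    {:begin, final_lsn, to_datetime(ts), xid}
  end

  # Insert: relation id, an "N" marker, the column count, then the new tuple.
  def decode(<<"I", relation_id::32, "N", ncols::16, rest::binary>>) do
    {:insert, relation_id, decode_columns(rest, ncols, [])}
  end

  # Commit: flags (unused), commit LSN, end LSN, commit timestamp.
  def decode(<<"C", _flags::8, lsn::64, end_lsn::64, ts::signed-64>>) do
    {:commit, lsn, end_lsn, to_datetime(ts)}
  end

  defp to_datetime(ts), do: DateTime.from_unix!(ts + @pg_epoch_us, :microsecond)

  # Each column is tagged: "n" (null), "u" (unchanged TOAST value) or
  # "t" (text value prefixed with a 32-bit length).
  defp decode_columns(<<>>, 0, acc), do: Enum.reverse(acc)

  defp decode_columns(<<"n", rest::binary>>, n, acc),
    do: decode_columns(rest, n - 1, [nil | acc])

  defp decode_columns(<<"u", rest::binary>>, n, acc),
    do: decode_columns(rest, n - 1, [:unchanged_toast | acc])

  defp decode_columns(<<"t", len::32, value::binary-size(len), rest::binary>>, n, acc),
    do: decode_columns(rest, n - 1, [value | acc])
end

# A hand-built Insert message with two columns: the text "35" and a NULL.
msg = <<"I", 16386::32, "N", 2::16, "t", 2::32, "35", "n">>
IO.inspect(PgOutputSketch.decode(msg))
# {:insert, 16386, ["35", nil]}
```

Note that an Insert only carries a relation id and raw text values. Column names and types arrive separately in the Relation message, which is why the subscriber has to keep it around.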
If we insert multiple records in a single transaction, we should see two I messages between B and C:
BEGIN;
INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');
INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
COMMIT;
And the output:
C
B
It37tFirsttdesctFoo
It38tSecondtdesctBar
C
The relation, i.e. the table information, was not transmitted since we already received it when inserting the first record.
Postgres only sends the relation the first time it’s encountered during the session. The subscriber is expected to cache a previously sent relation.
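To illustrate the caching a subscriber is expected to do, here is a minimal sketch. The message tuples are simplified, hypothetical shapes (not the records produced by postgrex_pgoutput), and RelationCacheSketch is a name invented for this example.

```elixir
defmodule RelationCacheSketch do
  # A relation message is stored (or overwritten) under its id.
  def handle({:relation, id, table, columns}, cache) do
    {nil, Map.put(cache, id, {table, columns})}
  end

  # An insert only carries the relation id, so the table and column
  # names have to come from the cache.
  def handle({:insert, id, values}, cache) do
    {table, columns} = Map.fetch!(cache, id)
    {{table, Map.new(Enum.zip(columns, values))}, cache}
  end
end

# The relation is sent once; both inserts are resolved from the cache.
messages = [
  {:relation, 16386, "articles", ["id", "title"]},
  {:insert, 16386, ["37", "First"]},
  {:insert, 16386, ["38", "Second"]}
]

{rows, _cache} =
  Enum.reduce(messages, {[], %{}}, fn msg, {rows, cache} ->
    case RelationCacheSketch.handle(msg, cache) do
      {nil, cache} -> {rows, cache}
      {row, cache} -> {[row | rows], cache}
    end
  end)

IO.inspect(Enum.reverse(rows))
```

Map.fetch!/2 raises if an insert refers to a relation that was never received, which is a reasonable way to surface a protocol error early in a sketch like this.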
Now that we have a feel for how Logical replication works, let’s implement it in Elixir!
Create a new Elixir project:
mix new cdc
We’ll add the following dependencies to mix.exs:
defp deps do
  [
    {:postgrex, "~> 0.16.4"},
    # decode/encode replication messages
    {:postgrex_pgoutput, "~> 0.1.0"}
  ]
end
postgrex supports replication through the Postgrex.ReplicationConnection process.
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
require Logger
defstruct [
:publications,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok, %__MODULE__{slot: slot, publications: pubs}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
{:noreply, [], state}
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
The code is available on GitHub
Let’s try it out:
opts = [
slot: "articles_slot_elixir",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
CDC.Replication.start_link(opts)
When we start the process, the following happens:
1. Once we are connected to Postgres, the handle_connect/1 callback is called and a temporary logical replication slot is created.
2. handle_result/2 is called with the result of the query in step 1. If the slot was created successfully, we start streaming from the slot and go into streaming mode. The requested position ‘0/0’ means that Postgres picks the position.
3. Any replication messages sent from Postgres are received in the handle_data/2 callback.
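To make step 2 more concrete, the sketch below copies the option-escaping helpers from the module above into a standalone module, so that we can print the command that ends up being sent. StartReplicationSketch, query/2 and quote_value/1 are names made up for this example.

```elixir
defmodule StartReplicationSketch do
  # Builds the START_REPLICATION command from a slot name and pgoutput options.
  def query(slot, opts) do
    IO.iodata_to_binary(["START_REPLICATION SLOT ", slot, " LOGICAL 0/0", options(opts)])
  end

  defp options([]), do: ""

  defp options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} ->
        [Atom.to_string(k), ?\s, quote_value(v)]
      end)

    [?\s, ?(, parts, ?)]
  end

  # Wraps the value in single quotes and doubles any embedded single quote.
  defp quote_value(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

IO.puts(
  StartReplicationSketch.query("articles_slot_elixir",
    proto_version: 1,
    publication_names: ["articles_pub"]
  )
)

# START_REPLICATION SLOT articles_slot_elixir LOGICAL 0/0 (proto_version '1', publication_names 'articles_pub')
```

One thing to be aware of: to_string/1 concatenates the elements of a list, so with more than one publication the names would presumably need to be joined with commas (for example with Enum.join/2) before being passed in.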
There are two types of messages a subscriber receives:
1. primary_keep_alive – A check-in message. If reply == 1, the subscriber is expected to reply with a standby_status_update to avoid a timeout disconnect.
The standby_status_update contains the current LSN the subscriber has processed.
Postgres uses this message to determine which WAL segments can be safely removed.
2. xlog_data – Contains the data messages for each step in a transaction.
Since we are not responding to the primary_keep_alive messages, the process gets disconnected and restarts.
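On the wire these messages are small binaries, each with a one-byte tag. The following is a minimal sketch of their layout, based on our reading of the Postgres streaming replication protocol documentation. It is not the postgrex_pgoutput implementation, and ReplicationWireSketch is a hypothetical name.

```elixir
defmodule ReplicationWireSketch do
  # Microseconds between the Unix epoch and the Postgres epoch (2000-01-01 UTC).
  @pg_epoch_us 946_684_800_000_000

  # Primary keepalive: current end of WAL, server clock, reply-requested flag.
  def decode(<<"k", wal_end::64, _clock::64, reply::8>>) do
    {:primary_keep_alive, wal_end, reply == 1}
  end

  # XLogData: WAL start, current end of WAL, server clock, then the payload
  # (here, a pgoutput message).
  def decode(<<"w", wal_start::64, wal_end::64, _clock::64, data::binary>>) do
    {:xlog_data, wal_start, wal_end, data}
  end

  # Standby status update: written, flushed and applied LSN, the client clock
  # in microseconds since the Postgres epoch, and a flag that asks the server
  # to reply immediately (0 = no).
  def standby_status_update(lsn, now_us \\ System.os_time(:microsecond)) do
    <<"r", lsn::64, lsn::64, lsn::64, (now_us - @pg_epoch_us)::64, 0::8>>
  end
end

keepalive = <<"k", 22981920::64, 0::64, 1>>
{:primary_keep_alive, lsn, true} = ReplicationWireSketch.decode(keepalive)
IO.inspect(byte_size(ReplicationWireSketch.standby_status_update(lsn)))
# 34
```

The sketch acknowledges the same LSN for all three positions, which is only appropriate when everything up to that point really has been processed.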
Let’s fix it by decoding the messages and replying with standby_status_update messages.
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
require Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.{Messages, Lsn}
require Logger
defstruct [
:publications,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok, %__MODULE__{slot: slot, publications: pubs}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
return_msgs =
msg
|> Messages.decode()
|> handle_msg()
{:noreply, return_msgs, state}
end
#
defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
Logger.debug("msg_primary_keep_alive message reply=true")
<<lsn::64>> = Lsn.encode(lsn)
[standby_status_update(lsn)]
end
defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []
defp handle_msg(Messages.msg_xlog_data(data: data)) do
Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
[]
end
defp standby_status_update(lsn) do
[
wal_recv: lsn + 1,
wal_flush: lsn + 1,
wal_apply: lsn + 1,
system_clock: Messages.now(),
reply: 0
]
|> Messages.msg_standby_status_update()
|> Messages.encode()
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
handle_data/2 decodes the message and passes it to handle_msg/1. If it’s a primary_keep_alive we respond with a standby_status_update.
The LSN denotes a byte position in the WAL.
The subscriber responds with the LSN it has handled so far. Since we are not tracking the messages we receive, we just ack with the LSN sent from the server.
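Postgres prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position), while the decoded messages in this article show it as a tuple such as {0, 22981920}. As a small illustration of the relationship, here is a sketch that converts between the textual form and a plain integer. LsnSketch is a hypothetical helper, not part of postgrex_pgoutput.

```elixir
defmodule LsnSketch do
  import Bitwise

  # Parses "HI/LO" (both hexadecimal) into a single 64-bit integer.
  def to_integer(text) do
    [hi, lo] = String.split(text, "/")
    bsl(String.to_integer(hi, 16), 32) + String.to_integer(lo, 16)
  end

  # Formats a 64-bit integer as "HI/LO".
  def to_text(int) do
    Integer.to_string(bsr(int, 32), 16) <> "/" <> Integer.to_string(band(int, 0xFFFFFFFF), 16)
  end
end

IO.inspect(LsnSketch.to_integer("0/15EAD20"))
# 22981920
IO.inspect(LsnSketch.to_text(22_981_920))
# "0/15EAD20"
```

Because an LSN is just a byte offset, two positions can be compared or subtracted as integers, for example to estimate how far a subscriber lags behind.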
Next we’ll handle xlog_data messages. The idea here is that we’ll capture each operation into a Transaction struct.
The CDC.Protocol module will handle xlog_data messages and track the state of the transaction.
defmodule CDC.Protocol do
import Postgrex.PgOutput.Messages
require Logger
alias CDC.Tx
alias Postgrex.PgOutput.Lsn
@type t :: %__MODULE__{
tx: Tx.t(),
relations: map()
}
defstruct [
:tx,
relations: %{}
]
@spec new() :: t()
def new do
%__MODULE__{}
end
def handle_message(msg, state) when is_binary(msg) do
msg
|> decode()
|> handle_message(state)
end
def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
Logger.debug("msg_primary_keep_alive message reply=true")
<<lsn::64>> = Lsn.encode(lsn)
{[standby_status_update(lsn)], nil, state}
end
def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
tx =
[relations: relations, decode: true]
|> Tx.new()
|> Tx.build(msg)
{[], nil, %{state | tx: tx}}
end
def handle_message(msg, %__MODULE__{tx: tx} = state) do
case Tx.build(tx, msg) do
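# Bind the struct returned by Tx.build/2: the outer `tx` predates the commit
# message, so finalizing it would skip end_lsn and leave operations unreversed.
%Tx{state: :commit, relations: relations} = tx ->
tx = Tx.finalize(tx)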
relations = Map.merge(state.relations, relations)
{[], tx, %{state | tx: nil, relations: relations}}
tx ->
{[], nil, %{state | tx: tx}}
end
end
defp standby_status_update(lsn) do
[
wal_recv: lsn + 1,
wal_flush: lsn + 1,
wal_apply: lsn + 1,
system_clock: now(),
reply: 0
]
|> msg_standby_status_update()
|> encode()
end
end
CDC.Tx handles the messages received within a transaction: begin, relation, insert/update/delete and commit.
defmodule CDC.Tx do
import Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.Lsn
alias __MODULE__.Operation
@type t :: %__MODULE__{
operations: [Operation.t()],
relations: map(),
timestamp: term(),
xid: pos_integer(),
state: :begin | :commit,
lsn: Lsn.t(),
end_lsn: Lsn.t()
}
defstruct [
:timestamp,
:xid,
:lsn,
:end_lsn,
relations: %{},
operations: [],
state: :begin,
decode: true
]
def new(opts \\ []) do
struct(__MODULE__, opts)
end
def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
%{tx | operations: Enum.reverse(ops)}
end
def finalize(%__MODULE__{} = tx), do: tx
@spec build(t(), tuple()) :: t()
def build(tx, msg_xlog_data(data: data)) do
build(tx, data)
end
def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
%{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
end
def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
%{tx | relations: Map.put(relations, id, rel)}
end
def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
when tx_lsn == lsn do
%{tx | state: :commit, end_lsn: end_lsn}
end
def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
do: build_op(builder, id, msg)
def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
do: build_op(builder, id, msg)
def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
do: build_op(builder, id, msg)
# skip unknown messages
def build(%__MODULE__{} = tx, _msg), do: tx
defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
rel = Map.fetch!(rels, id)
op = Operation.from_msg(msg, rel, decode)
%{tx | operations: [op | tx.operations]}
end
end
CDC.Tx.Operation handles INSERT/UPDATE/DELETE messages and decodes the data by combining it with the relation.
defmodule CDC.Tx.Operation do
@moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."
import Postgrex.PgOutput.Messages
alias Postgrex.PgOutput.Type, as: PgType
@type t :: %__MODULE__{}
defstruct [
:type,
:schema,
:namespace,
:table,
:record,
:old_record,
:timestamp
]
@spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
def from_msg(
msg_insert(data: data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :insert,
namespace: ns,
schema: into_schema(columns),
table: name,
record: cast(data, columns, decode?),
old_record: %{}
}
end
def from_msg(
msg_update(change_data: data, old_data: old_data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :update,
namespace: ns,
table: name,
schema: into_schema(columns),
record: cast(data, columns, decode?),
old_record: cast(old_data, columns, decode?)
}
end
def from_msg(
msg_delete(old_data: data),
msg_relation(columns: columns, namespace: ns, name: name),
decode?
) do
%__MODULE__{
type: :delete,
namespace: ns,
schema: into_schema(columns),
table: name,
record: %{},
old_record: cast(data, columns, decode?)
}
end
defp into_schema(columns) do
for c <- columns do
c
|> column()
|> Enum.into(%{})
end
end
defp cast(data, columns, decode?) do
Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
key = column(typeinfo, :name)
value =
if decode? do
t =
typeinfo
|> column(:type)
|> PgType.type_info()
PgType.decode(text, t)
else
text
end
Map.put(acc, key, value)
end)
end
end
As before, the primary_keep_alive message with reply == 1 sends a standby_status_update. When we receive an xlog_data message, we create a new %Tx{} which we use to “build” the transaction until we receive a msg_commit which marks the end of the transaction.
Each insert, update, or delete message creates a CDC.Tx.Operation in the transaction. Each message carries a relation_id, which is used to look up the relation in tx.relations.
The operation together with the relation enables us to decode the data: column and type information is retrieved from the relation and used to decode the values into Elixir terms.
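The article relies on Postgrex.PgOutput.Type for the actual decoding. Purely to illustrate the idea, the sketch below casts text values using a handful of types. TextDecodeSketch and its {name, type} column tuples are simplified stand-ins invented for this example, and anything it does not recognise is left as text.

```elixir
defmodule TextDecodeSketch do
  # pgoutput sends values in Postgres' text format, so decoding means
  # parsing a string according to the column type.
  def decode(nil, _type), do: nil
  def decode(text, type) when type in [:int2, :int4, :int8], do: String.to_integer(text)
  def decode("t", :bool), do: true
  def decode("f", :bool), do: false
  def decode(text, _type), do: text

  # Pairs each value with its column and builds a map keyed by column name.
  def cast(values, columns) do
    columns
    |> Enum.zip(values)
    |> Map.new(fn {{name, type}, text} -> {name, decode(text, type)} end)
  end
end

columns = [{"id", :int4}, {"title", :text}]
IO.inspect(TextDecodeSketch.cast(["6", "Postgres replication"], columns))
# %{"id" => 6, "title" => "Postgres replication"}
```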
Once we are in a commit state, we merge Tx.relations into Protocol.relations. A relation message is only transmitted the first time a table is encountered during the connection session, so Protocol.relations holds every msg_relation we have been sent during the session.
The CDC.Replication module now looks like this:
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
alias CDC.Protocol
require Logger
defstruct [
:publications,
:protocol,
:slot,
:state
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
@impl true
def init({slot, pubs}) do
{:ok,
%__MODULE__{
slot: slot,
publications: pubs,
protocol: Protocol.new()
}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
{return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
if not is_nil(tx) do
Logger.debug("Tx: #{inspect(tx, pretty: true)}")
end
{:noreply, return_msgs, %{state | protocol: protocol}}
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
handle_data/2 calls Protocol.handle_message/2, which returns a three-element tuple: {messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocol.t()}.
For now we just inspect the transaction when it’s emitted from Protocol.handle_message/2. Let’s try it out:
Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
opts = [
slot: "articles_slot_elixir",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
{:ok, _} = CDC.Replication.start_link(opts)
{:ok, pid} = Postgrex.start_link(opts)
insert_query = """
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""
_ = Postgrex.query!(pid, insert_query, [])
14:03:48.020 [debug] Tx: %CDC.Tx{
timestamp: ~U[2022-10-31 13:03:48Z],
xid: 494,
lsn: {0, 22981920},
end_lsn: nil,
relations: %{
16386 => {:msg_relation, 16386, "public", "articles", :default,
[
{:column, [:key], "id", :int4, -1},
{:column, [], "title", :text, -1},
{:column, [], "description", :text, -1},
{:column, [], "body", :text, -1}
]}
},
operations: [
%CDC.Tx.Operation{
type: :insert,
schema: [
%{flags: [:key], modifier: -1, name: "id", type: :int4},
%{flags: [], modifier: -1, name: "title", type: :text},
%{flags: [], modifier: -1, name: "description", type: :text},
%{flags: [], modifier: -1, name: "body", type: :text}
],
namespace: "public",
table: "articles",
record: %{
"body" => "with Elixir!",
"description" => "Using logical replication",
"id" => 6,
"title" => "Postgres replication"
},
old_record: %{},
timestamp: nil
}
],
state: :begin,
decode: true
}
Each change in the transaction is stored in Tx.operations, and operation.record is the decoded row as a map.
Finally let’s implement a way to subscribe to changes from CDC.Replication:
defmodule CDC.Replication do
use Postgrex.ReplicationConnection
alias CDC.Protocol
require Logger
defstruct [
:publications,
:protocol,
:slot,
:state,
subscribers: %{}
]
def start_link(opts) do
conn_opts = [auto_reconnect: true]
publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"
Postgrex.ReplicationConnection.start_link(
__MODULE__,
{slot, publications},
conn_opts ++ opts
)
end
def subscribe(pid, opts \\ []) do
Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
end
def unsubscribe(pid, ref, opts \\ []) do
Postgrex.ReplicationConnection.call(
pid,
{:unsubscribe, ref},
Keyword.get(opts, :timeout, 5_000)
)
end
@impl true
def init({slot, pubs}) do
{:ok,
%__MODULE__{
slot: slot,
publications: pubs,
protocol: Protocol.new()
}}
end
@impl true
def handle_connect(%__MODULE__{slot: slot} = state) do
query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
Logger.debug("[create slot] query=#{query}")
{:query, query, %{state | state: :create_slot}}
end
@impl true
def handle_result(
[%Postgrex.Result{} | _],
%__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
) do
opts = [proto_version: 1, publication_names: pubs]
query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"
Logger.debug("[start streaming] query=#{query}")
{:stream, query, [], %{state | state: :streaming}}
end
@impl true
def handle_data(msg, state) do
{return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)
if not is_nil(tx) do
notify(tx, state.subscribers)
end
{:noreply, return_msgs, %{state | protocol: protocol}}
end
# Replies must be sent using `reply/2`
# https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
@impl true
def handle_call(:subscribe, {pid, _} = from, state) do
ref = Process.monitor(pid)
state = put_in(state.subscribers[ref], pid)
Postgrex.ReplicationConnection.reply(from, {:ok, ref})
{:noreply, state}
end
def handle_call({:unsubscribe, ref}, from, state) do
{reply, new_state} =
case state.subscribers do
%{^ref => _pid} ->
Process.demonitor(ref, [:flush])
{_, state} = pop_in(state.subscribers[ref])
{:ok, state}
_ ->
{:error, state}
end
from && Postgrex.ReplicationConnection.reply(from, reply)
{:noreply, new_state}
end
@impl true
def handle_info({:DOWN, ref, :process, _, _}, state) do
handle_call({:unsubscribe, ref}, nil, state)
end
defp notify(tx, subscribers) do
for {ref, pid} <- subscribers do
send(pid, {:notification, self(), ref, tx})
end
:ok
end
defp escape_options([]),
do: ""
defp escape_options(opts) do
parts =
Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)
[?\s, ?(, parts, ?)]
end
defp escape_string(value) do
[?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
end
end
And we can use it like this:
opts = [
slot: "articles_slot_elixir",
publications: ["articles_pub"],
host: "localhost",
database: "postgres",
username: "postgres",
password: "postgres",
port: 5432,
]
{:ok, pid} = CDC.Replication.start_link(opts)
{:ok, pg_pid} = Postgrex.start_link(opts)
{:ok, ref} = CDC.Replication.subscribe(pid)
insert_query = """
INSERT INTO articles (title, description, body)
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""
_ = Postgrex.query!(pg_pid, insert_query, [])
flush()
{:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
%CDC.Tx{
timestamp: ~U[2022-10-31 13:26:35Z],
xid: 495,
lsn: {0, 22983536},
end_lsn: nil,
relations: %{
16386 => {:msg_relation, 16386, "public", "articles", :default,
[
{:column, [:key], "id", :int4, -1},
{:column, [], "title", :text, -1},
{:column, [], "description", :text, -1},
{:column, [], "body", :text, -1}
]}
},
operations: [
%CDC.Tx.Operation{
type: :insert,
schema: [
%{flags: [:key], modifier: -1, name: "id", type: :int4},
%{flags: [], modifier: -1, name: "title", type: :text},
%{flags: [], modifier: -1, name: "description", type: :text},
%{flags: [], modifier: -1, name: "body", type: :text}
],
namespace: "public",
table: "articles",
record: %{
"body" => "with Elixir!",
"description" => "Using logical replication",
"id" => 7,
"title" => "Postgres replication"
},
old_record: %{},
timestamp: nil
}
],
state: :begin,
decode: true
}}
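A subscriber would typically pattern match on the notification rather than call flush(). As a rough sketch of what that could look like, the module below turns a notification into a list of {type, table, record} tuples. SubscriberSketch is a made-up name, and plain maps stand in for %CDC.Tx{} and %CDC.Tx.Operation{} so that the example runs on its own.

```elixir
defmodule SubscriberSketch do
  # Flattens one notification into {type, table, record} tuples.
  def to_events({:notification, _conn_pid, _ref, tx}) do
    for op <- tx.operations, do: {op.type, op.table, op.record}
  end

  # Waits for the next notification and returns its events, or :timeout.
  def next_events(timeout \\ 1_000) do
    receive do
      {:notification, _pid, _ref, _tx} = msg -> to_events(msg)
    after
      timeout -> :timeout
    end
  end
end

# Simulate what CDC.Replication sends to a subscriber.
tx = %{operations: [%{type: :insert, table: "articles", record: %{"id" => 7}}]}
send(self(), {:notification, self(), make_ref(), tx})
IO.inspect(SubscriberSketch.next_events())
# [{:insert, "articles", %{"id" => 7}}]
```

In a real application this would more likely live in a GenServer's handle_info/2, and the resulting events are what would be pushed to the message queue in the outbox scenario from the introduction.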
If you’re looking for a way to capture changes from your database with minimal changes to your existing setup, Change Data Capture is definitely worth considering. With Elixir and postgrex we’ve implemented a mini Debezium in ~400 LOC. Full source is available here.
If you need help with your Elixir implementation our world-leading team of experts is always here to help. Contact us today to find out how we can help you.