Using python-mysql-replication with Dolt
Dolt is the world's first version controlled SQL database – you can branch/diff/fork/merge/rebase your data, in all the same ways that Git lets you work with source code files. Thanks to Dolt's custom storage engine, these operations to compute diffs and to merge branches are extremely fast, too. ⚡️ Dolt is fully compatible with MySQL, so you can use it as a drop-in replacement anywhere you're running a MySQL server. We've been highlighting this compatibility recently, and in today's post, we're taking a look at another MySQL tool that works with Dolt.
Python MySQL Replication
Lately, I've been writing about Dolt's support for Dolt-to-MySQL replication, using the MySQL binlog replication protocol. In addition to Dolt's native replication format that's optimized for Dolt-to-Dolt replication, we also support the MySQL binlog replication protocol for compatibility with MySQL replication. In 2023, we launched MySQL-to-Dolt replication, and this year, we launched Dolt-to-MySQL replication. That means a Dolt SQL server can now serve as either a source or a replica in a MySQL replication topology.
One great use case for Dolt-to-MySQL replication is to enable Change Data Capture tools to monitor data changes and alert other parts of your system when certain data changes. In our previous post on Dolt-to-MySQL replication, we showed how you can hook up Debezium to monitor a Dolt branch for data changes. Debezium is a great solution for change data capture systems, especially if you're already running Kafka for an event streaming platform. In this post, we're going to look at a similar, but simpler tool – the Python MySQL Replication library, python-mysql-replication
. This library works in a very similar way to Debezium, by connecting to a MySQL compatible server and consuming a stream of binlog events. The biggest difference with this library is that it doesn't transform the raw events and publish them to a Kafka topic; instead, you write Python code that directly processes the binlog events. Debezium is a great choice for a very robust and scalable solution, but for many applications that don't need that level of complexity, python-mysql-replication
can be a great fit.
Demo
Let's jump right in and see the python-mysql-replication
library in action. We'll start by setting up a Dolt SQL server, then we'll write a simple Python script to show the changes being made in our Dolt database, and finally we'll extend that script to do something useful when a specific data change occurs.
Dolt SQL Server
First things first, we need a Dolt SQL server to connect to. We could create a new database here, but for this demo, we'll clone the existing dolthub/employees
database from DoltHub, so that we've got some sample data to work with. This database has a few tables that represent a fake company's employees, their titles, and their salaries.
# Create a new directory to hold our cloned Dolt database and the Python script we'll write later
mkdir pythonMysqlReplication && cd pythonMysqlReplication
# Clone the dolthub/employees database
dolt clone dolthub/employees
cd employees
Now that we've got a Dolt database to work with, let's set a few options to enable binlog replication support before we start up the server. These options must be set before the Dolt SQL server starts up, otherwise binlog replication won't be enabled, so we'll use dolt sql
to persist a few system variables. Setting @@log_bin
is what enables data changes to be recorded in binary log events, and @@gtid_mode
and @@enforce_gtid_consistency
are required for replication with GTID position (the only positioning mode supported by Dolt).
dolt sql -q "SET @@PERSIST.log_bin=ON;"
dolt sql -q "SET @@PERSIST.gtid_mode=ON;"
dolt sql -q "SET @@PERSIST.enforce_gtid_consistency=ON;"
After setting those options to enable binlog replication, we need to set one more option for the python-mysql-replication
library to work with Dolt. The library requires that the @@binlog_row_metadata
system variable be set to FULL
(the default is MINIMAL
). Although Dolt doesn't fully support sending full row metadata, we can set this system variable to FULL
to keep the library happy. This limitation means that we won't be able to look up row values by column name, but we can still reference them by their ordinal position, as we'll see later. If you hit issues with compatibility here, please send us a GitHub issue to request full support for @@binlog_row_metadata
and we'll be happy to dig into the details.
dolt sql -q "SET @@PERSIST.binlog_row_metadata=FULL;"
Now that we've configured our replication options through those system variables, we're ready to start up the Dolt SQL server:
dolt sql-server --loglevel DEBUG
The dolt sql-server
process will stay in the foreground in our terminal and print out any log messages. You should see a message like Enabling binary logging
if you got the system variables set up correctly. Leave this running here, and let's go open a new terminal window, then use the mysql
command line client to connect to our running database server.
mysql -uroot --protocol TCP employees
Once you're logged into the Dolt SQL server, you can view all the tables and run some queries to take a look at what's in this database.
mysql> show tables;
+----------------------+
| Tables_in_employees |
+----------------------+
| current_dept_emp |
| departments |
| dept_emp |
| dept_emp_latest_date |
| dept_manager |
| employees |
| salaries |
| titles |
+----------------------+
8 rows in set (0.00 sec)
mysql> select * from employees limit 5;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date |
+--------+------------+------------+-----------+--------+------------+
| 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 |
| 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 |
| 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 |
| 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 |
| 10005 | 1955-01-21 | Kyoichi | Maliniak | M | 1989-09-12 |
+--------+------------+------------+-----------+--------+------------+
5 rows in set (0.00 sec)
Finally, before we head to a new terminal window, let's grab the server's UUID. We'll need this later to pass in to the python-mysql-replication
library.
mysql> select @@server_uuid;
+--------------------------------------+
| @@server_uuid |
+--------------------------------------+
| 2cd4d78c-65ac-492d-bd15-7a063529cbdb |
+--------------------------------------+
1 row in set (0.00 sec)
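Before moving on, it can be handy to confirm from Python that the replication variables actually took effect. The sketch below is one way to do that, not an official Dolt tool. It assumes the pymysql package is installed (python-mysql-replication builds on it) and that the server is reachable as root on 127.0.0.1:3306, as in the rest of this post. The names REQUIRED_SETTINGS, normalize, find_misconfigured, and fetch_replication_settings are made up for this example, and because a server may report boolean variables as either 1 or ON, the check accepts both.

```python
# Sketch: verify the system variables this post persists before replication starts.
# All helper names are illustrative; none come from Dolt or python-mysql-replication.

REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
    "binlog_row_metadata": "FULL",
}


def normalize(value):
    """Map the different ways a server may report a variable onto one string."""
    text = str(value).strip().upper()
    return "ON" if text in ("1", "ON", "TRUE") else text


def find_misconfigured(actual):
    """Return the names of required variables whose value is missing or wrong."""
    return [
        name
        for name, expected in REQUIRED_SETTINGS.items()
        if normalize(actual.get(name, "")) != expected
    ]


def fetch_replication_settings(host="127.0.0.1", port=3306, user="root", password=""):
    """Read the variables from a running server (assumes pymysql is installed)."""
    import pymysql  # imported lazily so the helpers above work without it

    connection = pymysql.connect(host=host, port=port, user=user, password=password)
    try:
        with connection.cursor() as cursor:
            settings = {}
            for name in REQUIRED_SETTINGS:
                cursor.execute(f"SELECT @@{name}")
                settings[name] = cursor.fetchone()[0]
            return settings
    finally:
        connection.close()
```

With the server running, calling find_misconfigured(fetch_replication_settings()) lists any variable that still needs attention. An empty list suggests the setup steps above were applied.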
Hello python-mysql-replication
World!
The python-mysql-replication
library comes with an example dump_events.py
script that simply prints out the events it's capturing. I started with this example and made a few small tweaks to make it work for Dolt. You can download the Gist from GitHub, or just copy the code below into a dump_events.py
file.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import HeartbeatLogEvent

mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}

# Replace the UUID in auto_position with your own server's @@server_uuid
stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1')

for binlogevent in stream:
    # Heartbeats only keep the connection alive, so skip them
    if not isinstance(binlogevent, HeartbeatLogEvent):
        binlogevent.dump()

stream.close()
Before you can run this script, you'll need to adjust the auto_position
parameter, but first, let's explain each of the options we're passing to BinLogStreamReader
:
blocking=True – this option tells the library to use normal, or blocking, replication instead of the non-blocking replication mode. You must specify this option when working with Dolt, since Dolt does not currently support MySQL's non-blocking replication mode. If you don't specify it, python-mysql-replication will default to blocking=False and you won't receive events from the Dolt server.
server_id=100 – every node in a MySQL replication topology must use a unique server ID. The value doesn't really matter, as long as it's unique, so choose your favorite integer here.
auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1' – this is the GTID position that the library will send to the Dolt server so that the Dolt server knows which events need to be streamed to the library. python-mysql-replication needs this parameter to be specified in order to use GTID auto-positioning (the only replication positioning mode that Dolt supports). You must change this to use your Dolt server's unique @@server_uuid! Take the result of the select @@server_uuid; query you ran against the Dolt SQL server in the last step, add ":1" at the end to signal the very first GTID on that server, and use that as the value for the auto_position parameter.
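If you'd rather not paste the UUID by hand, the auto_position value can be built in code. The following is a minimal sketch under a few assumptions: pymysql is installed, the server is reachable as root on 127.0.0.1:3306, and the helper names first_gtid_position and fetch_server_uuid are hypothetical (they are not part of python-mysql-replication). It validates the UUID with Python's standard uuid module and appends ":1" as described above.

```python
import uuid


def first_gtid_position(server_uuid):
    """Build the auto_position value described above: '<server_uuid>:1'."""
    # uuid.UUID raises ValueError on malformed input, which catches copy/paste slips
    canonical = str(uuid.UUID(server_uuid.strip()))
    return f"{canonical}:1"


def fetch_server_uuid(host="127.0.0.1", port=3306, user="root", password=""):
    """Ask the running server for @@server_uuid (assumes pymysql is installed)."""
    import pymysql  # imported lazily so first_gtid_position works without it

    connection = pymysql.connect(host=host, port=port, user=user, password=password)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT @@server_uuid")
            return cursor.fetchone()[0]
    finally:
        connection.close()
```

The result of first_gtid_position(fetch_server_uuid()) can then be passed as the auto_position argument to BinLogStreamReader, which avoids editing the script each time you point it at a different Dolt server.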
The only other difference between our script and the example from the python-mysql-replication
library is that we're filtering out HeartbeatLogEvent
events. These events are sent by the MySQL server to ensure that the connection is still alive, and we don't need to process them in our script, so we just ignore them.
Let's try running our script...
python dump_events.py
=== RotateEvent ===
Position: 4
Next binlog file: binlog-main.000002
=== FormatDescriptionEvent ===
Date: 2024-08-05T19:52:12
Log position: 120
Event size: 93
Read bytes: 73
Binlog version: (4,)
mysql version: 5.6.33-0ubuntu0.14.04.1-log
Created: 1722887532
Common header length: 19
Post header length: (56, 13, 0, 8, 0, 18, 0, 4, 4, 4, 4, 18)
Server version split: (0, 0, 92)
Number of event types: 0
=== PreviousGtidsEvent ===
Date: 2024-08-05T19:52:12
Log position: 151
Event size: 8
Read bytes: 8
previous_gtids:
If the script connected correctly, then you should see a few events printed out immediately. Every MySQL binlog event stream starts off with a RotateEvent
to let the connected replica know what file is being served, and then a FormatDescriptionEvent
to tell the connected replica how the binlog events are formatted, and then finally a PreviousGtidsEvent
that tells the connected replica where the GTID position is.
At this point, our script is waiting for more events to come over the connection. Let's jump back over to the terminal window with our SQL shell open to our Dolt SQL server. We'll make some changes and see if they show up in the output of our Python program.
CREATE TABLE newTable(pk int primary key, col1 varchar(200));
INSERT INTO newTable VALUES (1, "hello world!");
Back in our terminal window with our running Python process, we see several more events generated from the two statements we executed above:
=== GtidEvent ===
Date: 2024-08-05T23:06:01
Log position: 239
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 2cd4d78c-65ac-492d-bd15-7a063529cbdb:9
=== QueryEvent ===
Date: 2024-08-05T23:06:01
Log position: 441
Event size: 179
Read bytes: 179
Schema: b'employees'
Execution time: 0
Query: CREATE TABLE `newTable` (
`pk` int NOT NULL,
`col1` varchar(200),
PRIMARY KEY (`pk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;
=== GtidEvent ===
Date: 2024-08-05T23:06:01
Log position: 489
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 2cd4d78c-65ac-492d-bd15-7a063529cbdb:10
=== QueryEvent ===
Date: 2024-08-05T23:06:01
Log position: 540
Event size: 28
Read bytes: 28
Schema: b'employees'
Execution time: 0
Query: BEGIN
=== TableMapEvent ===
Date: 2024-08-05T23:06:01
Log position: 599
Event size: 36
Read bytes: 36
Table id: 3
Schema: employees
Table: newTable
Columns: 2
=== OptionalMetaData ===
unsigned_column_list: []
default_charset_collation: None
charset_collation: {}
column_charset: []
column_name_list: []
set_str_value_list : []
set_enum_str_value_list : []
geometry_type_list : []
simple_primary_key_list: []
primary_keys_with_prefix: {}
visibility_list: []
charset_collation_list: []
enum_and_set_collation_list: []
=== WriteRowsEvent ===
Date: 2024-08-05T23:06:01
Log position: 653
Event size: 31
Read bytes: 12
Table: employees.newTable
Affected columns: 2
Changed rows: 1
Column Name Information Flag: False
Values:
--
* UNKNOWN_COL0 : 1
* UNKNOWN_COL1 : hello world!
=== XidEvent ===
Date: 2024-08-05T23:06:01
Log position: 684
Event size: 8
Read bytes: 8
Transaction ID: 0
Each SQL transaction is preceded by a GtidEvent
that marks the transaction boundary. The DDL CREATE TABLE
statement is represented by a QueryEvent
and the insert into the table is represented by a TableMapEvent
that describes the table being updated and a WriteRowsEvent
that includes the data being inserted. The final XidEvent
marks the end of the DML transaction.
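To make that structure concrete, here is a small sketch that splits a sequence of event type names into per-transaction groups, starting a new group at each GtidEvent. It works on plain strings (for example, type(binlogevent).__name__ collected from the stream) so it can be tried without a server. The function name group_by_transaction is invented for this example, and the sample list simply mirrors the order of events in the output above.

```python
from typing import Iterable, Iterator, List


def group_by_transaction(event_names: Iterable[str]) -> Iterator[List[str]]:
    """Yield lists of event names, starting a new list at every GtidEvent.

    Events that arrive before the first GtidEvent (the stream setup events)
    are yielded together as their own leading group.
    """
    current: List[str] = []
    for name in event_names:
        if name == "GtidEvent" and current:
            yield current
            current = []
        current.append(name)
    if current:
        yield current


# Event order taken from the dump_events.py output shown earlier
observed = [
    "RotateEvent", "FormatDescriptionEvent", "PreviousGtidsEvent",
    "GtidEvent", "QueryEvent",
    "GtidEvent", "QueryEvent", "TableMapEvent", "WriteRowsEvent", "XidEvent",
]

for group in group_by_transaction(observed):
    print(" -> ".join(group))
```

Grouping this way makes it easy to see that the CREATE TABLE transaction is a GtidEvent followed by a single QueryEvent, while the INSERT transaction carries the BEGIN query, the table map, the row data, and the closing XidEvent.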
This is the basic structure of how the python-mysql-replication
library works. It exposes the raw binlog events for you to process in your Python code. As the Dolt database changes, your Python app receives binlog events describing those changes. The example so far only used some simple SQL statements, but we can also use all of Dolt's version control functionality and also receive binlog events describing how those change the database's data. For example, the changes we've made so far are still in our current working set, because we haven't created a Dolt commit to add them to the database's commit graph. We can add them to the commit graph by running:
call dolt_commit('-Am', 'Adding a new table');
From the SQL shell, we can view the commit graph by looking at the dolt_log
system table:
select * from dolt_log;
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
| commit_hash | committer | email | date | message |
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
| 66snjo19cfinmqnf73okdlq8m1t3b7nn | root | root@% | 2024-08-06 18:11:55 | Adding a new table |
| a3p5ulod2rerno98191lhqnuhhfgppno | timsehn | tim@dolthub.com | 2023-11-27 23:28:41 | imported data |
| oi4sc8120pevl344fv60b865s2p73169 | timsehn | tim@dolthub.com | 2023-11-27 23:15:25 | Initialize data repository |
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
3 rows in set (0.00 sec)
If we want to clear out those changes in the most recent commit, we can use the dolt_reset()
stored procedure to reset back to any other commit:
call dolt_reset('--hard', 'HEAD~1');
Back in the terminal window with our Python script running, we see a bunch more binlog events printed out that undo creating the newTable
table:
=== GtidEvent ===
Date: 2024-08-06T16:47:52
Log position: 1301
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 7c53ef27-9e8b-41c0-af2a-073dd9afcc6b:6
=== QueryEvent ===
Date: 2024-08-06T16:47:52
Log position: 1369
Event size: 45
Read bytes: 45
Schema: b'employees'
Execution time: 0
Query: DROP TABLE `newTable`;
Now that you know the basics of how the python-mysql-replication
library works, let's see how we can modify our script to do something a little more useful.
Monitoring for New Hires
Okay, so now that we've got a simple working example with the python-mysql-replication
library, let's take this one step further and do something useful. Let's say that whenever a new employee is added to the database, we want to trigger a notification to our IT team to get the new hire's laptop and email account set up. We can do this by changing our script to monitor for WriteRowsEvent
binlog events to the employees
table and then trigger some sort of custom action to notify the IT team, such as sending them a Slack message or pushing a message into a queue.
Grab the source code below and copy it into a file called monitor_new_hires.py
, or download it from this GitHub Gist.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent

mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}
stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1')

for binlogevent in stream:
    if isinstance(binlogevent, WriteRowsEvent) and binlogevent.table == "employees":
        for row in binlogevent.rows:
            # NOTE: Because Dolt doesn't send the full row metadata, we don't have column names available to us here,
            # so we access the columns by their ordinal position instead (the UNKNOWN_COL<n> keys).
            first_name = row["values"]["UNKNOWN_COL2"]
            last_name = row["values"]["UNKNOWN_COL3"]
            start_date = row["values"]["UNKNOWN_COL5"]
            print(f"Detected new hire ({last_name}, {first_name}) starting on {start_date}")
            # TODO: From here, you could notify the IT team about the new hire.
            #       For example, you could send a message to a Slack channel, or you could send a message
            #       to a queue that another system monitors.

stream.close()
Don't forget to swap out the value for auto_position
with the correct @@server_uuid
for your running Dolt server! You can grab this value by querying the Dolt server with the select @@server_uuid;
statement.
Then go ahead and run the script.
python monitor_new_hires.py
Now let's go back to our SQL shell and insert a new employee into the database:
insert into employees values (1000001, "1981-02-16", "Cooper", "McBear", "M", "2020-02-20");
And back on the terminal where we're running our Python script, you should see the output:
Detected new hire (McBear, Cooper) starting on 2020-02-20
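Referring to columns as UNKNOWN_COL2 or UNKNOWN_COL5 gets hard to read quickly. One possible workaround, sketched below, is to keep the table's column order in your script and translate positions back into names. The EMPLOYEES_COLUMNS list is copied from the select * from employees output earlier in this post, the name_columns helper is hypothetical, and the sample row is written by hand to match the insert above rather than captured from the library. This approach assumes the column order in your script is kept in sync with the table's schema.

```python
# Column order copied from the `select * from employees` output earlier in this post
EMPLOYEES_COLUMNS = ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]


def name_columns(values, column_names):
    """Translate {'UNKNOWN_COL<n>': value} into {column_name: value} by position."""
    named = {}
    for position, column_name in enumerate(column_names):
        key = f"UNKNOWN_COL{position}"
        if key in values:
            named[column_name] = values[key]
    return named


# Hand-written stand-in for row["values"] from a WriteRowsEvent
row_values = {
    "UNKNOWN_COL0": 1000001,
    "UNKNOWN_COL1": "1981-02-16",
    "UNKNOWN_COL2": "Cooper",
    "UNKNOWN_COL3": "McBear",
    "UNKNOWN_COL4": "M",
    "UNKNOWN_COL5": "2020-02-20",
}

employee = name_columns(row_values, EMPLOYEES_COLUMNS)
print(f"Detected new hire ({employee['last_name']}, {employee['first_name']}) starting on {employee['hire_date']}")
```

Inside the monitoring loop, the same helper could be applied to row["values"] so the rest of the code works with first_name and hire_date instead of positional keys.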
And there you have it! With just a few lines of Python code, we have an application that is monitoring our Dolt database and taking action based on the type of data changes it sees happening. You could imagine extending this to support all sorts of different actions on data changes, such as deprovisioning accounts when employees leave the company, or sending alerts if a department has reached their hiring budget for the year.
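As a rough illustration of that kind of extension, the sketch below routes inserts and deletes on the employees table to different actions and drops a JSON message onto a queue. Everything here is an assumption for the sake of the example: the function names describe_change, enqueue_change, and process_stream, the "provision" and "deprovision" action labels, and the use of Python's in-process queue.Queue as a stand-in for a real message queue or Slack webhook. It also assumes deletes arrive as DeleteRowsEvent with the same positional UNKNOWN_COL<n> keys as inserts.

```python
import json
import queue


def describe_change(event_type, table, row):
    """Turn one row change into a notification dict, or None if we don't care."""
    if table != "employees":
        return None
    if event_type == "WriteRowsEvent":
        return {"action": "provision", "emp_no": row["values"]["UNKNOWN_COL0"]}
    if event_type == "DeleteRowsEvent":
        return {"action": "deprovision", "emp_no": row["values"]["UNKNOWN_COL0"]}
    return None


def enqueue_change(outbox, event_type, table, row):
    """Serialize the notification (if any) and put it on the outbox queue."""
    message = describe_change(event_type, table, row)
    if message is not None:
        outbox.put(json.dumps(message))
    return message


def process_stream(events, outbox):
    """Walk any iterable of binlog events and enqueue the interesting row changes."""
    for event in events:
        rows = getattr(event, "rows", None)
        if rows is None:
            continue  # not a row event (e.g. GtidEvent, QueryEvent, XidEvent)
        for row in rows:
            enqueue_change(outbox, type(event).__name__, event.table, row)
```

To try it against a live server, you could pass the BinLogStreamReader from the script above as the events argument along with a queue.Queue() as the outbox, and have a separate worker read messages off the queue and contact the IT team's systems.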
Conclusion
The python-mysql-replication
library is a great way to capture changes from a MySQL compatible database, like Dolt. It's a lightweight tool that lets you easily connect to a stream of binlog events and write custom Python code to take action on the changes. We saw how easy it was to connect this library to a Dolt SQL server and how to process the raw binlog events that it captures and exposes to your Python application. We also saw that Dolt's version control features that change data, like dolt_reset()
or dolt_merge()
, generate the same binlog events as any other data changes.
If you're interested in change data capture, database replication, or data versioning, we hope you'll join us on Discord to talk with our engineering team and other Dolt users.