Recently I have been working with a few customers with multiple terabytes of transactional data on their MySQL clusters. These very large datasets are not really needed for their daily operations, but they are very convenient because they allow them to query historical data easily. However, the convenience comes at a high price: you pay a lot more for storage, and backups are, of course, much larger and take much longer to create and restore. So, the question is: how can they perform “efficient data archiving”?
Let’s try to define what an efficient data archiving architecture would be. We can lay out some key requirements:
- The archive should be on an asynchronous replica
- The archive replica should be using a storage configuration optimized for large datasets
- The regular cluster should just be deleting data normally
- The archiving system should remove delete events from the replication stream and keep only the inserts and updates
- The archiving system should be robust and able to handle failures and resume replication
Key elements
Our initial starting point is something like this:
The cluster is composed of a source (S) and two replicas (R1 and R2) and we are adding a replica for the archive (RA). The existing cluster is pretty much irrelevant in all the discussions that will follow, as long as the row-based replication format is used with full-row images.
The above setup is, in theory, sufficient to archive data, but to do so, we must not allow the delete statements on the tables we want to archive to flow through the replication stream. The deletions would have to be executed with sql_log_bin = 0 on all the normal servers. Although this may look simple, it has a number of drawbacks. A cron job or an SQL event must be run regularly on all the servers, and these jobs must delete exactly the same data on every production server. This process will likely introduce some differences between the tables, and verification tools like pt-table-checksum may start to report false positives. As we’ll see, there are other options.
Capturing the changes (CDC)
An important component we need is a way to capture the changes going to the table we want to archive. The MySQL binary log, when used with the row-based format and full row image, is perfect for this purpose. We need a tool that can connect to a database server like a replica, convert the binary log events into a usable form, and keep track of its position in the binary log.
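Since everything that follows depends on the binary log being row-based with full row images, it is worth checking those settings on the server the CDC tool reads from. The hypothetical helper below takes the server variables as a plain dict (for example, built from the rows of SHOW GLOBAL VARIABLES LIKE 'binlog%') and reports mismatches. The function name and the choice to check only these two variables are assumptions of this sketch.

```python
from typing import Dict, List

# Settings this archiving approach relies on: row-based events that carry
# the complete row image.
REQUIRED = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def archiving_prereq_problems(variables: Dict[str, str]) -> List[str]:
    """Return a list of settings in `variables` that do not match REQUIRED.

    `variables` maps variable names to values, for example built from the
    (Variable_name, Value) rows returned by SHOW GLOBAL VARIABLES.
    An empty list means the prerequisites are met.
    """
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is missing (expected {expected})")
        elif actual.upper() != expected:
            problems.append(f"{name}={actual} (expected {expected})")
    return problems


print(archiving_prereq_problems({"binlog_format": "ROW", "binlog_row_image": "MINIMAL"}))
# ['binlog_row_image=MINIMAL (expected FULL)']
```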
For this project, we’ll use Maxwell, a tool developed by Zendesk. Maxwell connects to a source server like a regular replica and outputs the row-based events in JSON format. It keeps track of its replication position in a table on the source server.
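Keeping track of a position means remembering a replication coordinate: a binary log file name plus a byte offset. The sketch below is not Maxwell's code (Maxwell handles this internally); it is a hypothetical illustration of how such coordinates can be ordered, so that a consumer can tell after a restart whether an event comes after the last one it applied. It assumes the usual file naming with a numeric extension, such as mysql-bin.000012.

```python
from typing import NamedTuple, Tuple


class BinlogPosition(NamedTuple):
    """A replication coordinate: binary log file name plus byte offset."""
    file: str
    offset: int

    def sort_key(self) -> Tuple[int, int]:
        # Assumes names like "mysql-bin.000012": order by the numeric
        # extension first, then by the offset inside that file.
        return (int(self.file.rsplit(".", 1)[1]), self.offset)


def is_after(a: BinlogPosition, b: BinlogPosition) -> bool:
    """True if coordinate `a` is strictly later in the binlog stream than `b`."""
    return a.sort_key() > b.sort_key()


last_applied = BinlogPosition("mysql-bin.000009", 900)
incoming = BinlogPosition("mysql-bin.000010", 4)
print(is_after(incoming, last_applied))  # True: this event still needs applying
```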
Removing deletions
Since the CDC component will output the events in JSON format, we just need to filter for the tables we are interested in and then ignore the delete events. You can use any programming language that has decent JSON and MySQL support. In this post, I’ll be using Python.
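A minimal version of that filter could look like the sketch below. It assumes Maxwell-style JSON events, one per line, carrying the database, table and type keys (the same keys read by the script excerpt later in this post); the function name is hypothetical.

```python
import json
from typing import Optional

# Event types that are copied to the archive; "delete" is deliberately absent.
ARCHIVED_TYPES = {"insert", "update"}


def event_to_archive(line: str, database: str, table: str) -> Optional[dict]:
    """Parse one JSON event line and return it only if it must be archived.

    Returns None for blank lines, for events on other tables, and for
    delete events (or any type other than insert/update).
    """
    line = line.strip()
    if not line:
        return None
    event = json.loads(line)
    if event.get("database") != database or event.get("table") != table:
        return None
    if event.get("type") not in ARCHIVED_TYPES:
        return None
    return event
```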
Storage engine for the archives
InnoDB is great for transactional workloads but far less optimal for archiving data. MyRocks is a much better option, as it is write-optimized and much more efficient at data compression.
Architectures for efficient data archiving
Shifted table
We have a few architectural options for our archiving replica. The first architecture, shown below, hooks the CDC to the archiving replica. This means that if we are archiving table t, the archiving replica needs both the production table t, from which data is deleted, and the archived copy tA, which keeps its data long term.
The main advantage of this architecture is that all the components related to the archiving process only interact with the archiving replica. The negative side is, of course, the presence of duplicate data on the archiving replica as it has to host both t and tA. One could argue that the table t could be using the blackhole storage engine but let’s not dive down such a rabbit hole.
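To make the Shifted table behavior concrete, here is a toy model in plain Python: dicts stand in for t and tA, and simplified (type, primary key, row) tuples stand in for real binlog events. Regular replication applies every event to t, while the archiving path applies inserts with insert-ignore semantics and updates only to existing rows, so tA keeps the rows that have been purged from t. Everything here is a hypothetical illustration, not part of the actual setup.

```python
from typing import Dict, Iterable, Tuple

Row = Dict[str, str]
Event = Tuple[str, int, Row]  # simplified model: (type, primary key, row)


def replay(events: Iterable[Event]) -> Tuple[Dict[int, Row], Dict[int, Row]]:
    """Apply one event stream to t (via replication) and tA (via the archiver)."""
    t: Dict[int, Row] = {}
    t_a: Dict[int, Row] = {}
    for kind, pk, row in events:
        # Regular replication: every event reaches t, deletes included.
        if kind == "delete":
            t.pop(pk, None)
        else:
            t[pk] = row
        # Archiving path: deletes are filtered out before reaching tA.
        if kind == "insert":
            t_a.setdefault(pk, row)  # behaves like INSERT IGNORE
        elif kind == "update" and pk in t_a:
            t_a[pk] = row  # like an UPDATE, it only touches existing rows
    return t, t_a


events = [
    ("insert", 1, {"status": "new"}),
    ("insert", 2, {"status": "new"}),
    ("update", 1, {"status": "delivered"}),
    ("delete", 1, {"status": "delivered"}),  # purge in production
]
t, t_a = replay(events)
print(t)    # {2: {'status': 'new'}}
print(t_a)  # {1: {'status': 'delivered'}, 2: {'status': 'new'}}
```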
Ignored table
Another architectural option is to use two different replication streams from the source. The first stream is the regular replication link, but the replica has the replication option replicate-ignore-table=db.t (the option takes a database-qualified table name). The replication events for table t are handled by a second replication link controlled by Maxwell. The deletion events are removed, and the inserts and updates are applied to the archiving replica.
While this latter architecture stores only a single copy of t on the archiving replica, it needs two full replication streams from the source.
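The routing in the Ignored table architecture can be summarized with a small model. In this hypothetical sketch, events are simplified dicts: the regular link drops every event of the ignored table, mimicking replicate-ignore-table, while the Maxwell link keeps only that table's non-delete events. Note that both links still receive the full event stream; the filtering only decides what gets applied.

```python
from typing import Dict, List, Tuple

Event = Dict[str, str]  # simplified: {"database": ..., "table": ..., "type": ...}


def split_streams(events: List[Event],
                  ignored: Tuple[str, str]) -> Tuple[List[Event], List[Event]]:
    """Return (events applied by the regular link, events applied via Maxwell)."""
    regular: List[Event] = []
    archived: List[Event] = []
    for e in events:
        if (e["database"], e["table"]) == ignored:
            # Regular link skips this table entirely (replicate-ignore-table);
            # the Maxwell link applies it, minus the deletes.
            if e["type"] != "delete":
                archived.append(e)
        else:
            regular.append(e)
    return regular, archived
```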
Example
The application
My present goal is to provide an example that is as simple as possible while still working. I’ll be using the Shifted table approach with the Sysbench tpc-c script. This script has an option, enable_purge, that removes old orders that have been processed. Our goal is to create the table tpccArchive.orders1, which contains all the rows, even the deleted ones, while the table tpcc.orders1 is the regular orders table. They have the same structure, but the archive table uses MyRocks.
Let’s first prepare the archive table:
mysql> create database tpccArchive;
Query OK, 1 row affected (0,01 sec)
mysql> use tpccArchive;
Database changed
mysql> create table orders1 like tpcc.orders1;
Query OK, 0 rows affected (0,05 sec)
mysql> alter table orders1 engine=rocksdb;
Query OK, 0 rows affected (0,07 sec)
Records: 0 Duplicates: 0 Warnings: 0
Capturing the changes
Now, we can install Maxwell. Maxwell is a Java-based application so a compatible JRE is needed. It will also connect to MySQL as a replica so it needs an account with the required grants. It also needs its own maxwell schema in order to persist replication status and position.
root@LabPS8_1:~# apt-get install openjdk-17-jre-headless
root@LabPS8_1:~# mysql -e "create user maxwell@'localhost' identified by 'maxwell';"
root@LabPS8_1:~# mysql -e 'create database maxwell;'
root@LabPS8_1:~# mysql -e 'grant ALL PRIVILEGES ON maxwell.* TO maxwell@localhost;'
root@LabPS8_1:~# mysql -e 'grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@localhost;'
root@LabPS8_1:~# curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.37.6/maxwell-1.37.6.tar.gz| tar zxvf -
root@LabPS8_1:~# cd maxwell-1.37.6/
root@LabPS8_1:~/maxwell-1.37.6# ./bin/maxwell -help
Help for Maxwell:
Option Description
------ -----------
--config <String> location of config.properties file
--env_config <String> json object encoded config in an environment variable
--producer <String> producer type: stdout|file|kafka|kinesis|nats|pubsub|sns|sqs|rabbitmq|redis|custom
--client_id <String> unique identifier for this maxwell instance, use when running multiple maxwells
--host <String> main mysql host (contains `maxwell` database)
--port <Integer> port for host
--user <String> username for host
--password <String> password for host
--help [ all, mysql, operation, custom_producer, file_producer, kafka, kinesis, sqs, sns, nats, pubsub, output, filtering, rabbitmq, redis, metrics, http ]
In our example, we’ll use the stdout producer to keep things as simple as possible.
Filtering script
In order to insert and update rows in the tpccArchive.orders1 table, we need a piece of logic that identifies the events for the table tpcc.orders1 and ignores the delete events. Again, for simplicity, I chose to use a Python script. I won’t present the whole script here; feel free to download it from my GitHub repository. It is essentially a loop over the lines read from stdin. Each line is loaded as a JSON string, and then some decisions are made based on the values found. Here’s a small section of code at its core:
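import json
import sys
...
for line in sys.stdin:
    j = json.loads(line)  # each line is one JSON event from Maxwell
    # Only handle events for the table being archived (e.g. tpcc.orders1)
    if j['database'] == dbName and j['table'] == tableName:
        debug_print(line)
        if j['type'] == 'insert':
            # Let's build an insert ignore statement
            sql += 'insert ignore into ' + destDbName + '.' + tableName
            ...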
The above section creates an “insert ignore” statement when the event type is ‘insert’. The script connects to the database using the user archiver and the password tpcc and then applies the event to the table tpccArchive.orders1.
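The excerpt builds the statement by string concatenation. A variant that passes the values as query parameters avoids quoting problems with values that contain quotes; the sketch below is a hypothetical helper, not the script from the repository. It assumes a driver using the %s paramstyle (PyMySQL and MySQL Connector/Python both do), full row images in data, Maxwell's old object holding the previous values of changed columns, and a caller-supplied list of primary key columns.

```python
from typing import Dict, List, Sequence, Tuple


def build_statement(event: Dict, dest_db: str, table: str,
                    pk_columns: Sequence[str]) -> Tuple[str, List]:
    """Turn one insert/update event into (sql, params) for the archive table."""
    target = f"`{dest_db}`.`{table}`"
    data = event["data"]
    cols = list(data)
    if event["type"] == "insert":
        # INSERT IGNORE: replaying an already applied insert is harmless.
        sql = (f"INSERT IGNORE INTO {target} ("
               + ", ".join(f"`{c}`" for c in cols)
               + ") VALUES (" + ", ".join(["%s"] * len(cols)) + ")")
        return sql, [data[c] for c in cols]
    if event["type"] == "update":
        # "old" only lists changed columns; use it for the key in case a
        # primary key column itself was updated, else the current value.
        old = event.get("old") or {}
        sql = (f"UPDATE {target} SET "
               + ", ".join(f"`{c}` = %s" for c in cols)
               + " WHERE " + " AND ".join(f"`{c}` = %s" for c in pk_columns))
        params = [data[c] for c in cols] + [old.get(c, data[c]) for c in pk_columns]
        return sql, params
    raise ValueError(f"unsupported event type: {event['type']}")
```

With PyMySQL, the result would be executed as cursor.execute(sql, params). For orders1, the primary key columns are assumed here to be (o_w_id, o_d_id, o_id); check them with SHOW CREATE TABLE before relying on this.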
root@LabPS8_1:~# mysql -e "create user archiver@'localhost' identified by 'tpcc';"
root@LabPS8_1:~# mysql -e 'grant ALL PRIVILEGES ON tpccArchive.* TO archiver@localhost;'
All together
Just to make it easy to reproduce the steps, here’s the application (tpcc) side:
yves@ThinkPad-P51:~/src/sysbench-tpcc$ ./tpcc.lua --mysql-host=10.0.4.158 --mysql-user=tpcc --mysql-password=tpcc --mysql-db=tpcc \
--threads=1 --tables=1 --scale=1 --db-driver=mysql --enable_purge=yes --time=7200 --report-interval=10 prepare
yves@ThinkPad-P51:~/src/sysbench-tpcc$ ./tpcc.lua --mysql-host=10.0.4.158 --mysql-user=tpcc --mysql-password=tpcc --mysql-db=tpcc \
--threads=1 --tables=1 --scale=1 --db-driver=mysql --enable_purge=yes --time=7200 --report-interval=10 run
The database is running on a VM whose IP is 10.0.4.158. The enable_purge option causes old orders to be deleted from orders1. For the archiving side, running on the database VM:
root@LabPS8_1:~/maxwell-1.37.6# bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' \
--producer=stdout 2> /tmp/maxerr | python3 ArchiveTpccOrders1.py
After the two-hour tpcc run, we have:
mysql> select TABLE_SCHEMA, TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH, ENGINE from information_schema.tables where table_name='orders1';
+--------------+------------+-------------+--------------+---------+
| TABLE_SCHEMA | TABLE_ROWS | DATA_LENGTH | INDEX_LENGTH | ENGINE |
+--------------+------------+-------------+--------------+---------+
| tpcc | 48724 | 4210688 | 2310144 | InnoDB |
| tpccArchive | 1858878 | 38107132 | 14870912 | ROCKSDB |
+--------------+------------+-------------+--------------+---------+
2 rows in set (0,00 sec)
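A rough per-row comparison of these figures: dividing DATA_LENGTH + INDEX_LENGTH by TABLE_ROWS gives the average bytes per row for each table. Keep in mind that information_schema reports estimates for these columns, so this is an order-of-magnitude comparison, not a benchmark.

```python
# Figures copied from the information_schema output above:
# (TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH). These are estimates.
tables = {
    "tpcc (InnoDB)": (48724, 4210688, 2310144),
    "tpccArchive (ROCKSDB)": (1858878, 38107132, 14870912),
}

bytes_per_row = {
    name: (data + index) / rows for name, (rows, data, index) in tables.items()
}

for name, value in bytes_per_row.items():
    print(f"{name}: {value:.1f} bytes/row")
# tpcc (InnoDB): 133.8 bytes/row
# tpccArchive (ROCKSDB): 28.5 bytes/row
```

Under those caveats, the archive holds about 38 times more rows than the production table while using roughly a fifth of the space per row.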
A more realistic architecture
The above example is, well, an example. Any production system will need to be hardened much more than my example. Here are a few requirements:
- Maxwell must be able to restart and continue from the correct replication position
- The Python script must be able to restart and continue from the correct replication position
- The Python script must be able to reconnect to MySQL and retry a transaction if the connection is dropped.
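The third requirement can be sketched as a small retry wrapper. This is a hypothetical, driver-agnostic helper: the connection factory, the function applying a whole transaction, and the predicate deciding which errors are retryable are all passed in (with PyMySQL, for instance, the predicate might test for pymysql.err.OperationalError; verify this against your driver). Because apply runs the complete transaction, including its COMMIT, a retry replays it from scratch on a fresh connection. That is acceptable here because insert ignore statements and full-row updates can be replayed without changing the result.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retry(
    connect: Callable[[], object],
    apply: Callable[[object], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run one transaction, reconnecting and retrying on retryable errors.

    `apply` must execute the whole transaction (statements and COMMIT) on the
    connection it receives, so each retry replays it from the beginning.
    """
    last_error: Optional[Exception] = None
    for attempt in range(max_attempts):
        try:
            conn = connect()  # a fresh connection for every attempt
            return apply(conn)
        except Exception as exc:
            if not is_retryable(exc):
                raise
            last_error = exc
            if attempt + 1 < max_attempts:
                sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise RuntimeError(f"giving up after {max_attempts} attempts") from last_error
```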
Maxwell already takes care of the first point: it stores its current position in the database.
The next logical step would be to replace the simple process pipe between Maxwell and the Python script with a more robust queuing system. Maxwell supports many of them, like Kafka, Kinesis, RabbitMQ, Redis, and others. For our application, I tend to like a solution using Kafka with a single partition. A Kafka consumer does not have to rely on Kafka to store its offsets; the application can manage them itself. This means the Python script could update a row of a table as part of every transaction it applies, to keep track of its position in the Kafka stream. If the archive tables are using MyRocks, the queue position tracking table should also use MyRocks so the database transaction does not span storage engines.
Conclusion
In this post, I provided a solution to archive data using the MySQL replication binary logs. Archiving fast-growing tables is a frequent need, and hopefully, such a solution can help. It would be great to have a MySQL plugin on the replica capable of filtering the replication events directly. This would remove the need for an external solution like Maxwell and my Python script. Generally speaking, however, this archiving solution is just a specific case of a summary table. In a future post, I hope to present a more complete solution that will also maintain a summary.
Percona Database Performance Blog