Realtime Data with MariaDB and Websocket [Part 1]

Uncategorized
5k words

Goal

We are going to learn and build a service that serves real-time data with MariaDB as the underlying database.

Why?

User expectations are increasing, and real-time data has become mandatory for a great web application. In this article, we are going to see possible options to build real-time data that serves data with MariaDB as our database.

There are a lot of software that offers this kind of real-time feature, one of them is debezium another is supabase, but we are going to create it our own.

Pre-requisite

  • java17
  • mariadb

How?

There are multiple options to build real-time data, one of them is polling the table that users registered to, for example, if a user subscribes to table user_profile there will be a service that regularly queries into that table and check if the table has been modified.

It’s not the most efficient method, but it will get the job done.

Another alternative is by monitoring a binlog file (binary log) file.

The term “binary log file” generally denotes an individual numbered file containing database events. The term “binary log” collectively denotes the set of numbered binary log files plus the index file.

In short, it is a collection of events that are happening in a database.

These log files are mostly used for replication sources and we are using it to listen to any changes that is happening in our database.

We are going to use 2 main libraries.

  1. mysql-bin-log-connector to parse event from binlog file.
  2. JSqlParser to parse the query we retrieve from binlog.

Monitoring binlog file

A MariaDB uses mixed-logging as its default binary format.

To enable it add --log-bin --log-basename=[your_logfile_name]

When the database starts there should be a file in your data directory called your_logfile_name-bin.000001

Part 1, The Service

In this part, we are going to learn how to build a service that listens to the changes in the database and parses it.

You can get the full code in this repo

First thing first, we are going to start our project, in this example, I am going to start a micronaut project because later we are going to use its web socket feature to streamline the data.

But you can use a simple gradle project or event you can build the project from scratch.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.update.Update;

 public static void main(String[] args) throws IOException, InterruptedException {

        // use your credential, make sure you have REPLICATION_SLAVE privilege or you can specific your binlog path as stated https://github.com/osheroff/mysql-binlog-connector-java#tapping-into-mysql-replication-stream
        BinaryLogClient client = new BinaryLogClient("host", 3306, "user", "password");

        EventDeserializer eventDeserializer = new EventDeserializer();

        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
        client.setEventDeserializer(eventDeserializer);

        client.registerEventListener(new EventListener() {
            @Override
            public void onEvent(Event event) {
                if (event.getHeader().getEventType() == EventType.QUERY) {
                    QueryEventData queryEventData = event.getData();

                    Statement statement = null;

                    try {
                        //parse the query from event
                        statement = CCJSqlParserUtil.parse(queryEventData.getSql());
                    } catch (JSQLParserException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                    //get query type so we can cast statement to the appropriate class
                    String queryType = statement.getClass().getSimpleName().toUpperCase();
                    String tableName = null;

                    switch (queryType) {
                        case "INSERT":
                            Insert insertStatement = (Insert) statement;
                            tableName = insertStatement.getTable().getName();
                            break;
                        case "DELETE":
                            Delete deleteStatement = (Delete) statement;
                            tableName = deleteStatement.getTable().getName();
                            break;
                        case "UPDATE":
                            Update updateStatement = (Update) statement;
                            tableName = updateStatement.getTable().getName();
                            break;
                        default:
                            System.out.println(String.format("unsupported query to watch %s", queryType));
                    }

                    Map e = new HashMap();
                    e.put("queryType", queryType);
                    e.put("tableName", tableName);
                    System.out.println(e.toString());
                }
            }
        });
        client.connect();
    }

When you start it, it should give you this log.

1
2
May 12, 2024 9:07:30 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at mylog-bin.000001/622 (sid:65535, cid:33)

Now try to modify any table by doing either INSERT, UPDATE or DELETE.

If you see this line.

1
{queryType=INSERT, tableName=`user`}

Congratulations, you’ve successfully listened to your database changes.

In the next part, we are going to fetch the modified table and streamline it using web socket.

To be continued.