[Kafka-connect] Streaming The Data Of MySQL ... - NimTechnology
Có thể bạn quan tâm
Contents
- 1) Install and configurate mysql to use to the kafka-connect Lab.
- 2) Config kafka-connect.
- 2.1) Repairing
- 2.2) Action config
- 2.3) Add a new column inside a table that is streaming.
- 2.4) Create a new table
- 2.5) Adding the old tables but before that it wasn’t added into connector.
- 3) Note.
1) Install and configurate mysql to use to the kafka-connect Lab.
Run docker MySQL:
docker run -p 3306:3306 --name mastermysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 docker exec -it mastermysql bash apt update -y apt install vim -y cd /etc/mysql/giờ sửa file: mysql.cnf
vi mysql.cnf [mysqld] server-id = 42 log_bin = mysql-bin binlog_format = row expire_logs_days = 10xong rồi thoát ra restart docker
docker restart mastermysql docker exec -it mastermysql bash mysqladmin variables -uroot -p|grep log_bin
In order for Debezium MySQL to track all the changes in bin-log, it needs a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT permissions. Grant them to the user you intend to use for Debezium.
giờ chúng ta tạo 1 User để cho kafka-connect.ở dưới là user: debezium và pass là: 123456
>>>Mysql 5.7 mysql -uroot -p GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY '123456'; >>> Mysql 8 docker run -p 3306:3306 --name mastermysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0 CREATE USER 'debezium'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';Xong rồi mình tạo database và table như hình bên dươi.

2) Config kafka-connect.
2.1) Repairing
Mình đã hướng dẫn càc bạn cài các components sau:
Kafka:https://nimtechnology.com/2022/02/13/kafka-install-kafka-and-zookeeper-cluster-on-kubernetes/Hoặc starting kafka và zookeeper bằng command lại để lab.https://nimtechnology.com/2022/01/30/kafka-zookeeper-starting-kafka-and-zookeeper/
Kafka-connect:https://nimtechnology.com/2022/02/09/kafka-connect-install-kafka-connect-on-kubernetes-through-helm-chart/
Schema registry:https://nimtechnology.com/2022/02/13/kafka-installing-schema-registry-to-use-for-the-kafka-and-kafka-connect-model/
Lense nữa:https://nimtechnology.com/2022/02/09/kafka-connect-install-lenses-to-manage-kafka/cái này thì cần lincense.Bạn có thể dụng câu lệnh curl trực tiếp vào kafka connect nhé. Command sao mình chỉ sau he.
Reference Links:https://dev-yeye.tistory.com/35?category=904307
Bạn chuẩn bị đầu đủ các phần trên nhé
2.2) Action config
Giờ login vào lenses.tạo connector


Vậy CDC là gì?
Đầu tiên cdc là Change Data Capture (CDC)Ở bước tạo mysql mình có tạo user debezium để nó thể đọc được log_bin, nghĩa là các thay đổi trong database.nếu bạn làm mysql master-slave rồi thì thấy cách cdc này nó cũng như thế.
reference links for master:https://developer.confluent.io/learn-kafka/data-pipelines/kafka-data-ingestion-with-cdc/
Nếu lenses của các bạn ko có icon giống mình thì đọc lại bài kafka-connect và build lại image nhé.Links add plugin for kafka connect

Config sau là dành cho mysql 5.7
connector.class=io.debezium.connector.mysql.MySqlConnector database.user=debezium database.server.id=223344 database.history.kafka.topic=dbhistory.voting_contra.v2 database.history.kafka.bootstrap.servers=kafka:9092 database.server.name=connect_wordpress_note database.port=3306 include.schema.changes=false table.whitelist=wordpress_note.students database.hostname=192.168.101.27 database.password=*********** name=cdc-wordpress-note-connector database.whitelist=wordpress_noteConfig sau là dành cho mysql 8.0
connector.class=io.debezium.connector.mysql.MySqlConnector database.allowPublicKeyRetrieval=true database.user=debezium database.server.id=223344 database.history.kafka.topic=dbhistory.school.v1 database.history.kafka.bootstrap.servers=kafka:9092 database.server.name=connect_school database.port=3306 include.schema.changes=false table.whitelist=school.students database.hostname=192.168.101.27 database.password=*********** name=cdc-school-connector database.whitelist=schoolNote config with mysql 8.xBạn sẽ gặp lỗi này “Public Key Retrieval Is Not Allowed” hay đọc bài viết bên dứoihttps://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
curl -i -X PUT -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \ -d '{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "42", "database.allowPublicKeyRetrieval":"true", <<<==== add this line to FIX Error "database.server.name": "asgard", "table.whitelist": "demo.customers", "database.history.kafka.bootstrap.servers": "kafka:29092", "database.history.kafka.topic": "asgard.dbhistory.demo" , "include.schema.changes": "true" }'Explain the configurations
name: đơn giản là name của connector đó.
connector.classcái này chỗ khai báo các plugin của chúng ta.
database.port: Port của DBdatabase.hostname: IP hoặc hostname của DBdatabase.user: username mà debezium để đọc log_bindatabase.password: password của user trên
database.server.id: số này bạn cứ để số nào cũng được nhưng duy nhất nhé (unique) trên cluster kafka, Nó chỉ có ý nghĩa trên kafka thôi
database.whitelist: đây là cái database bạn muốn stream datatable.whitelist: đây là table mà bạn muốn stream data
database.server.nametable.whitelistvới cặp ở trên mình thấy nó có liên quan đến Schema registry

database.history.kafka.topicchỗ này mình thấy tạo ra 1 topic như sau:


database.history.kafka.bootstrap.servers: cái này để bạn để IP và port của kafka
Các bạn hỏi ủa rồi nó có stream data trong table đó vào topic hem????

Bạn sẽ thấy là trong topic này đã có 3 record nghĩ là trong table của cũng đang có 3 row.

giờ mình sẽ thêm 1 row data nữa.


Mình xin phép add thêm ảnh của case khác để các bạn tham khảo:

2.3) Add a new column inside a table that is streaming.
Giờ cũng table đó mình add thêm 1 column

thì mình thấy là e.history.kafka.topic với topic tên là dbhistory.voting_contra.v2 đã tăng 1 record

Giờ thêm data vào table;


2.4) Create a new table

Vậy giờ chúng ta edit config conector như thế nào?

Bạn thấy mình sửa như thế này.

Khi mình edit và save lại thì thấy có lỗi.


Mình đã dò google:https://gitter.im/debezium/dev/archives/2019/01/26
Jiri Pechanec @jpechane Jan 26 2019 13:18 UTC@rajan-g Hi, please look at https://debezium.io/docs/connectors/mysql/ and search for schema_onyl_recovery string

Note nho nhỏ:Nếu ở lần đầu tiên create connector mà bạn chơi:snapshot.mode=schema_only_recoverythì sẽ bị báo lỗi

file đã nhiều hơn. mà mình ko thấy lỗi nữa



chưa thấy tạo topic data.



2.5) Adding the old tables but before that it wasn’t added into connector.
Mình gặp trường hợp trong database của mình có 2 table:

Tiếp đên mình tạo connector với table company và với config như bên dưới
connector.class=io.debezium.connector.mysql.MySqlConnector database.user=debezium database.server.id=11 database.history.kafka.topic=_dbhistory.nimtechnology.v1 database.history.kafka.bootstrap.servers=kafka:9092 database.server.name=connect_nimtechnology database.port=3306 table.whitelist=wordpress_note.company database.hostname=192.168.101.27 database.password=123456 name=source.mysqldebezium.nimtechnology.v1 database.whitelist=wordpress_note include.schema.changes=false

Giờ mình stream (CDC) thêm table students
connector.class=io.debezium.connector.mysql.MySqlConnector database.user=debezium database.server.id=11 database.history.kafka.topic=_dbhistory.nimtechnology.v2 database.history.kafka.bootstrap.servers=kafka:9092 database.server.name=connect_nimtechnology database.port=3306 table.whitelist=wordpress_note.company,wordpress_note.students database.hostname=192.168.101.27 database.password=123456 name=source.mysqldebezium.nimtechnology.v1 database.whitelist=wordpress_note include.schema.changes=false snapshot.mode=schema_only_recoveryMình đã sửa database.history.kafka.topic và add thêm table students vào line “table.whitelist”
mình thấy tạo them topic history nhưng ko tạo topic Data students

Bạn phài insert data mới vào table students thì kafka connect mới tao topic


Mình show data có trong table.

3) Note.
nó còn 1 phần mình thấy hay được sài đó là:
Event Message Flattening with Single Message Transformhttps://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
mình sẽ tìm hiêu và trình bày sau.
Command check mysql:
mysql -u {user} -p'{pw}' -h {host} -P {port} -D {db} -e "SHOW GRANTS"Từ khóa » Debezium Là Gì
-
Thay đổi Thu Thập Dữ Liệu Với Debezium: Hướng Dẫn đơn Giản, Phần 1
-
Cấu Hình Debezium Kết Nối Với SQL Server Bằng Docker - Techmaster
-
Tutorial :: Debezium Documentation
-
010: Apache Kafka Connect Concept - Viblo
-
Hướng Dẫn Cài đặt Và ứng Dụng Kafka Connect - VTS Engineering
-
MySQL CDC With Apache Kafka And Debezium - Clairvoyant
-
Có Bạn Nào Trong Group Mình đang Sử Dụng Kĩ Thuật CDC (Change ...
-
#157 - A Year And A Half With Debezium: CDC With MySQL | Revue
-
System Design Cơ Bản: Message Broker - TopDev
-
Thắc Mắc - Cách Sử Dụng Hệ Thống Data Change Capture (CDC) Cho ...
-
Thắc Mắc - Query Vào Database Liên Tục | Page 3 | TheNEXTvoz
-
Giải Quyết Các Vấn đề Tích Hợp Bằng MySql Binlog — P2
-
Cách Tốt Nhất để Lấy Những Thay đổi Do Trigger Trong Database
-
Phân Loại Giữa Các Hệ Thống Message Queue | By Anh Le - Medium
-
Kafka Connect - Confluent Documentation
-
Debezium/connect - Docker Image