Cấu Hình Debezium Kết Nối Với SQL Server Bằng Docker - Techmaster
Có thể bạn quan tâm
Bài viết áp dụng cho hệ điều hành Ubuntu
1. Cài đặt Docker
Đầu tiên chạy 4 lệnh sau: sudo apt-get update sudo apt-get install \ apt-transport-https \ ca-certificates \ curl \ gnupg-agent \ software-properties-common curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - sudo add-apt-repository \ "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ $(lsb_release -cs) \ stable"
Sau đó chạy tiếp: sudo apt-get update sudo apt-get install docker-ce docker-ce-cli containerd.io
Sau đó, kiểm tra xem đã cài được Docker chưa bằng lệnh: sudo docker run hello-world
Cuối cùng, chạy các lệnh sau để có thể chạy các lệnh docker mà không cần sudo: sudo groupadd docker sudo usermod -aG docker $USER
Nhớ tắt terminal đi rồi bật lại, sau đó chạy lại lệnh sau để kiểm tra: docker run hello-world
2. Cài đặt docker-compose
Lần lượt chạy từng lệnh sau: sudo curl -L "https://github.com/docker/compose/releases/download/1.26.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose sudo chmod +x /usr/local/bin/docker-compose
Kiểm tra đã cài thành công chưa bằng lệnh: docker-compose --version
3. Tạo file docker-compose-sqlserver.yaml
Tạo file docker-compose-sqlserver.yaml bằng lệnh: touch docker-compose-sqlserver.yaml
Sau đó mở trình soạn thảo nano (hoặc vim), copy paste nội dung sau: version: '2' services: zookeeper: image: debezium/zookeeper:${DEBEZIUM_VERSION} ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: image: debezium/kafka:${DEBEZIUM_VERSION} ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 sqlserver: image: microsoft/mssql-server-linux:2017-CU9-GDR2 ports: - 1433:1433 environment: - ACCEPT_EULA=Y - MSSQL_PID=Standard - SA_PASSWORD=Password! - MSSQL_AGENT_ENABLED=true connect: image: debezium/connect:${DEBEZIUM_VERSION} ports: - 8083:8083 links: - kafka - sqlserver environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses
4. Khởi chạy docker-compose
Lần lượt chạy các lệnh sau: export DEBEZIUM_VERSION=1.1 docker-compose -f docker-compose-sqlserver.yaml up -d
Kiểm tra xem các docker container đã chạy hay chưa bằng lệnh: docker ps
5. Mockup dữ liệu
Đầu tiên, tạo file inventory.sql với nội dung như sau: -- Create the test database CREATE DATABASE testDB; GO USE testDB; EXEC sys.sp_cdc_enable_db; -- Create and populate our products using a single insert with many rows CREATE TABLE products ( id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); INSERT INTO products(name,description,weight) VALUES ('scooter','Small 2-wheel scooter',3.14); INSERT INTO products(name,description,weight) VALUES ('car battery','12V car battery',8.1); INSERT INTO products(name,description,weight) VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); INSERT INTO products(name,description,weight) VALUES ('hammer','12oz carpenter''s hammer',0.75); INSERT INTO products(name,description,weight) VALUES ('hammer','14oz carpenter''s hammer',0.875); INSERT INTO products(name,description,weight) VALUES ('hammer','16oz carpenter''s hammer',1.0); INSERT INTO products(name,description,weight) VALUES ('rocks','box of assorted rocks',5.3); INSERT INTO products(name,description,weight) VALUES ('jacket','water resistent black wind breaker',0.1); INSERT INTO products(name,description,weight) VALUES ('spare tire','24 inch spare tire',22.2); EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0; -- Create and populate the products on hand using multiple inserts CREATE TABLE products_on_hand ( product_id INTEGER NOT NULL PRIMARY KEY, quantity INTEGER NOT NULL, FOREIGN KEY (product_id) REFERENCES products(id) ); INSERT INTO products_on_hand VALUES (101,3); INSERT INTO products_on_hand VALUES (102,8); INSERT INTO products_on_hand VALUES (103,18); INSERT INTO products_on_hand VALUES (104,4); INSERT INTO products_on_hand VALUES (105,5); INSERT INTO products_on_hand VALUES (106,0); INSERT INTO products_on_hand VALUES (107,44); INSERT INTO products_on_hand VALUES (108,2); INSERT INTO products_on_hand VALUES (109,5); EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products_on_hand', @role_name = NULL, @supports_net_changes = 0; -- Create some customers ... CREATE TABLE customers ( id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE ); INSERT INTO customers(first_name,last_name,email) VALUES ('Sally','Thomas','[email protected]'); INSERT INTO customers(first_name,last_name,email) VALUES ('George','Bailey','[email protected]'); INSERT INTO customers(first_name,last_name,email) VALUES ('Edward','Walker','[email protected]'); INSERT INTO customers(first_name,last_name,email) VALUES ('Anne','Kretchmar','[email protected]'); EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0; -- Create some very simple orders CREATE TABLE orders ( id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY, order_date DATE NOT NULL, purchaser INTEGER NOT NULL, quantity INTEGER NOT NULL, product_id INTEGER NOT NULL, FOREIGN KEY (purchaser) REFERENCES customers(id), FOREIGN KEY (product_id) REFERENCES products(id) ); INSERT INTO orders(order_date,purchaser,quantity,product_id) VALUES ('16-JAN-2016', 1001, 1, 102); INSERT INTO orders(order_date,purchaser,quantity,product_id) VALUES ('17-JAN-2016', 1002, 2, 105); INSERT INTO orders(order_date,purchaser,quantity,product_id) VALUES ('19-FEB-2016', 1002, 2, 106); INSERT INTO orders(order_date,purchaser,quantity,product_id) VALUES ('21-FEB-2016', 1003, 1, 107); EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0; GO
Sau đó, sử dụng lệnh docker ps để lấy ra được ID của container chạy SQL Server:
Sau đó, chạy lệnh sau để mockup dữ liệu test: cat inventory.sql | docker exec -i 63bfd9e2cfe5 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Lưu ý: 63bfd9e2cfe5 là ID của container chạy SQL Server trên máy mình, còn trên máy các bạn sẽ là một giá trị khác, các bạn nhớ thay ID cho đúng nhé
6. Kết nối Debezium với SQL Server
Tạo file register-sqlserver.json với nội dung sau: { "name": "inventory-connector", "config": { "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector", "tasks.max" : "1", "database.server.name" : "server1", "database.hostname" : "sqlserver", "database.port" : "1433", "database.user" : "sa", "database.password" : "Password!", "database.dbname" : "testDB", "database.history.kafka.bootstrap.servers" : "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }
Sau đó chạy lệnh: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
7. Lắng nghe message từ tất cả các topic
Chạy lệnh sau: docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --from-beginning \ --property print.key=true \ --whitelist '.*'
8. INSERT - UPDATE - DELETE dữ liệu trong SQL Server và kiểm tra xem có nhận được message không
Mở một terminal khác, sau đó chạy lệnh sau: docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
Thử INSERT bản ghi mới vào bảng products bằng cách lần lượt chạy 2 lệnh sau (nhớ là phải chạy lệnh GO nhé): INSERT INTO products(name,description,weight) VALUES ('Marcus Rashford','Striker', 70); GO
Sau khi INSERT xong, mở terminal còn lại ( đang chạy Debezium ) lên để kiểm tra xem có nhận được message thông báo thay đổi dữ liệu trong bảng không:
Thử INSERT bản ghi khác vào bảng customers: INSERT INTO customers(first_name,last_name,email) VALUES ('Alex','Ferguson','[email protected]'); GO
Kiểm tra xem có nhận được message không:
Thử DELETE bản ghi vừa thêm mới vào bảng products xem sao: DELETE from products WHERE name = 'Marcus Rashford'; GO
Kiểm tra xem có nhận được message không:
Như vậy là chúng ta đã cấu hình xong Debezium kết nối tới SQL Server sử dụng Docker. Chúc các bạn thành công
Đừng quên tham khảo các khóa học hay tại Techmaster nhé ✌️✌️✌️✌️✌️✌️
Từ khóa » Debezium Là Gì
-
Thay đổi Thu Thập Dữ Liệu Với Debezium: Hướng Dẫn đơn Giản, Phần 1
-
Tutorial :: Debezium Documentation
-
010: Apache Kafka Connect Concept - Viblo
-
Hướng Dẫn Cài đặt Và ứng Dụng Kafka Connect - VTS Engineering
-
MySQL CDC With Apache Kafka And Debezium - Clairvoyant
-
Có Bạn Nào Trong Group Mình đang Sử Dụng Kĩ Thuật CDC (Change ...
-
#157 - A Year And A Half With Debezium: CDC With MySQL | Revue
-
[Kafka-connect] Streaming The Data Of MySQL ... - NimTechnology
-
System Design Cơ Bản: Message Broker - TopDev
-
Thắc Mắc - Cách Sử Dụng Hệ Thống Data Change Capture (CDC) Cho ...
-
Thắc Mắc - Query Vào Database Liên Tục | Page 3 | TheNEXTvoz
-
Giải Quyết Các Vấn đề Tích Hợp Bằng MySql Binlog — P2
-
Cách Tốt Nhất để Lấy Những Thay đổi Do Trigger Trong Database
-
Phân Loại Giữa Các Hệ Thống Message Queue | By Anh Le - Medium
-
Kafka Connect - Confluent Documentation
-
Debezium/connect - Docker Image