Skip to content

Commit a9ace25

Browse files
authored
Merge pull request #5 from CanalSharp/dev
update readme
2 parents bb8786c + 717811c commit a9ace25

File tree

5 files changed

+111
-11
lines changed

5 files changed

+111
-11
lines changed

README.md

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,63 @@ CanalSharp 是阿里巴巴开源项目 Canal 的 .NET 客户端。为 .NET 开
1616

1717
关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki
1818

19-
## 二.如何使用
19+
## 二.应用场景
2020

21-
1.安装Canal
21+
CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:
22+
23+
1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
24+
25+
2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等
26+
27+
3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis
28+
29+
4.数据库异地备份、数据同步
30+
31+
5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。
32+
33+
6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。
34+
35+
## 三.工作原理
36+
37+
CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。
38+
39+
## 四.工作流程
40+
41+
1.Canal连接到mysql数据库,模拟slave
42+
43+
2.CanalSharp与Canal建立连接
44+
45+
2.数据库发生变更写入到binlog
46+
47+
5.Canal向数据库发送dump请求,获取binlog并解析
48+
49+
4.CanalSharp向Canal请求数据库变更
50+
51+
4.Canal发送解析后的数据给CanalSharp
52+
53+
5.CanalSharp收到数据,消费成功,发送回执。(可选)
54+
55+
6.Canal记录消费位置。
56+
57+
以一张图来表示:
58+
59+
![1537860226808](assets/668104-20180925182816462-2110152563.png)
60+
61+
## 五.快速入门
62+
63+
### 1.安装Canal
2264

2365
Canal的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart
2466

25-
2.建立一个.NET Core App项目
67+
### 2.建立一个.NET Core 控制台项目
2668

27-
3.为该项目从 Nuget 安装 CanalSharp
69+
### 3.为该项目从 Nuget 安装 CanalSharp
2870

2971
````shell
3072
Install-Package CanalSharp.Client
3173
````
3274

33-
4.建立与Canal的连接
75+
### 4.建立与Canal的连接
3476

3577
````csharp
3678
//canal 配置的 destination,默认为 example
@@ -41,15 +83,59 @@ var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destinati
4183
connector.Connect();
4284
//订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
4385
connector.Subscribe(".*\\\\..*");
44-
//获取消息并且需要发送Ack表示消费成功
86+
//获取数据但是不需要发送Ack来表示消费成功
4587
connector.Get(batchSize);
46-
//获取消息但是不需要发送Ack来表示消费成功
88+
//获取数据并且需要发送Ack表示消费成功
4789
connector.GetWithoutAck(batchSize);
4890
````
4991

50-
更多详情请查看 Sample
92+
更多详情请查看 [Sample](https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient)
93+
94+
## 六.通过docker方式快速运行CanalSharp
95+
96+
### 1.执行命令通过docker方式运行 mysql与canal
97+
98+
````shell
99+
git clone https://github.com/CanalSharp/CanalSharp.git
100+
cd docker
101+
docker-compose up -d
102+
````
103+
104+
### 2.使用navicat等数据库管理工具连接mysql
105+
106+
ip:运行docker的服务器ip
107+
108+
mysql用户:root
109+
110+
mysql密码:000000
111+
112+
mysql端口:4406
113+
114+
默认提供了一个test数据库,然后有一张名为test的表。
115+
116+
![1537866852816](assets/668104-20180925182815646-1209020640.png)
117+
118+
### 3.运行Sample项目
119+
120+
### 4.测试
121+
122+
执行下列sql:
123+
124+
````sql
125+
insert into test values(1000,'111');
126+
update test set name='222' where id=1000;
127+
delete from test where id=1000;
128+
````
129+
130+
![](assets/ys.gif)
131+
132+
可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。
133+
134+
## 七.接下来的工作
135+
136+
CanalSharp集群支持
51137

52-
## .贡献代码
138+
## .贡献代码
53139

54140
1.fork本项目
55141

7.66 KB
Loading
16.9 KB
Loading

assets/ys.gif

301 KB
Loading

sample/CanalSharp.SimpleClient/Program.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ static void Main(string[] args)
2121
connector.Subscribe(".*\\\\..*");
2222
while (true)
2323
{
24-
//获取消息数据
25-
var message = connector.Get(5000);
24+
//获取数据 1024表示数据大小 单位为字节
25+
var message = connector.Get(1024);
26+
//批次id 可用于回滚
2627
var batchId = message.Id;
2728
if (batchId == -1 || message.Entries.Count <= 0)
2829
{
@@ -34,6 +35,10 @@ static void Main(string[] args)
3435
}
3536
}
3637

38+
/// <summary>
39+
/// 输出数据
40+
/// </summary>
41+
/// <param name="entrys">一个entry表示一个数据库变更</param>
3742
private static void PrintEntry(List<Entry> entrys)
3843
{
3944
foreach (var entry in entrys)
@@ -47,6 +52,7 @@ private static void PrintEntry(List<Entry> entrys)
4752

4853
try
4954
{
55+
//获取行变更
5056
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
5157
}
5258
catch (Exception e)
@@ -56,10 +62,13 @@ private static void PrintEntry(List<Entry> entrys)
5662

5763
if (rowChange != null)
5864
{
65+
//变更类型 insert/update/delete 等等
5966
EventType eventType = rowChange.EventType;
67+
//输出binlog信息 表名 数据库名 变更类型
6068
Console.WriteLine(
6169
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
6270

71+
//输出 insert/update/delete 变更类型列数据
6372
foreach (var rowData in rowChange.RowDatas)
6473
{
6574
if (eventType == EventType.Delete)
@@ -83,10 +92,15 @@ private static void PrintEntry(List<Entry> entrys)
8392
}
8493
}
8594

95+
/// <summary>
96+
/// 输出每个列的详细数据
97+
/// </summary>
98+
/// <param name="columns"></param>
8699
private static void PrintColumn(List<Column> columns)
87100
{
88101
foreach (var column in columns)
89102
{
103+
//输出列明 列值 是否变更
90104
Console.WriteLine($"{column.Name}{column.Value} update= {column.Updated}");
91105
}
92106
}

0 commit comments

Comments
 (0)