使用方式1
- 初始化 Canal连接 canal 服务:CanalConnectors.newSingleConnector(hostname,port,destination…)
- 定时获取canal的Message.getEntries,如果有数据 entries.size > 0,则为行数据变更操作;
- 根据事件类型(INSERT、UPDATE…),获取变更记录数据,解析处理;
引入canal POM依赖:
...
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
创建canal定时抓取canal事件类,注解@Component;
@Scheduled(fixedDelay = 100)
public void run() throws Exception {
long batchId = -1;
try {
int batchSize = 1000;
//依此从canal种取1000条数据
Message message = canalConnector.getWithoutAck(batchSize);
batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
for (CanalEntry.Entry entry : entries) {
//若是行记录更新
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
//解析处理
handleCanalEvent(entry);
}
}
}
canalConnector.ack(batchId);
} catch (Exception e) {
// 解决断线重新问题;
logger.error("CanalScheduling-run-001 发生异常: " + e.getMessage(), e);
try {
canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canal.hostname, canal.port), canal.destination,
canal.userName, canal.passWord);
canalConnector.connect();
canalConnector.subscribe("dbname.tablename"); //订阅侦听的"数据库.表名"
if (batchId != -1) {
canalConnector.rollback(batchId);
} else {
canalConnector.rollback();
}
} catch (Exception ex) {
batchId = -1;
}
}
}
以上源码说明:
方法是用@Scheduled注解的,定时100毫秒执行一次;一开始canalConnector是没有初始的,首次执行会先报错到外 catch 进行初始化连接;
有偿试过,如果将 canalConnector 做为一个 @Bean 初化化连接,然后将 canalConnector 注入使用,如果 canal 重启断开,这个 @Bean 的canalConnector 就没再使用;
因此需要将 canalConnector 初始连接入到 定时方法里 (@Scheduled),但又不能每次在开始就重复连接次,因此将canalConnector的连接操作放到 catch,如果有异常,就给他重新连接下;
开始获取 withoutAck 的 Message 信息,每次获取 1000 条数据,判断下 message.getEntries 是否内容(entries.size() > 0);获取每个 entry;
再判断每个entry 是否行变更的entryType,如果是就放到处理方法里处理;
private void handleCanalEvent(CanalEntry.Entry entry) throws Exception {
//我们可以拿到binlog类型,是创建还是修改或者其它进而根据业务就行修改代码
//CanalEntry.EventType eventType = entry.getHeader().getEventType();
//
//获取数据库相关信息
String database = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
database = database == null ? "": database.toLowerCase();
table = table == null ? "": table.toLowerCase();
logger.info("handleCanalEvent-001 当前数据库及表: database: " + database + "; table: " + table);
//判断是否要处理的数据库与表
if (!canal.database.equals(database) || !canal.table.equals(table)) {
return;
}
CanalEntry.RowChange change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = change.getEventType();
for (CanalEntry.RowData rowData : change.getRowDatasList()) {
//获取变更主键id
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
String primaryKey = "id";
CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
&& primaryKey.equals(column.getName())).findFirst().orElse(null);
//Map<String, Object> dataMap = parseColumnsToMap(columns);
//根据数据库,表,id信息重新从数据库捞数据构建索引
// add ==========================
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
if (eventType == CanalEntry.EventType.DELETE) {
parseColumnsToMap(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
parseColumnsToMap(rowData.getAfterColumnsList());
} else {
parseColumnsToMap(rowData.getBeforeColumnsList());
parseColumnsToMap(rowData.getAfterColumnsList());
}
// =========================
}
}
以上代码首先根据 CanalEntry.Entry 的入参,获取该entry的数据库及表名信息,判断是否要处理的数据库及表信息,当然这个可以上 canal 安装服务的 conf/example/instance.properties 上直接限制数据库及表名匹配;
再获取RowData,根据 eventType 获取相应的 rowData数据,如果Delete为getBeforeColumnList();
Map<String,Object> parseColumnsToMap(List<CanalEntry.Column> columns){
Map<String,Object> jsonMap = new HashMap<>();
columns.forEach(column -> {
if(column == null){
return;
}
jsonMap.put(column.getName(), column.getValue());
});
return jsonMap;
}
上面 List<CanalEntry.Column> columns 为某数据表行的所有列字段及值;
使用方式2
实现EntryHandler<数据表实体类>接口 来实现侦听数据行变更处理
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
@CanalTable("tablename") // 要侦听的物理表名
@Component
public class CanalBizTableHandler implements EntryHandler<BizTable> {
// 注入 BizTableDao
@Autowired
private BizTableDao bizTableDao;
@Override
public void insert(BizTableVO bizTable) {
bizTableDao.insert(bizTable);
}
}
这种看起来比较简单,但前提是源数据要保证 bizTable 所有字段都要有值,或类型正确,不然会报错,而且无法在这里前置判断字段值修改;
BizTable实体类:
@Data
public class BizTable {
@Id
private Long id;
@Column(name = "username")
private String username;
}