Notes:
The instance module:
The canal client can also be a message queue (the parsed binlog can be delivered to an MQ instead of a TCP client).
The EventStore is responsible for storing the parsed binlog events, while the parsing stage pulls the binlog; its flow is fairly involved and has to interact with the MetaManager. For example, the position of every pull is recorded so that the next pull can resume from the last position, which means the MetaManager must be stateful.
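A minimal sketch of that stateful position tracking (the class and methods below are simplified illustrations of the role CanalLogPositionManager / MetaManager play, not the real API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// hypothetical, simplified in-memory position manager: the parser asks for the last
// recorded position on startup and records a new one after each successfully handled batch
class SimpleLogPositionManager {
    private final Map<String, String> positions = new ConcurrentHashMap<>();

    // resume point for a destination, e.g. "mysql-bin.000003:4", or null on the very first run
    String getLatestPosition(String destination) {
        return positions.get(destination);
    }

    // called after a batch has been consumed successfully
    void persistLogPosition(String destination, String binlogFile, long offset) {
        positions.put(destination, binlogFile + ":" + offset);
    }
}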
The EventParser flow is as follows:
com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor.SinkStoreStage#onEvent
public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
try {
if (event.getEntry() != null) {
transactionBuffer.add(event.getEntry());
}
LogEvent logEvent = event.getEvent();
if (connection instanceof MysqlConnection && logEvent.getSemival() == 1) {
// send the semi-sync ack back to the master
((MysqlConnection) connection).sendSemiAck(logEvent.getHeader().getLogFileName(),
logEvent.getHeader().getLogPos());
}
// clear for gc
event.setBuffer(null);
event.setEvent(null);
event.setTable(null);
event.setEntry(null);
event.setNeedDmlParse(false);
} catch (Throwable e) {
exception = new CanalParseException(e);
throw exception;
}
}
Data is likewise exchanged through an array (a small ring buffer):
private int bufferSize = 1024;
private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // position of the last write performed by put
private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE); // position of the last flush after the flush condition was met
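The sequence-to-slot mapping is the usual ring-buffer trick; a minimal sketch, assuming (as in Canal) that bufferSize is a power of two:

// hypothetical, simplified version of the index calculation used with putSequence/flushSequence
private final int bufferSize = 1024;          // must be a power of two
private final int indexMask = bufferSize - 1;

private int getIndex(long sequence) {
    // sequence & (bufferSize - 1) equals sequence % bufferSize when bufferSize is a power of two
    return (int) (sequence & indexMask);
}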
com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer#add(com.alibaba.otter.canal.protocol.CanalEntry.Entry)
public void add(CanalEntry.Entry entry) throws InterruptedException {
switch (entry.getEntryType()) {
case TRANSACTIONBEGIN:
flush();// flush the data buffered for the previous transaction
put(entry);
break;
case TRANSACTIONEND:
put(entry);
flush();
break;
case ROWDATA:
put(entry);
// non-DML entries are emitted immediately, without buffering
EventType eventType = entry.getHeader().getEventType();
if (eventType != null && !isDml(eventType)) {
flush();
}
break;
case HEARTBEAT:
// a heartbeat from the master means the binlog has been read to the end and the connection is idle
put(entry);
flush();
break;
default:
break;
}
}
Consuming the data in the array:
com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer#flush
private void flush() throws InterruptedException {
long start = this.flushSequence.get() + 1;
long end = this.putSequence.get();
if (start <= end) {
List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
for (long next = start; next <= end; next++) {
transaction.add(this.entries[getIndex(next)]);
}
flushCallback.flush(transaction);
flushSequence.set(end);// after a successful flush, advance the flush position
}
}
public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
if (!running) {
return;
}
if (!successed) {
throw new CanalParseException("consume failed!");
}
// the binlog batch from the array was consumed successfully, so record the consumed position
LogPosition position = buildLastTransactionPosition(transaction);
if (position != null) { // position may be null
// persist it (to memory by default)
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
}
}
com.alibaba.otter.canal.parse.inbound.AbstractEventParser#consumeTheEventAndProfilingIfNecessary
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
InterruptedException {
long startTs = -1;
boolean enabled = getProfilingEnabled();
if (enabled) {
startTs = System.currentTimeMillis();
}
boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);
if (enabled) {
this.processingInterval = System.currentTimeMillis() - startTs;
}
if (consumedEventCount.incrementAndGet() < 0) {
consumedEventCount.set(0);
}
return result;
}
The events are then stored into the in-memory event array; the call chain is:
com.alibaba.otter.canal.parse.inbound.AbstractEventParser#consumeTheEventAndProfilingIfNecessary
com.alibaba.otter.canal.sink.entry.EntryEventSink#sink
com.alibaba.otter.canal.sink.entry.EntryEventSink#sinkData
private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
throws InterruptedException {
boolean hasRowData = false;
boolean hasHeartBeat = false;
List<Event> events = new ArrayList<Event>();
for (CanalEntry.Entry entry : entrys) {
if (!doFilter(entry)) {
continue;
}
if (filterTransactionEntry
&& (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
long currentTimestamp = entry.getHeader().getExecuteTime();
// based on a throttling policy, occasionally let empty transaction heads/tails through so the database position is updated promptly and shows the parser is working
if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
&& Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
continue;
} else {
lastTransactionCount.set(0L);
lastTransactionTimestamp = currentTimestamp;
}
}
hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
events.add(event);
}
if (hasRowData || hasHeartBeat) {
// there are row entries or heartbeat entries, hand them straight to the next stage
return doSink(events);
} else {
// data that may be filtered out
if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
long currentTimestamp = events.get(0).getExecuteTime();
// based on a throttling policy, occasionally let empty transaction heads/tails through so the database position is updated promptly and shows the parser is working
if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
|| lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
lastEmptyTransactionCount.set(0L);
lastEmptyTransactionTimestamp = currentTimestamp;
return doSink(events);
}
}
// return true directly, ignoring the empty transaction head/tail
return true;
}
}
com.alibaba.otter.canal.sink.entry.EntryEventSink#doSink
If the put does not succeed, the method does not return; it keeps blocking here:
protected boolean doSink(List<Event> events) {
for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.before(events);
}
long blockingStart = 0L;
int fullTimes = 0;
do {
if (eventStore.tryPut(events)) {
if (fullTimes > 0) {
eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
}
for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.after(events);
}
return true;
} else {
if (fullTimes == 0) {
blockingStart = System.nanoTime();
}
applyWait(++fullTimes);
if (fullTimes % 100 == 0) {
long nextStart = System.nanoTime();
eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
blockingStart = nextStart;
}
}
for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.retry(events);
}
// if the put keeps failing, loop indefinitely
} while (running && !Thread.interrupted());
return false;
}
// more than 3 failures: park briefly; 3 or fewer: just yield the time slice (this distinction does not seem strictly necessary)
private void applyWait(int fullTimes) {
int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
if (fullTimes <= 3) { // 3 times or fewer
Thread.yield();
} else { // more than 3 times, sleep at most 10ms
LockSupport.parkNanos(1000 * 1000L * newFullTimes);
}
}
tryPut puts the data into memory; nothing is persisted.
The implementation borrows from Disruptor, but tryPut takes a lock, whereas Disruptor is a lock-free queue.
The event array size is bufferSize = 16 * 1024.
com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer#tryPut(java.util.List
public boolean tryPut(List<Event> data) throws CanalStoreException {
if (data == null || data.isEmpty()) {
return true;
}
final ReentrantLock lock = this.lock;
// acquire the lock
lock.lock();
try {
if (!checkFreeSlotAt(putSequence.get() + data.size())) {
return false;
} else {
doPut(data);
return true;
}
} finally {
lock.unlock();
}
}
private void doPut(List<Event> data) {
long current = putSequence.get();
long end = current + data.size();
// write the data first, then advance the cursor; under high concurrency, if the cursor were advanced first, a concurrent get could see the new putSequence and read stale Entry values out of the ring buffer
for (long next = current + 1; next <= end; next++) {
entries[getIndex(next)] = data.get((int) (next - current - 1));
}
putSequence.set(end);
// record the mem size so that gets can look it up quickly
if (batchMode.isMemSize()) {
long size = 0;
for (Event event : data) {
size += calculateSize(event);
}
putMemSize.getAndAdd(size);
}
profiling(data, OP.PUT);
// tell other threads that store is not empty
notEmpty.signal();
}
The EventStore is a ring buffer with three pointers: Put, Get, and Ack.
Ack: once the consumer finishes consuming, the Ack pointer advances, and data that has already been acked is dropped from the buffer (its slots can be reused by Put).
The code that stores events was shown above in the EventSink section.
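Before a put the store checks, against the slowest consumer cursor, whether the target sequence would lap the ring and overwrite data that has not been acked yet. A minimal sketch of that check, with assumed field names rather than the exact Canal code:

import java.util.concurrent.atomic.AtomicLong;

// hypothetical sketch of the free-slot check in a Put/Get/Ack ring buffer
class RingBufferCursors {
    private final int bufferSize = 16 * 1024;
    private final AtomicLong putSequence = new AtomicLong(-1);
    private final AtomicLong getSequence = new AtomicLong(-1);
    private final AtomicLong ackSequence = new AtomicLong(-1);

    boolean hasFreeSlotAt(long targetPutSequence) {
        // ack never runs ahead of get, so ack is the limiting cursor; putting more than
        // bufferSize beyond it would overwrite events the consumer has not acked yet
        long slowest = Math.min(getSequence.get(), ackSequence.get());
        return targetPutSequence - bufferSize <= slowest;
    }
}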
The get code is as follows:
com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer#get(com.alibaba.otter.canal.protocol.position.Position, int)
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
// block until enough data is available
while (!checkUnGetSlotAt((LogPosition) start, batchSize))
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
return doGet(start, batchSize);
} finally {
lock.unlock();
}
}
private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
LogPosition startPosition = (LogPosition) start;
long current = getSequence.get();
long maxAbleSequence = putSequence.get();
long next = current;
long end = current;
// if startPosition is null, this is the first fetch, so advance by 1 by default
if (startPosition == null || !startPosition.getPostion().isIncluded()) { // right after the first subscribe, the start position itself must be included to avoid losing the first record
next = next + 1;
}
if (current >= maxAbleSequence) {
return new Events<Event>();
}
Events<Event> result = new Events<Event>();
List<Event> entrys = result.getEvents();
long memsize = 0;
if (batchMode.isItemSize()) {
end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
// extract the data and return it
for (; next <= end; next++) {
Event event = entries[getIndex(next)];
if (ddlIsolation && isDdl(event.getEventType())) {
// with ddl isolation, return immediately
if (entrys.size() == 0) {
entrys.add(event);// no DML event yet, include the current DDL event
end = next; // move end to the current position
} else {
// a DML event is already present, so return without the current record and step back one position
end = next - 1; // next-1 is guaranteed to be greater than current, no check needed
}
break;
} else {
entrys.add(event);
}
}
} else {
long maxMemSize = batchSize * bufferMemUnit;
for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) {
// always allow at least the first record to be taken, to avoid deadlock
Event event = entries[getIndex(next)];
if (ddlIsolation && isDdl(event.getEventType())) {
// with ddl isolation, return immediately
if (entrys.size() == 0) {
entrys.add(event);// no DML event yet, include the current DDL event
end = next; // move end to the current position
} else {
// a DML event is already present, so return without the current record and step back one position
end = next - 1; // next-1 is guaranteed to be greater than current, no check needed
}
break;
} else {
entrys.add(event);
memsize += calculateSize(event);
end = next;// record the end position
}
}
}
PositionRange<LogPosition> range = new PositionRange<LogPosition>();
result.setPositionRange(range);
range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1)));
range.setEndSeq(end);
// record whether there is a position that can safely be acked
for (int i = entrys.size() - 1; i >= 0; i--) {
Event event = entrys.get(i);
// in GTID mode the ack position must be a transaction end: on the next subscribe MySQL sends the events after this gtid, so recording a transaction begin would lose the last transaction
if ((CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() && StringUtils.isEmpty(event.getGtid()))
|| CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
// mark the transaction begin/end as the ack-able position
range.setAck(CanalEventUtils.createPosition(event));
break;
}
}
if (getSequence.compareAndSet(current, end)) {
getMemSize.addAndGet(memsize);
notFull.signal();
profiling(result.getEvents(), OP.GET);
return result;
} else {
return new Events<Event>();
}
}
The server side uses Netty 3.2.2.
If message-queue middleware consumes the binlog instead, the Netty server does not need to be started.
com.alibaba.otter.canal.server.netty.CanalServerWithNetty#start
public void start() {
super.start();
if (!embeddedServer.isStart()) {
embeddedServer.start();
}
this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
/*
* enable keep-alive mechanism, handle abnormal network connection
* scenarios on OS level. the threshold parameters are depended on OS.
* e.g. On Linux: net.ipv4.tcp_keepalive_time = 300
* net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30
*/
bootstrap.setOption("child.keepAlive", true);
/*
* optional parameter.
*/
bootstrap.setOption("child.tcpNoDelay", true);
// build the channel pipeline
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipelines = Channels.pipeline();
pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
// support to maintain child socket channel.
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(embeddedServer));
SessionHandler sessionHandler = new SessionHandler(embeddedServer);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
}
});
// bind and start
if (StringUtils.isNotEmpty(ip)) {
this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
} else {
this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
}
}
A client can fetch the MySQL binlog from the canal server in several modes (via the get and getWithoutAck methods):
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
TimeUnit unit) {
if (timeout == null) {
return eventStore.tryGet(start, batchSize); // fetch immediately, non-blocking
} else if (timeout <= 0){
return eventStore.get(start, batchSize); // blocking fetch
} else {
return eventStore.get(start, batchSize, timeout, unit); // fetch with a timeout
}
}
Note: the EventStore implementation uses a Disruptor-like RingBuffer; the implementing class is MemoryEventStoreWithBuffer.
The difference between get and getWithoutAck: get is effectively getWithoutAck followed immediately by ack, so the batch is acknowledged automatically, while getWithoutAck leaves the batch pending until the client explicitly calls ack (or rollback).
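For reference, a minimal client loop built on the official canal client API shows the getWithoutAck/ack/rollback pattern (address, destination and filter below are placeholder values):

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClient {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*");                      // all schemas and tables
        while (true) {
            Message message = connector.getWithoutAck(1024); // fetch, no automatic ack
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                continue;                                    // empty batch, nothing to do
            }
            try {
                process(message);                            // hand entries to business code
                connector.ack(batchId);                      // confirm; the cursor advances
            } catch (Exception e) {
                connector.rollback(batchId);                 // the batch will be redelivered
            }
        }
    }

    private static void process(Message message) {
        // placeholder for real handling of message.getEntries()
    }
}

The same pattern appears below in CanalMQStarter's worker: getWithoutAck to fetch, ack after the MQ producer confirms delivery, rollback on failure.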
com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#getWithoutAck(com.alibaba.otter.canal.protocol.ClientIdentity, int, java.lang.Long, java.util.concurrent.TimeUnit)
/**
* Fetch events without specifying a position; canal remembers this client's latest position. <br/>
* If this is the first fetch, output starts from the oldest data canal has stored.
*
* <pre>
* Cases:
* a. If timeout is null, use tryGet for an immediate, non-blocking fetch
* b. If timeout is not null
* 1. timeout == 0: use the blocking get with no timeout, returning only once batchSize events are available
* 2. timeout != 0: use get with a timeout; if batchSize events are not available when it expires, return whatever is there
*
* Note: meta retrieval and data retrieval must stay in order: whoever obtains the meta first must also obtain the data first, hence the synchronization. (It must not happen that one caller gets the meta first but then gets the second batch of data, which would break data ordering.)
* </pre>
*/
@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
synchronized (canalInstance) {
// get the position of the last batch fetched in streaming mode
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
Events<Event> events = null;
if (positionRanges != null) { // there is an outstanding streaming batch
events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
} else {// first fetch after an ack
Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
if (start == null) { // very first fetch with no ack record yet, so take the first entry currently in the store
start = canalInstance.getEventStore().getFirstPosition();
}
events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
}
if (CollectionUtils.isEmpty(events.getEvents())) {
// logger.debug("getWithoutAck successfully, clientId:{}
// batchSize:{} but result
// is null",
// clientIdentity.getClientId(),
// batchSize);
return new Message(-1, true, new ArrayList()); // return an empty message, avoiding the cost of generating a batchId
} else {
// record the batch in the streaming meta information
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
boolean raw = isRaw(canalInstance.getEventStore());
List entrys = null;
if (raw) {
entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
public ByteString apply(Event input) {
return input.getRawEntry();
}
});
} else {
entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {
public CanalEntry.Entry apply(Event input) {
return input.getEntry();
}
});
}
if (logger.isInfoEnabled()) {
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
clientIdentity.getClientId(),
batchSize,
entrys.size(),
batchId,
events.getPositionRange());
}
return new Message(batchId, raw, entrys);
}
}
}
The ack path:
com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#ack
/**
* Confirm a batch id. After confirmation, every Message with a batchId less than or equal to this one is considered acknowledged.
*
* <pre>
* Note: acks must be submitted in batchId order (the client has to guarantee this)
* </pre>
*/
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
checkStart(clientIdentity.getDestination());
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
PositionRange<LogPosition> positionRanges = null;
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // update the position
if (positionRanges == null) { // this is a duplicate ack/rollback
throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// when updating the cursor it would be better to check strictly for position jumps
// Position position = lastRollbackPostions.get(clientIdentity);
// if (position != null) {
// // Position position =
// canalInstance.getMetaManager().getCursor(clientIdentity);
// LogPosition minPosition =
// CanalEventUtils.min(positionRanges.getStart(), (LogPosition)
// position);
// if (minPosition == position) {// the acked position is later than the last acked position, data may be lost
// throw new CanalServerException(
// String.format(
// "ack error , clientId:%s batchId:%d %s is jump ack , last ack:%s",
// clientIdentity.getClientId(), batchId, positionRanges,
// position));
// }
// }
// update the cursor and persist the consumed position; different instance.xml configs use different persistence schemes
if (positionRanges.getAck() != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
if (logger.isInfoEnabled()) {
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
// acked data can then be cleaned up periodically
canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
}
com.alibaba.otter.canal.deployer.CanalLauncher#main
com.alibaba.otter.canal.deployer.CanalStarter#start
public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (!"tcp".equalsIgnoreCase(serverMode)) {
// start the MQ producer that publishes the parsed binlog messages to the message queue
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
if (canalMQProducer != null) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
canalMQProducer.init(properties);
Thread.currentThread().setContextClassLoader(cl);
}
}
if (canalMQProducer != null) {
MQProperties mqProperties = canalMQProducer.getMqProperties();
// disable netty
// if there is an MQ producer, shut down the netty server: no TCP server is needed, the producer consumes the binlog
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
if (mqProperties.isFlatMessage()) {
// turn off rawEntry so the entry is parsed once up front, avoiding a second ByteString->Entry parse for flat messages
System.setProperty("canal.instance.memory.rawEntry", "false");
}
}
logger.info("## start the canal server.");
controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
shutdownThread = new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownThread);
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
// start consuming
canalMQStarter.start(destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
port,
user,
passwd,
ip);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.parseInt(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}
com.alibaba.otter.canal.deployer.CanalController#start
public void start() throws Throwable {
logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
// create the working node for this canal server (registered in zookeeper)
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
// start the embedded server first
embededCanalServer.start();
// try to start the destinations that are not lazy
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// create the working node for this destination
if (!embededCanalServer.isStart(destination)) {
// start via the HA mechanism
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}
if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
// start the network interface (the netty server)
if (canalServer != null) {
canalServer.start();
}
}
com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#start()
public void start() {
if (!isStart()) {
super.start();
// if a metrics provider exists, start the metrics service
loadCanalMetrics();
metrics.setServerPort(metricsPort);
metrics.initialize();
// generate instances by destination, one instance per destination, using the Spring container by default
canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
public CanalInstance apply(String destination) {
return canalInstanceGenerator.generate(destination);
}
});
// lastRollbackPostions = new MapMaker().makeMap();
}
}
com.alibaba.otter.canal.server.CanalMQStarter#start
public synchronized void start(String destinations) {
try {
if (running) {
return;
}
mqProperties = canalMQProducer.getMqProperties();
// set filterTransactionEntry
if (mqProperties.isFilterTransactionEntry()) {
System.setProperty("canal.instance.filter.transaction.entry", "true");
}
canalServer = CanalServerWithEmbedded.instance();
// start one worker thread per instance
executorService = Executors.newCachedThreadPool();
logger.info("## start the MQ workers.");
String[] dsts = StringUtils.split(destinations, ",");
for (String destination : dsts) {
destination = destination.trim();
CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
canalMQWorks.put(destination, canalMQRunnable);
executorService.execute(canalMQRunnable);
}
running = true;
logger.info("## the MQ workers is running now ......");
shutdownThread = new Thread() {
public void run() {
try {
logger.info("## stop the MQ workers");
running = false;
executorService.shutdown();
canalMQProducer.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping MQ workers:", e);
} finally {
logger.info("## canal MQ is down.");
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownThread);
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
}
}
The MQ worker starts consuming:
private void worker(String destination, AtomicBoolean destinationRunning) {
while (!running || !destinationRunning.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
logger.info("## start the MQ producer: {}.", destination);
MDC.put("destination", destination);
final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
while (running && destinationRunning.get()) {
try {
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
if (canalInstance == null) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// ignore
}
continue;
}
MQDestination canalDestination = new MQDestination();
canalDestination.setCanalDestination(destination);
CanalMQConfig mqConfig = canalInstance.getMqConfig();
canalDestination.setTopic(mqConfig.getTopic());
canalDestination.setPartition(mqConfig.getPartition());
canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
// subscribe to this destination's instance
canalServer.subscribe(clientIdentity);
logger.info("## the MQ producer: {} is running now ......", destination);
Integer getTimeout = mqProperties.getFetchTimeout();
Integer getBatchSize = mqProperties.getBatchSize();
while (running && destinationRunning.get()) {
Message message;
if (getTimeout != null && getTimeout > 0) {
// fetch a batch of messages from canal
message = canalServer
.getWithoutAck(clientIdentity, getBatchSize, getTimeout.longValue(), TimeUnit.MILLISECONDS);
} else {
message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
}
final long batchId = message.getId();
try {
int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
if (batchId != -1 && size != 0) {
// send the message to the message queue
canalMQProducer.send(canalDestination, message, new Callback() {
@Override
public void commit() {
// produced successfully, submit the ack
canalServer.ack(clientIdentity, batchId); // confirm the batch
}
@Override
public void rollback() {
canalServer.rollback(clientIdentity, batchId);
}
}); // send the message to the topic
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error("process error!", e);
}
}
}
This is a two-phase protocol:
message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout.longValue(), TimeUnit.MILLISECONDS); fetches a batch, and the worker only calls ack after the MQ producer has confirmed delivery (or rollback on failure).
As noted above, get differs from getWithoutAck in that get acknowledges the batch automatically; getWithoutAck leaves the ack to the caller.
com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#subscribe
/**
* Client subscription; re-subscribing updates the corresponding filter information
*/
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
if (!canalInstance.getMetaManager().isStart()) {
canalInstance.getMetaManager().start();
}
canalInstance.getMetaManager().subscribe(clientIdentity); // perform the meta subscription
Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
if (position == null) {
position = canalInstance.getEventStore().getFirstPosition();// take the first position in the store
if (position != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
}
logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
} else {
logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
}
// notify that the subscription relationship changed
canalInstance.subscribeChange(clientIdentity);
}
How is transactionality guaranteed between consuming messages from the ring buffer (into the message queue) and storing the consumption position?
It is not. The ack is submitted after consumption, so if the ack submission fails the batch will be consumed again; this is at-least-once delivery (see the dedup sketch after these notes).
Can the binlog be read repeatedly?
Yes, it can. The parse position (how far the binlog has been parsed) is tracked by the CanalLogPositionManager component and kept in memory; only after a batch has been consumed and acked is the position persisted to a file or to zookeeper. On the next canal startup, synchronization resumes from the persisted position, so binlog that was parsed but never consumed is read again.
Is the event store persisted?
No. Nothing is persisted; when the event array is full, puts just keep blocking.
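Since delivery is at-least-once, downstream consumers need to be idempotent. One common approach, sketched here with hypothetical names, is to ignore entries at or before the last applied binlog position:

// hypothetical dedup guard: skip entries at or before the last applied binlog position
class BinlogDedupGuard {

    private String lastFile = "";
    private long lastOffset = -1L;

    // returns true only for positions strictly after the last applied one
    synchronized boolean shouldApply(String binlogFile, long offset) {
        // binlog file names (mysql-bin.000001, ...) order lexicographically
        if (binlogFile.compareTo(lastFile) < 0) {
            return false;
        }
        if (binlogFile.equals(lastFile) && offset <= lastOffset) {
            return false;
        }
        lastFile = binlogFile;
        lastOffset = offset;
        return true;
    }
}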
First, look at how canal maintains the incremental subscription & consumption relationship.
There are two kinds of positions: the parse position (how far the binlog has been parsed), managed by CanalLogPositionManager, and the consume position (the last position acked by the client), managed by CanalMetaManager.
Four instance.xml variants are supported by default: spring/memory-instance.xml, spring/default-instance.xml, spring/group-instance.xml, spring/file-instance.xml.
memory-instance.xml:
All components (parser, sink, store) use the in-memory implementations, and position tracking also uses memory mode, so after a restart parsing starts over from the initial position.
Characteristics: fastest, fewest dependencies (no zookeeper required).
Scenario: typically the quickstart, or ad-hoc data analysis after an incident; it should not be used in production.
default-instance.xml:
The store stays in memory mode, while the position management that parser/sink depend on uses a persistent mode; persistence currently means writing to zookeeper, so the data is shared across the cluster.
Characteristics: supports HA.
Scenario: production, clustered deployment.
group-instance.xml:
Mainly for merging multiple databases: several physical instances are merged into one logical instance for clients to access.
Scenario: sharded databases. For example, product data split across 4 databases means one instance per database; without group, consumers would need 4 clients, one per instance. With group, the canal server merges them into a single logical instance, and one client connecting to that logical instance is enough.
The design intent of instance.xml:
It allows custom extension. For example, after implementing database-backed position management you can supply your own instance.xml; this is where canal's design is most flexible.
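Which xxx-instance.xml a deployment uses is selected in canal.properties; an illustrative excerpt (values are examples, check the AdminGuide for your version):

# canal.properties (illustrative)
canal.instance.global.mode = spring
canal.instance.global.lazy = false
# pick one of the shipped templates, or point to your own customized instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml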
https://juejin.im/entry/59f15508f265da4321534371
https://github.com/alibaba/canal/wiki/AdminGuide
https://github.com/alibaba/canal/wiki/ClientAPI
https://www.cnblogs.com/f-zhao/p/9112158.html
https://blog.csdn.net/lvzhuyiyi/article/details/51842697