canal       


Overall architecture

image-20200321100323648

image-20200321100543408

Notes:

The instance module:

image-20200321153724458

The canal client can be a message queue.

image-20200321154317205

EventParser

image-20200321100729971

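EventStore stores the parsed binlog events, while the parsing side pulls the binlog. The parsing flow is fairly complex and has to interact with MetaManager: for example, the Position of each pull must be recorded so that the next pull can resume from where the previous one ended. MetaManager is therefore stateful.

The EventParser flow is as follows:

  1. Connection fetches the position of the last successful parse (on first start, the initially configured position or the database's current binlog position)
  2. Connection establishes the link and sends the BINLOG_DUMP command
  3. MySQL starts pushing the Binary Log
  4. The received Binary Log is decoded by the Binlog parser, which also fills in some additional information
  5. The result is handed to the EventSink module for storage; this is a blocking operation that lasts until the store succeeds
  6. After a successful store, the Binary Log position is recorded periodically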
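
The six steps above can be sketched as a small resume-from-position loop. This is only an illustration under stated assumptions: `ParserLoopSketch`, `BINLOG`, `store` and `savedPosition` are hypothetical names, the binlog is modelled as an in-memory list, and the position is recorded after every event rather than on a timer as canal does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the six steps above. Every name here is hypothetical;
// none of these are canal classes.
class ParserLoopSketch {
    // Stand-in for the MySQL binary log: an event is addressed by its index.
    static final List<String> BINLOG = Arrays.asList("e0", "e1", "e2", "e3", "e4");

    long savedPosition = -1;                       // -1: nothing recorded yet
    final List<String> store = new ArrayList<>();  // stand-in for EventSink/EventStore

    // Runs steps 1-6 for at most maxEvents events, then "disconnects".
    void run(long initialPosition, int maxEvents) {
        // Step 1: resume after the last recorded position, else use the initial one.
        long start = savedPosition >= 0 ? savedPosition + 1 : initialPosition;
        // Steps 2-3: the "dump" is modelled as iterating the log from start.
        long end = Math.min(BINLOG.size(), start + maxEvents);
        for (long pos = start; pos < end; pos++) {
            String parsed = "parsed:" + BINLOG.get((int) pos); // step 4: decode
            store.add(parsed);                                  // step 5: store
            savedPosition = pos; // step 6 (recorded per event here, not on a timer)
        }
    }

    public static void main(String[] args) {
        ParserLoopSketch parser = new ParserLoopSketch();
        parser.run(0, 2);   // first start: begins at the initial position
        parser.run(0, 10);  // restart: resumes after the saved position
        System.out.println(parser.store);
    }
}
```

Because the second run starts at `savedPosition + 1`, a restart neither replays nor skips events in this model.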

com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor.SinkStoreStage#onEvent

public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
    try {
        if (event.getEntry() != null) {
            transactionBuffer.add(event.getEntry());
        }

        LogEvent logEvent = event.getEvent();
        if (connection instanceof MysqlConnection && logEvent.getSemival() == 1) {
            // report the semi-sync ack back
            ((MysqlConnection) connection).sendSemiAck(logEvent.getHeader().getLogFileName(),
                logEvent.getHeader().getLogPos());
        }

        // clear for gc
        event.setBuffer(null);
        event.setEvent(null);
        event.setTable(null);
        event.setEntry(null);
        event.setNeedDmlParse(false);
    } catch (Throwable e) {
        exception = new CanalParseException(e);
        throw exception;
    }
}

Data is again exchanged through an array:

private int                      bufferSize    = 1024;
private AtomicLong               putSequence   = new AtomicLong(INIT_SQEUENCE); // position of the most recent write made by put
private AtomicLong               flushSequence = new AtomicLong(INIT_SQEUENCE); // position of the most recent flush once the flush condition was met

com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer#add(com.alibaba.otter.canal.protocol.CanalEntry.Entry)

public void add(CanalEntry.Entry entry) throws InterruptedException {
    switch (entry.getEntryType()) {
        case TRANSACTIONBEGIN:
            flush();// flush the data left over from before
            put(entry);
            break;
        case TRANSACTIONEND:
            put(entry);
            flush();
            break;
        case ROWDATA:
            put(entry);
            // non-DML data is output directly, without buffer control
            EventType eventType = entry.getHeader().getEventType();
            if (eventType != null && !isDml(eventType)) {
                flush();
            }
            break;
        case HEARTBEAT:
            // a heartbeat from the master means the binlog has been read to the end and the stream is idle
            put(entry);
            flush();
            break;
        default:
            break;
    }
}

Consuming the data in the array:

com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer#flush

private void flush() throws InterruptedException {
    long start = this.flushSequence.get() + 1;
    long end = this.putSequence.get();

    if (start <= end) {
        List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
        for (long next = start; next <= end; next++) {
            transaction.add(this.entries[getIndex(next)]);
        }

        flushCallback.flush(transaction);
        flushSequence.set(end);// update the flush position after a successful flush
    }
}

The flushCallback invoked above is the callback registered by AbstractEventParser:

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
    boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
    if (!running) {
        return;
    }

    if (!successed) {
        throw new CanalParseException("consume failed!");
    }
		
    // the binlog entries in the array were consumed successfully; record the consumed position
    LogPosition position = buildLastTransactionPosition(transaction);
    if (position != null) { // position may be null
        // persist the position (in memory)
        logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
    }
}
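
The put/flush bookkeeping shown above can be condensed into a single-threaded sketch. It assumes plain `long` counters instead of `AtomicLong`, uses `String` in place of `CanalEntry.Entry`, and flushes early when the array is full; `TransactionBufferSketch` and its members are hypothetical names, not canal's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Single-threaded sketch of the put/flush bookkeeping. String stands in for
// CanalEntry.Entry and the entry type is passed as plain text.
class TransactionBufferSketch {
    private final String[] entries;
    private final int indexMask;
    private long putSequence = -1;    // sequence of the last entry written
    private long flushSequence = -1;  // sequence of the last entry flushed
    private final Consumer<List<String>> flushCallback;

    TransactionBufferSketch(int bufferSize, Consumer<List<String>> flushCallback) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.entries = new String[bufferSize];
        this.indexMask = bufferSize - 1;
        this.flushCallback = flushCallback;
    }

    void add(String entry) {
        if (entry.equals("BEGIN")) {
            flush();          // hand over whatever is left from before
            put(entry);
        } else if (entry.equals("END")) {
            put(entry);
            flush();          // a complete transaction goes out as one batch
        } else {
            put(entry);       // row data is only buffered
        }
    }

    private void put(String entry) {
        if (putSequence - flushSequence >= entries.length) {
            flush();          // buffer full: flush early rather than overwrite
        }
        putSequence++;
        entries[(int) (putSequence & indexMask)] = entry;
    }

    private void flush() {
        long start = flushSequence + 1;
        long end = putSequence;
        if (start <= end) {
            List<String> batch = new ArrayList<>();
            for (long next = start; next <= end; next++) {
                batch.add(entries[(int) (next & indexMask)]);
            }
            flushCallback.accept(batch);
            flushSequence = end;  // advance only after the callback returned
        }
    }
}
```

Feeding it `BEGIN, row1, row2, END` with a large enough buffer yields one batch per transaction; with a buffer smaller than the transaction, the transaction is split across several batches.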

com.alibaba.otter.canal.parse.inbound.AbstractEventParser#consumeTheEventAndProfilingIfNecessary

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                       InterruptedException {
    long startTs = -1;
    boolean enabled = getProfilingEnabled();
    if (enabled) {
        startTs = System.currentTimeMillis();
    }

    boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

    if (enabled) {
        this.processingInterval = System.currentTimeMillis() - startTs;
    }

    if (consumedEventCount.incrementAndGet() < 0) {
        consumedEventCount.set(0);
    }

    return result;
}

EventSink

image-20200321100749066

Events are saved into the in-memory event array.

com.alibaba.otter.canal.parse.inbound.AbstractEventParser#consumeTheEventAndProfilingIfNecessary

com.alibaba.otter.canal.sink.entry.EntryEventSink#sink

com.alibaba.otter.canal.sink.entry.EntryEventSink#sinkData

private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
                                                                                        throws InterruptedException {
    boolean hasRowData = false;
    boolean hasHeartBeat = false;
    List<Event> events = new ArrayList<Event>();
    for (CanalEntry.Entry entry : entrys) {
        if (!doFilter(entry)) {
            continue;
        }

        if (filterTransactionEntry
            && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
            long currentTimestamp = entry.getHeader().getExecuteTime();
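            // policy-controlled: let empty transaction begin/end entries through, so the database position is updated promptly and shows that things are working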
            if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
                && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
                continue;
            } else {
                lastTransactionCount.set(0L);
                lastTransactionTimestamp = currentTimestamp;
            }
        }

        hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
        hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
        Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
        events.add(event);
    }

    if (hasRowData || hasHeartBeat) {
        // row data or a heartbeat is present: hand over to the downstream processing directly
        return doSink(events);
    } else {
        // data that needs to be filtered
        if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
            long currentTimestamp = events.get(0).getExecuteTime();
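            // policy-controlled: let empty transaction begin/end entries through, so the database position is updated promptly and shows that things are working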
            if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
                lastEmptyTransactionCount.set(0L);
                lastEmptyTransactionTimestamp = currentTimestamp;
                return doSink(events);
            }
        }

        // return true directly, ignoring the empty transaction begin/end
        return true;
    }
}
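
The second branch of sinkData lets an empty transaction through only when enough time has passed or enough of them have been skipped. A minimal sketch of that rule follows; the class name, field names and the interval and threshold values are hypothetical and chosen only for illustration.

```java
// Sketch of the "forward an empty transaction only now and then" rule.
// Names and values are illustrative assumptions, not canal's defaults.
class EmptyTransactionFilterSketch {
    private final long intervalMillis;  // forward if this much time has passed
    private final long threshold;       // or if this many were skipped in a row
    private long lastTimestamp = 0L;
    private long count = 0L;

    EmptyTransactionFilterSketch(long intervalMillis, long threshold) {
        this.intervalMillis = intervalMillis;
        this.threshold = threshold;
    }

    // Returns true when the empty transaction should be passed downstream.
    boolean shouldForward(long timestamp) {
        // As in the original condition, || short-circuits: the counter is only
        // incremented when the interval check did not already succeed.
        if (Math.abs(timestamp - lastTimestamp) > intervalMillis || ++count > threshold) {
            count = 0L;
            lastTimestamp = timestamp;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        EmptyTransactionFilterSketch filter = new EmptyTransactionFilterSketch(1000L, 3L);
        long[] timestamps = {5000L, 5100L, 5200L, 5300L, 5400L, 7000L};
        for (long ts : timestamps) {
            System.out.println(ts + " forwarded=" + filter.shouldForward(ts));
        }
    }
}
```

Forwarding an occasional empty transaction keeps the downstream position moving even when no row data matches the filter.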

com.alibaba.otter.canal.sink.entry.EntryEventSink#doSink

If the put does not succeed, this function does not return and keeps blocking here:

protected boolean doSink(List<Event> events) {
    for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
        events = handler.before(events);
    }
    long blockingStart = 0L;
    int fullTimes = 0;
    do {
        if (eventStore.tryPut(events)) {
            if (fullTimes > 0) {
                eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
            }
            for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                events = handler.after(events);
            }
            return true;
        } else {
            if (fullTimes == 0) {
                blockingStart = System.nanoTime();
            }
            applyWait(++fullTimes);
            if (fullTimes % 100 == 0) {
                long nextStart = System.nanoTime();
                eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                blockingStart = nextStart;
            }
        }

        for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
            events = handler.retry(events);
        }
        // keep looping for as long as the put fails
    } while (running && !Thread.interrupted());
    return false;
}
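// Within the first 3 failures, yield the time slice; after more than 3, park. (The yield step does not seem necessary to me.)
private void applyWait(int fullTimes) {
    int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
    if (fullTimes <= 3) { // within 3 attempts
        Thread.yield();
    } else { // more than 3 attempts: sleep for at most 10ms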
        LockSupport.parkNanos(1000 * 1000L * newFullTimes);
    }

}
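
The wait policy in applyWait can be restated as a pure function, which makes the schedule easy to inspect. The cap of 10 is an assumption inferred from the "at most 10ms" comment above; `SinkBackoffSketch` and `waitNanos` are hypothetical names.

```java
// Sketch of the applyWait schedule as a pure function.
class SinkBackoffSketch {
    static final int MAX_FULL_TIMES = 10; // assumed cap, inferred from the comment above

    // Nanoseconds to park after the given number of consecutive failed puts;
    // 0 means "just yield the time slice".
    static long waitNanos(int fullTimes) {
        int capped = Math.min(fullTimes, MAX_FULL_TIMES);
        return fullTimes <= 3 ? 0L : 1_000_000L * capped;
    }

    public static void main(String[] args) {
        for (int attempt : new int[] {1, 3, 4, 10, 50}) {
            System.out.println(attempt + " -> " + waitNanos(attempt) + " ns");
        }
    }
}
```

The wait grows linearly from 4ms at the fourth failure and stays at 10ms from the tenth failure onward.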

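The data is kept in memory; it is not persisted.

The implementation is modeled on Disruptor, but tryPut takes a lock, whereas Disruptor is a lock-free queue.

The event array size is bufferSize = 16 * 1024.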

com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer#tryPut(java.util.List)

public boolean tryPut(List<Event> data) throws CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    final ReentrantLock lock = this.lock;
    // acquire the lock
    lock.lock();
    try {
        if (!checkFreeSlotAt(putSequence.get() + data.size())) {
            return false;
        } else {
            doPut(data);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void doPut(List<Event> data) {
    long current = putSequence.get();
    long end = current + data.size();

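    // write the data first, then update the cursor; under high concurrency, if putSequence became visible to a get request first, the get would read a stale Entry from the ring buffer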
    for (long next = current + 1; next <= end; next++) {
        entries[getIndex(next)] = data.get((int) (next - current - 1));
    }

    putSequence.set(end);

    // record the memsize information for gets, for quick lookup
    if (batchMode.isMemSize()) {
        long size = 0;
        for (Event event : data) {
            size += calculateSize(event);
        }

        putMemSize.getAndAdd(size);
    }
    profiling(data, OP.PUT);
    // tell other threads that store is not empty
    notEmpty.signal();
}

EventStore

EventStore is a RingBuffer with three pointers: Put, Get, and Ack.

image-20200321100828582

The put code is shown in the EventSink section above.

The get code is as follows:

com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer#get(com.alibaba.otter.canal.protocol.position.Position, int)

public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            // block until data is available
            while (!checkUnGetSlotAt((LogPosition) start, batchSize))
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }

        return doGet(start, batchSize);
    } finally {
        lock.unlock();
    }
}
private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
    LogPosition startPosition = (LogPosition) start;

    long current = getSequence.get();
    long maxAbleSequence = putSequence.get();
    long next = current;
    long end = current;
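    // if startPosition is null, this is the first fetch; advance by 1 by default
    if (startPosition == null || !startPosition.getPostion().isIncluded()) { // after the first subscription the start position itself must be included, so the first record is not lost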
        next = next + 1;
    }

    if (current >= maxAbleSequence) {
        return new Events<Event>();
    }

    Events<Event> result = new Events<Event>();
    List<Event> entrys = result.getEvents();
    long memsize = 0;
    if (batchMode.isItemSize()) {
        end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
        // extract the data and return it
        for (; next <= end; next++) {
            Event event = entries[getIndex(next)];
            if (ddlIsolation && isDdl(event.getEventType())) {
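                // DDL isolation is on: return directly
                if (entrys.size() == 0) {
                    entrys.add(event);// no DML events so far, so add the current DDL event
                    end = next; // move end to the current position
                } else {
                    // DML events were already collected, so return now; the current record (next) is excluded, so step back one position
                    end = next - 1; // next - 1 is always greater than current, no check needed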
                }
                break;
            } else {
                entrys.add(event);
            }
        }
    } else {
        long maxMemSize = batchSize * bufferMemUnit;
        for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) {
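            // always guarantee that the first record can be fetched, to avoid deadlock
            Event event = entries[getIndex(next)];
            if (ddlIsolation && isDdl(event.getEventType())) {
                // DDL isolation is on: return directly
                if (entrys.size() == 0) {
                    entrys.add(event);// no DML events so far, so add the current DDL event
                    end = next; // move end to the current position
                } else {
                    // DML events were already collected, so return now; the current record (next) is excluded, so step back one position
                    end = next - 1; // next - 1 is always greater than current, no check needed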
                }
                break;
            } else {
                entrys.add(event);
                memsize += calculateSize(event);
                end = next;// record the end position
            }
        }

    }

    PositionRange<LogPosition> range = new PositionRange<LogPosition>();
    result.setPositionRange(range);

    range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
    range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1)));
    range.setEndSeq(end);
    // record whether there is a position that can be acked

    for (int i = entrys.size() - 1; i >= 0; i--) {
        Event event = entrys.get(i);
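        // in GTID mode the ack position must be a transaction end: on the next subscription MySQL sends whatever follows this gtid, so recording the position at the transaction begin would lose that last transaction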
        if ((CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() && StringUtils.isEmpty(event.getGtid()))
            || CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
            // mark the transaction begin/end as a position that can be acked
            range.setAck(CanalEventUtils.createPosition(event));
            break;
        }
    }

    if (getSequence.compareAndSet(current, end)) {
        getMemSize.addAndGet(memsize);
        notFull.signal();
        profiling(result.getEvents(), OP.GET);
        return result;
    } else {
        return new Events<Event>();
    }
}
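
To tie the three pointers together, here is a minimal single-threaded sketch of a ring buffer with Put, Get and Ack sequences. It leaves out the lock, the conditions, memory-size accounting and DDL isolation of the real store, and adds a simple rollback; `ThreePointerRingSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded sketch of a ring buffer with put/get/ack sequences.
class ThreePointerRingSketch {
    private final String[] entries;
    private final int indexMask;
    private long putSequence = -1;  // last slot written
    private long getSequence = -1;  // last slot handed to a consumer
    private long ackSequence = -1;  // last slot confirmed by the consumer

    ThreePointerRingSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        entries = new String[size];
        indexMask = size - 1;
    }

    private int index(long sequence) {
        return (int) (sequence & indexMask);
    }

    // A slot may be reused only after the entry it held has been acked.
    boolean tryPut(String entry) {
        if (putSequence + 1 - ackSequence > entries.length) {
            return false;
        }
        putSequence++;
        entries[index(putSequence)] = entry;
        return true;
    }

    // Hands out up to batchSize entries that were put but not yet fetched.
    List<String> get(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && getSequence < putSequence) {
            getSequence++;
            batch.add(entries[index(getSequence)]);
        }
        return batch;
    }

    // Confirms everything up to and including sequence, freeing those slots.
    void ack(long sequence) {
        if (sequence <= ackSequence || sequence > getSequence) {
            throw new IllegalArgumentException("cannot ack sequence " + sequence);
        }
        for (long s = ackSequence + 1; s <= sequence; s++) {
            entries[index(s)] = null;  // release the reference for gc
        }
        ackSequence = sequence;
    }

    // Forgets un-acked gets so those entries are handed out again.
    void rollback() {
        getSequence = ackSequence;
    }
}
```

Puts are bounded by the Ack pointer rather than the Get pointer, so a fetched but unconfirmed entry can never be overwritten.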

server

netty3.2.2

If a message queue middleware consumes the binlog, the netty server does not need to be started.

com.alibaba.otter.canal.server.netty.CanalServerWithNetty#start

public void start() {
    super.start();

    if (!embeddedServer.isStart()) {
        embeddedServer.start();
    }

    this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool()));
    /*
     * enable keep-alive mechanism, handle abnormal network connection
     * scenarios on OS level. the threshold parameters are depended on OS.
     * e.g. On Linux: net.ipv4.tcp_keepalive_time = 300
     * net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30
     */
    bootstrap.setOption("child.keepAlive", true);
    /*
     * optional parameter.
     */
    bootstrap.setOption("child.tcpNoDelay", true);

    // build the pipeline
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipelines = Channels.pipeline();
            pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
            // support to maintain child socket channel.
            pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                new HandshakeInitializationHandler(childGroups));
            pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                new ClientAuthenticationHandler(embeddedServer));

            SessionHandler sessionHandler = new SessionHandler(embeddedServer);
            pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
            return pipelines;
        }
    });

    // start
    if (StringUtils.isNotEmpty(ip)) {
        this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
    } else {
        this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
    }
}

client

image-20200321112245960

A client can fetch the MySQL binlog from the canal server in several ways (the get and getWithoutAck methods):

private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize); // immediate, non-blocking fetch
    } else if (timeout <= 0){
        return eventStore.get(start, batchSize); // blocking fetch
    } else {
        return eventStore.get(start, batchSize, timeout, unit); // blocking fetch with a timeout
    }
}
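
The three modes map closely onto the three retrieval calls of a standard `java.util.concurrent.BlockingQueue`, which makes for a compact analogy. The sketch below uses a queue of strings in place of the event store; `FetchModesSketch` and `fetch` are hypothetical names, and it returns a single item rather than a batch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy for the three fetch modes, using a BlockingQueue as the store.
class FetchModesSketch {
    static String fetch(BlockingQueue<String> store, Long timeout, TimeUnit unit) {
        try {
            if (timeout == null) {
                return store.poll();               // immediate: null when empty
            } else if (timeout <= 0) {
                return store.take();               // block until an item arrives
            } else {
                return store.poll(timeout, unit);  // block, give up after the timeout
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // keep the interrupt visible
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> store = new LinkedBlockingQueue<>();
        store.add("a");
        store.add("b");
        System.out.println(fetch(store, null, null));                    // immediate
        System.out.println(fetch(store, 0L, TimeUnit.MILLISECONDS));     // blocking
        System.out.println(fetch(store, 20L, TimeUnit.MILLISECONDS));    // timed, store is empty
    }
}
```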

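Note: EventStore is implemented as a RingBuffer (circular buffer) similar to Disruptor's. The RingBuffer implementation class is MemoryEventStoreWithBuffer.

The difference between the get method and the getWithoutAck method: get acknowledges the fetched batch automatically, whereas getWithoutAck leaves the ack to the caller.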

com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#getWithoutAck(com.alibaba.otter.canal.protocol.ClientIdentity, int, java.lang.Long, java.util.concurrent.TimeUnit)

/**
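 * Fetch events without specifying a position. canal remembers this client's latest position. <br/>
 * On the first fetch, output starts from the oldest record held in canal.
 *
 * <pre>
 * Cases:
 * a. timeout is null: use tryGet and fetch immediately
 * b. timeout is not null
 *    1. timeout is 0: use the blocking get with no timeout, returning only once batchSize events are available
 *    2. timeout is not 0: use get + timeout; if batchSize events are still not available at the timeout, return what there is
 * 
 * Note: fetching the meta and fetching the data must stay in order: whoever gets the meta first must also get the data first, hence the synchronization. (Otherwise a caller could take the meta first but receive the second batch of data, which would break the ordering.)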
 * </pre>
 */
@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                       throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // get the position of the last batch fetched from the stream
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

        Events<Event> events = null;
        if (positionRanges != null) { // streamed data exists
            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
        } else {// first fetch after an ack
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            if (start == null) { // very first fetch, with no ack record yet: take the first record currently in the store
                start = canalInstance.getEventStore().getFirstPosition();
            }

            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
        }

        if (CollectionUtils.isEmpty(events.getEvents())) {
            // logger.debug("getWithoutAck successfully, clientId:{}
            // batchSize:{} but result
            // is null",
            // clientIdentity.getClientId(),
            // batchSize);
            return new Message(-1, true, new ArrayList()); // return an empty message without generating a batchId, to save work
        } else {
            // record the batch in the stream info
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            boolean raw = isRaw(canalInstance.getEventStore());
            List entrys = null;
            if (raw) {
                entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {

                    public ByteString apply(Event input) {
                        return input.getRawEntry();
                    }
                });
            } else {
                entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {

                    public CanalEntry.Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
            }
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                    clientIdentity.getClientId(),
                    batchSize,
                    entrys.size(),
                    batchId,
                    events.getPositionRange());
            }
            return new Message(batchId, raw, entrys);
        }

    }
}

ack

com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#ack

 /**
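     * Confirm a batch id. After the confirmation, every Message with an id less than or equal to this batchId is confirmed.
     *
     * <pre>
     * Note: acks must be sent in batchId order (the client has to guarantee this)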
     * </pre>
     */
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    PositionRange<LogPosition> positionRanges = null;
    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // update the position
    if (positionRanges == null) { // this is a duplicate ack/rollback
        throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
            clientIdentity.getClientId(),
            batchId));
    }

    // when updating the cursor it would be best to check strictly whether the position jumps
    // Position position = lastRollbackPostions.get(clientIdentity);
    // if (position != null) {
    // // Position position =
    // canalInstance.getMetaManager().getCursor(clientIdentity);
    // LogPosition minPosition =
    // CanalEventUtils.min(positionRanges.getStart(), (LogPosition)
    // position);
    // if (minPosition == position) {// the acked position is later than the last acked position, so data may have been lost
    // throw new CanalServerException(
    // String.format(
    // "ack error , clientId:%s batchId:%d %s is jump ack , last ack:%s",
    // clientIdentity.getClientId(), batchId, positionRanges,
    // position));
    // }
    // }

    // update the cursor and persist the consumed position; the persistence scheme depends on the instance.xml in use
    if (positionRanges.getAck() != null) {
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
        if (logger.isInfoEnabled()) {
            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
    }

    // the data can now be cleaned up periodically
    canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
}

canal entry point

com.alibaba.otter.canal.deployer.CanalLauncher#main

com.alibaba.otter.canal.deployer.CanalStarter#start

public synchronized void start() throws Throwable {
    String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
    if (!"tcp".equalsIgnoreCase(serverMode)) {
        // start the message queue producer, which puts the parsed binlog messages into the message queue
        ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
        canalMQProducer = loader
            .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
        if (canalMQProducer != null) {
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
            canalMQProducer.init(properties);
            Thread.currentThread().setContextClassLoader(cl);
        }
    }

    if (canalMQProducer != null) {
        MQProperties mqProperties = canalMQProducer.getMqProperties();
        // disable netty
        // with a queue producer present, turn the netty server off: it is not needed because the producer consumes the binlog
        System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
        if (mqProperties.isFlatMessage()) {
            // configure the raw setting to avoid parsing ByteString->Entry a second time
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }
    }

    logger.info("## start the canal server.");
    controller = new CanalController(properties);
    controller.start();
    logger.info("## the canal server is running now ......");
    shutdownThread = new Thread() {

        public void run() {
            try {
                logger.info("## stop the canal server");
                controller.stop();
                CanalLauncher.runningLatch.countDown();
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal Server:", e);
            } finally {
                logger.info("## canal server is down.");
            }
        }

    };
    Runtime.getRuntime().addShutdownHook(shutdownThread);

    if (canalMQProducer != null) {
        canalMQStarter = new CanalMQStarter(canalMQProducer);
        String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        // start consuming
        canalMQStarter.start(destinations);
        controller.setCanalMQStarter(canalMQStarter);
    }

    // start canalAdmin
    String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
    if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
        String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        CanalAdminController canalAdmin = new CanalAdminController(this);
        canalAdmin.setUser(user);
        canalAdmin.setPasswd(passwd);

        String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

        logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
            port,
            user,
            passwd,
            ip);

        CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
        canalAdminWithNetty.setCanalAdmin(canalAdmin);
        canalAdminWithNetty.setPort(Integer.parseInt(port));
        canalAdminWithNetty.setIp(ip);
        canalAdminWithNetty.start();
        this.canalAdmin = canalAdminWithNetty;
    }

    running = true;
}

com.alibaba.otter.canal.deployer.CanalController#start

public void start() throws Throwable {
    logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
    // create the work node for the whole canal
    final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
    initCid(path);
    if (zkclientx != null) {
        this.zkclientx.subscribeStateChanges(new IZkStateListener() {

            public void handleStateChanged(KeeperState state) throws Exception {

            }

            public void handleNewSession() throws Exception {
                initCid(path);
            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {
                logger.error("failed to connect to zookeeper", error);
            }
        });
    }
    // start the embedded service first
    embededCanalServer.start();
    // try to start the channels that are not lazy
    for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
        final String destination = entry.getKey();
        InstanceConfig config = entry.getValue();
        // create the work node for the destination
        if (!embededCanalServer.isStart(destination)) {
            // start through the HA mechanism
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
            if (!config.getLazy() && !runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }

        if (autoScan) {
            instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
        }
    }

    if (autoScan) {
        instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (!monitor.isStart()) {
                monitor.start();
            }
        }
    }

    // start the network interface
    if (canalServer != null) {
        canalServer.start();
    }
}

com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#start()

public void start() {
    if (!isStart()) {
        super.start();
        // if a provider exists, start the metrics service
        loadCanalMetrics();
        metrics.setServerPort(metricsPort);
        metrics.initialize();
        // create instances by destination, one instance per destination; the spring container is used by default
        canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {

            public CanalInstance apply(String destination) {
                return canalInstanceGenerator.generate(destination);
            }
        });

        // lastRollbackPostions = new MapMaker().makeMap();
    }
}

com.alibaba.otter.canal.server.CanalMQStarter#start

public synchronized void start(String destinations) {
    try {
        if (running) {
            return;
        }
        mqProperties = canalMQProducer.getMqProperties();
        // set filterTransactionEntry
        if (mqProperties.isFilterTransactionEntry()) {
            System.setProperty("canal.instance.filter.transaction.entry", "true");
        }

        canalServer = CanalServerWithEmbedded.instance();

        // start one worker thread per instance
        executorService = Executors.newCachedThreadPool();
        logger.info("## start the MQ workers.");

        String[] dsts = StringUtils.split(destinations, ",");
        for (String destination : dsts) {
            destination = destination.trim();
            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
            canalMQWorks.put(destination, canalMQRunnable);
            executorService.execute(canalMQRunnable);
        }

        running = true;
        logger.info("## the MQ workers is running now ......");

        shutdownThread = new Thread() {

            public void run() {
                try {
                    logger.info("## stop the MQ workers");
                    running = false;
                    executorService.shutdown();
                    canalMQProducer.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping MQ workers:", e);
                } finally {
                    logger.info("## canal MQ is down.");
                }
            }

        };

        Runtime.getRuntime().addShutdownHook(shutdownThread);
    } catch (Throwable e) {
        logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
    }
}

The message queue worker starts consuming:

private void worker(String destination, AtomicBoolean destinationRunning) {
    while (!running || !destinationRunning.get()) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // ignore
        }
    }

    logger.info("## start the MQ producer: {}.", destination);
    MDC.put("destination", destination);
    final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
    while (running && destinationRunning.get()) {
        try {
            CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
            if (canalInstance == null) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // ignore
                }
                continue;
            }
            MQDestination canalDestination = new MQDestination();
            canalDestination.setCanalDestination(destination);
            CanalMQConfig mqConfig = canalInstance.getMqConfig();
            canalDestination.setTopic(mqConfig.getTopic());
            canalDestination.setPartition(mqConfig.getPartition());
            canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
            canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
            canalDestination.setPartitionHash(mqConfig.getPartitionHash());

            // subscribe to the destination's instance
            canalServer.subscribe(clientIdentity);
            logger.info("## the MQ producer: {} is running now ......", destination);

            Integer getTimeout = mqProperties.getFetchTimeout();
            Integer getBatchSize = mqProperties.getBatchSize();
            while (running && destinationRunning.get()) {
                Message message;
                if (getTimeout != null && getTimeout > 0) {
                    // fetch messages from canal
                    message = canalServer
                        .getWithoutAck(clientIdentity, getBatchSize, getTimeout.longValue(), TimeUnit.MILLISECONDS);
                } else {
                    message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                }

                final long batchId = message.getId();
                try {
                    int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                    if (batchId != -1 && size != 0) {
                        // send the message to the message queue
                        canalMQProducer.send(canalDestination, message, new Callback() {

                            @Override
                            public void commit() {
                                // the send succeeded, so acknowledge the batch
                                canalServer.ack(clientIdentity, batchId);
                            }

                            @Override
                            public void rollback() {
                                canalServer.rollback(clientIdentity, batchId);
                            }
                        }); // send the message to the topic
                    } else {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // ignore
                        }
                    }

                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        }
    }
}

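Two-phase processing

  1. message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout.longValue(), TimeUnit.MILLISECONDS); fetches a batch of messages without acknowledging it.
  2. If the send succeeds, canalServer.ack(clientIdentity, batchId); acknowledges the batch so that its messages can be removed.
  3. If the send fails, canalServer.rollback(clientIdentity, batchId); rolls the batch back so that it will be fetched again.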
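The fetch, ack, and rollback steps can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions: TwoPhaseStore and its fields are hypothetical names, events are plain strings, and a rollback discards every pending batch. It is not how canal's event store is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the server side; these are not canal's real classes. */
final class TwoPhaseStore {
    private final List<String> events = new ArrayList<>();
    private final Map<Long, Integer> pending = new LinkedHashMap<>(); // batchId -> end index (exclusive)
    private int ackedUpTo = 0;   // events before this index are consumed
    private int fetchedUpTo = 0; // events before this index have been handed out
    private long nextBatchId = 1;

    void put(String event) {
        events.add(event);
    }

    /** Phase 1: hand out a batch but keep it until it is acked. Returns -1 if nothing is available. */
    long getWithoutAck(int batchSize, List<String> out) {
        if (fetchedUpTo >= events.size()) {
            return -1;
        }
        int end = Math.min(events.size(), fetchedUpTo + batchSize);
        out.addAll(events.subList(fetchedUpTo, end));
        fetchedUpTo = end;
        long batchId = nextBatchId++;
        pending.put(batchId, end);
        return batchId;
    }

    /** Phase 2, success: the batch was delivered, so advance the consumed position. */
    void ack(long batchId) {
        Integer end = pending.remove(batchId);
        if (end == null) {
            throw new IllegalArgumentException("unknown batch " + batchId);
        }
        ackedUpTo = Math.max(ackedUpTo, end);
    }

    /** Phase 2, failure: drop all pending batches and re-read from the last acked position. */
    void rollback(long batchId) {
        if (!pending.containsKey(batchId)) {
            throw new IllegalArgumentException("unknown batch " + batchId);
        }
        pending.clear();
        fetchedUpTo = ackedUpTo;
    }

    int ackedUpTo() {
        return ackedUpTo;
    }
}
```

With three events stored and a batch size of 2, a rollback of the first batch makes the next getWithoutAck return the same two events again; only an ack moves the consumed position forward.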

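The difference between the get method and the getWithoutAck method: get acknowledges the batch automatically as soon as it is fetched, whereas getWithoutAck leaves the batch unacknowledged until the client explicitly calls ack or rollback.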

Client subscription

com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded#subscribe

/**
 * Client subscription; subscribing again updates the corresponding filter information
 */
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start();
    }

    canalInstance.getMetaManager().subscribe(clientIdentity); // register the subscription in meta

    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        position = canalInstance.getEventStore().getFirstPosition(); // take the first position in the store
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
        }
        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
    } else {
        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
    }

    // notify the instance that the subscription has changed
    canalInstance.subscribeChange(clientIdentity);
}
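The cursor handling in subscribe can be reduced to a small sketch: a client with a saved cursor resumes from it, and a new client starts from the first position in the store, if there is one. CursorRegistry is a hypothetical name, and positions are modelled as plain long values rather than canal's Position objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical cursor registry; positions are plain longs, not canal's Position type. */
final class CursorRegistry {
    private final Map<String, Long> cursors = new HashMap<>();

    /** Returns the position consumption starts from, or empty if the store holds nothing yet. */
    OptionalLong subscribe(String clientId, OptionalLong firstInStore) {
        Long saved = cursors.get(clientId);
        if (saved != null) {
            return OptionalLong.of(saved); // resume from the last recorded cursor
        }
        if (firstInStore.isPresent()) {
            cursors.put(clientId, firstInStore.getAsLong()); // first subscription: remember the start
        }
        return firstInStore;
    }

    /** Called when the client acks, so that a later subscribe resumes from here. */
    void updateCursor(String clientId, long position) {
        cursors.put(clientId, position);
    }
}
```

Subscribing twice is therefore harmless: the second call finds the saved cursor and ignores whatever the store's first position is at that moment.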

Questions

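First, look at how canal maintains the state of incremental subscription and consumption. It tracks two positions:

  1. Parse position: how far the binlog had been parsed last time. The corresponding component is CanalLogPositionManager.
  2. Consume position: the last position committed by the client, recorded after the client acks. The corresponding component is CanalMetaManager.

instance

Four instance.xml files are supported by default: spring/memory-instance.xml, spring/default-instance.xml, spring/group-instance.xml, and spring/file-instance.xml.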

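About memory-instance.xml:

All components (parser, sink, store) use the in-memory mode, and position recording also uses memory mode, so after a restart parsing starts again from the initial position.

Characteristics: fastest, with the fewest dependencies (no ZooKeeper required).

Use case: typically quickstart, or data analysis after a problem has occurred. It should not be used in production.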

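About default-instance.xml:

The store uses memory mode, while the position management that the parser and sink depend on uses persistent mode. Persistence currently means mainly writing to ZooKeeper, so that the data is shared across the cluster.

Characteristics: supports HA.

Use case: production environments with clustered deployment.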

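About group-instance.xml:

Intended mainly for merging multiple databases: several physical instances can be combined into one logical instance for clients to access.

Use case: sharded databases. For example, if product data is split across 4 databases, each database has its own instance. Without group, a consumer has to start 4 clients, each connecting to one of the 4 instances. With group, the instances are merged into one logical instance on the canal server, so only 1 client connecting to that logical instance is needed.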
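To make the idea of one logical instance concrete, the sketch below merges the event streams of several physical instances into a single stream. It assumes that each stream is already ordered by timestamp and that the merged stream should be ordered by timestamp as well; that ordering rule, and the names GroupMerge and Event, are assumptions made for illustration and are not taken from canal's group sink.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical k-way merge of per-instance event streams; not canal's actual group sink. */
final class GroupMerge {
    static final class Event {
        final String source;   // which physical instance produced the event
        final long timestamp;  // assumed ordering key
        final String payload;

        Event(String source, long timestamp, String payload) {
            this.source = source;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Merges streams that are each sorted by timestamp into one sorted stream. */
    static List<Event> merge(List<List<Event>> streams) {
        // Each heap entry is {stream index, offset within that stream}.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            Comparator.comparingLong((int[] c) -> streams.get(c[0]).get(c[1]).timestamp));
        for (int i = 0; i < streams.size(); i++) {
            if (!streams.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Event> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] cur = heap.poll();
            List<Event> stream = streams.get(cur[0]);
            merged.add(stream.get(cur[1]));
            if (cur[1] + 1 < stream.size()) {
                heap.add(new int[] {cur[0], cur[1] + 1}); // advance within the same stream
            }
        }
        return merged;
    }
}
```

A client reading the merged list sees a single stream and does not need to know how many databases sit behind it, which is the point of the group configuration.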

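Design intent of instance.xml:

Custom extensions are allowed. For example, after implementing database-backed position management, you can write your own instance.xml. This is where most of the flexibility in canal's design lies.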
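The difference between memory mode and a persistent mode, and the kind of extension point a custom instance.xml would wire in, can be sketched as follows. PositionStore and both implementations are hypothetical: canal's real position manager interfaces have different names and signatures, and a local file stands in here for ZooKeeper or a database.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

/** Hypothetical extension point; canal's real interfaces differ. */
interface PositionStore {
    OptionalLong load();

    void save(long position);
}

/** Memory mode: the position is lost when the process restarts. */
final class MemoryPositionStore implements PositionStore {
    private Long position;

    public OptionalLong load() {
        return position == null ? OptionalLong.empty() : OptionalLong.of(position);
    }

    public void save(long position) {
        this.position = position;
    }
}

/** Persistent mode: the position survives a restart because it lives outside the process. */
final class FilePositionStore implements PositionStore {
    private final Path file;

    FilePositionStore(Path file) {
        this.file = file;
    }

    public OptionalLong load() {
        if (!Files.exists(file)) {
            return OptionalLong.empty();
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return text.isEmpty() ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void save(long position) {
        try {
            Files.write(file, Long.toString(position).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Creating a new MemoryPositionStore models a restart in memory mode and yields no position, while a new FilePositionStore pointed at the same path returns the last saved value.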

References

https://juejin.im/entry/59f15508f265da4321534371

https://github.com/alibaba/canal/wiki/AdminGuide

https://github.com/alibaba/canal/wiki/ClientAPI

https://www.cnblogs.com/f-zhao/p/9112158.html

https://blog.csdn.net/lvzhuyiyi/article/details/51842697