DataX: An Offline Data Synchronization Tool

Architecture

(architecture diagrams)

// Generate a job template
python datax.py -r mysqlreader -w mysqlwriter > mysql2mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/sally"],
                                "table": ["account"]
                            }
                        ],
                        "password": "123456",
                        "username": "test",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/wzcredit2",
                                "table": ["account"]
                            }
                        ],
                        "password": "123456",
                        "preSql": [],
                        "session": [],
                        "username": "test",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}

// Run the synchronization job
python datax.py mysql2mysql.json

channel

A channel is essentially a message queue between the two halves of a task: the reader task pushes records into the channel and the writer task consumes records from it.

The reader and writer share the same channel, but each side wraps it in a BufferedRecordExchanger, which adds an in-memory buffer on top: the reader side writes records into its buffer first and flushes the buffer into the channel once it is full, while the writer side reads from its buffer first and refills it from the channel when the buffer runs empty (see the sketch below).
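A minimal sketch of that buffering idea, assuming a plain BlockingQueue stands in for the DataX channel (compare MemoryChannel below). The real BufferedRecordExchanger exchanges Record objects, tracks byte sizes and handles termination, all of which are omitted here; in practice each task side holds its own instance and uses only one of the two roles.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BufferedExchangerSketch {
    private final BlockingQueue<String> channel; // stand-in for the channel between tasks
    private final List<String> buffer = new ArrayList<>();
    private final int bufferSize = 32;           // flush / refill batch size

    BufferedExchangerSketch(BlockingQueue<String> channel) {
        this.channel = channel;
    }

    // Reader side: collect records in the buffer, flush them to the channel once it is full.
    void sendToWriter(String record) throws InterruptedException {
        buffer.add(record);
        if (buffer.size() >= bufferSize) {
            flush();
        }
    }

    void flush() throws InterruptedException {
        for (String r : buffer) {
            channel.put(r);                      // corresponds to Channel#doPush
        }
        buffer.clear();
    }

    // Writer side: serve from the buffer first; when it is empty, refill it from the channel.
    String getFromReader() throws InterruptedException {
        if (buffer.isEmpty()) {
            buffer.add(channel.take());              // block until at least one record arrives
            channel.drainTo(buffer, bufferSize - 1); // corresponds to Channel#doPull
        }
        return buffer.remove(0);                 // remove(0) keeps FIFO order
    }

    // Tiny single-threaded demo: one side fills the channel, the other drains it.
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1024);
        BufferedExchangerSketch readerSide = new BufferedExchangerSketch(q);
        BufferedExchangerSketch writerSide = new BufferedExchangerSketch(q);
        for (int i = 0; i < 100; i++) readerSide.sendToWriter("row-" + i);
        readerSide.flush();
        for (int i = 0; i < 100; i++) System.out.println(writerSide.getFromReader());
    }
}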

MemoryChannel is the in-memory implementation of Channel.

public class MemoryChannel extends Channel {

    // Time spent blocked in pull, i.e. waiting for the reader to produce data; inherited from Channel
    protected volatile long waitReaderTime = 0;
    // Time spent blocked in push, i.e. waiting for the writer to drain data; inherited from Channel
    protected volatile long waitWriterTime = 0;

    // Number of bytes currently held in the channel
    private AtomicInteger memoryBytes = new AtomicInteger(0);
    // Queue that stores the records
    private ArrayBlockingQueue<Record> queue = null;
}
// MemoryChannel#doPush: called on the reader side; blocks while the queue is full
protected void doPush(Record r) {
    try {
        long startTime = System.nanoTime();
        this.queue.put(r);
        waitWriterTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(r.getMemorySize());
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

// MemoryChannel#doPull: called on the writer side; blocks while the queue is empty
protected Record doPull() {
    try {
        long startTime = System.nanoTime();
        Record r = this.queue.take();
        waitReaderTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(-r.getMemorySize());
        return r;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

job

A complete extraction run is one job, and a job can be split into multiple tasks.

task

The reader side of a task uses a RecordSender to read data from the source and write it into the channel.

The writer side uses a RecordReceiver to take records out of the channel and write them to the target data source (see the sketch below).
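To make the task pipeline concrete, here is a minimal runnable sketch of one reader task and one writer task cooperating through a channel. It uses plain threads and a BlockingQueue in place of DataX's RecordSender/RecordReceiver and Channel, so the names and the EOF marker are simplifications for illustration, not the real plugin API.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TaskSketch {
    // The channel between the two halves of the task, as in MemoryChannel above.
    private static final BlockingQueue<String> channel = new ArrayBlockingQueue<>(1024);
    private static final String EOF = "__EOF__"; // poison pill to end the writer task

    public static void main(String[] args) throws InterruptedException {
        // Reader task: read from the source and push into the channel (RecordSender role).
        Thread readerTask = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    channel.put("row-" + i);        // sendToWriter -> Channel#doPush
                }
                channel.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Writer task: pull from the channel and write to the target (RecordReceiver role).
        Thread writerTask = new Thread(() -> {
            try {
                while (true) {
                    String record = channel.take(); // getFromReader -> Channel#doPull
                    if (EOF.equals(record)) break;
                    System.out.println("write to target: " + record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        readerTask.start();
        writerTask.start();
        readerTask.join();
        writerTask.join();
    }
}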

com.alibaba.datax.core.taskgroup.TaskGroupContainer#start

// core.container.taskGroup.channel, which defaults to 5
int channelNumber = this.configuration.getInt(
        CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);

// elsewhere the same key is read with the default of 5 made explicit
int channelsPerTaskGroup = this.configuration.getInt(
        CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
{
    "core": {
       "transport" : {
          "channel": {
             "speed": {
                "record": 100,
                "byte": 100
             }
          }
       }
    },
    "job": {
      "setting": {
        "speed": {
          "record": 500,
          "byte": 1000,
          "channel" : 1
        }
      }
    }
}

Using the sample configuration above: under the byte-rate limit, the channel count would be 1000 / 100 = 10.

Under the record-rate limit, the channel count would be 500 / 100 = 5.

The smaller of the two, 5, is taken. Although channel is set to 1, that value is only used when neither a byte nor a record limit is configured.
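As a standalone sanity check of that arithmetic, with the values from the sample configuration hard-coded for illustration:

public class ChannelNumberCheck {
    public static void main(String[] args) {
        long jobByteSpeed = 1000, channelByteSpeed = 100;    // job-level vs. per-channel bps
        long jobRecordSpeed = 500, channelRecordSpeed = 100; // job-level vs. per-channel tps
        int byByte = (int) (jobByteSpeed / channelByteSpeed);       // 10
        int byRecord = (int) (jobRecordSpeed / channelRecordSpeed); // 5
        System.out.println(Math.min(byByte, byRecord));             // 5 channels
    }
}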

The adjustChannelNumber method implements this logic:

com.alibaba.datax.core.job.JobContainer#adjustChannelNumber

private void adjustChannelNumber() {
    int needChannelNumberByByte = Integer.MAX_VALUE;
    int needChannelNumberByRecord = Integer.MAX_VALUE;

    // Is a global byte-rate (bps) limit configured?
    boolean isByteLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    if (isByteLimit) {
        // Global byte-rate limit for the whole job
        long globalLimitedByteSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);

        // Byte-rate limit for a single channel
        Long channelLimitedByteSpeed = this.configuration
                .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
        if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
        }
        // Number of channels needed under the byte-rate limit
        needChannelNumberByByte =
                (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
        needChannelNumberByByte =
                needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
    }
    // Is a global record-rate (tps) limit configured?
    boolean isRecordLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    if (isRecordLimit) {
        // Global record-rate limit for the whole job
        long globalLimitedRecordSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
        // Record-rate limit for a single channel
        Long channelLimitedRecordSpeed = this.configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
        if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                    "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
        }
        // Number of channels needed under the record-rate limit
        needChannelNumberByRecord =
                (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
        needChannelNumberByRecord =
                needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
        LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
    }

    // Take the smaller of the two
    this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
            needChannelNumberByByte : needChannelNumberByRecord;

    // If either rate limit was configured, the computed value is final
    if (this.needChannelNumber < Integer.MAX_VALUE) {
        return;
    }

    // Otherwise fall back to an explicitly configured channel count
    boolean isChannelLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (isChannelLimit) {
        this.needChannelNumber = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);

        LOG.info("Job set Channel-Number to " + this.needChannelNumber
                + " channels.");

        return;
    }

    throw DataXException.asDataXException(
            FrameworkErrorCode.CONFIG_ERROR,
            "Job运行速度必须设置");
}

How should the reader side handle a single-table select that returns a very large amount of data?
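For relational readers such as mysqlreader, the usual answer is the splitPk parameter: the reader looks up the min/max of a numeric primary-key (or similarly distributed) column and splits the table into ranges, one query per task, so several channels read the table in parallel instead of issuing one huge select. Below is a hypothetical sketch of that range-splitting idea; the class and method names are made up for illustration and this is not the actual mysqlreader implementation.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting a big table into per-task WHERE clauses by a
// numeric split key, the same idea mysqlreader's splitPk option is based on.
public class SplitBySplitPk {
    // Build one WHERE clause per slice over [min, max] of the split column.
    static List<String> split(String splitPk, long min, long max, int sliceCount) {
        List<String> ranges = new ArrayList<>();
        long step = Math.max(1, (max - min + 1) / sliceCount);
        for (long lower = min; lower <= max; lower += step) {
            long upper = Math.min(max, lower + step - 1);
            ranges.add(splitPk + " >= " + lower + " AND " + splitPk + " <= " + upper);
        }
        return ranges;
    }

    public static void main(String[] args) {
        // e.g. account.id ranges from 1 to 1_000_000, split for 5 channels
        for (String where : split("id", 1, 1_000_000, 5)) {
            System.out.println("SELECT * FROM account WHERE " + where);
        }
    }
}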

reference

https://zhmin.github.io/2018/12/28/datax-job/