// 生成模版
python datax.py -r mysqlreader -w mysqlwriter > mysql2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/sally"],
"table": ["account"]
}
],
"password": "123456",
"username": "test",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/wzcredit2",
"table": ["account"]
}
],
"password": "123456",
"preSql": [],
"session": [],
"username": "test",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
// 执行数据同步
python datax.py mysql2mysql.json
相当于消息队列,readerTask将任务放入channel,writerTask消费channel里的数据
读和写公用同一个channel,各自用BufferedRecordExchanger封装了channel,加了一层内存缓存,写任务先读内存buffer,buffer没有,则将channel的数据放入buffer,读任务先写内存buffer,buffer满了,再写入channel
public class MemoryChannel extends Channel {
// 等待Reader处理完的时间,也就是pull的时间,继承自Channel
protected volatile long waitReaderTime = 0;
// 等待Writer处理完的时间,也就是push的时间,继承自Channel
protected volatile long waitWriterTime = 0;
// Channel里面保存的数据大小
private AtomicInteger memoryBytes = new AtomicInteger(0);
// 存放记录的queue
private ArrayBlockingQueue<Record> queue = null;
}
protected void doPush(Record r) {
try {
long startTime = System.nanoTime();
this.queue.put(r);
waitWriterTime += System.nanoTime() - startTime;
memoryBytes.addAndGet(r.getMemorySize());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
protected Record doPull() {
try {
long startTime = System.nanoTime();
Record r = this.queue.take();
waitReaderTime += System.nanoTime() - startTime;
memoryBytes.addAndGet(-r.getMemorySize());
return r;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
整个抽取流程是个job,可以拆成多个task
通过RecordSender从数据源读数据,写入到channel
通过RecordReceiver从channel中读取数据,写入目标数据源
com.alibaba.datax.core.taskgroup.TaskGroupContainer#start
// core.container.taskGroup.channel默认等于5
int channelNumber = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
{
"core": {
"transport" : {
"channel": {
"speed": {
"record": 100,
"byte": 100
}
}
}
},
"job": {
"setting": {
"speed": {
"record": 500,
"byte": 1000,
"channel" : 1
}
}
}
}
首先计算按照字节数限速,channel的数目应该为 500 / 100 = 5
然后按照记录数限速, channel的数目应该为 1000 / 100 = 10
最后返回两者的最小值 5。虽然指定了channel为1, 但只有在没有限速的条件下,才会使用。
adjustChannelNumber方法,实现了上述功能
com.alibaba.datax.core.job.JobContainer#adjustChannelNumber
private void adjustChannelNumber() {
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;
// 是否指定字节数限速
boolean isByteLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (isByteLimit) {
// 总的限速字节数
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// 单个Channel的字节数
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
// 计算所需要的channel数目
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
}
// 是否指定记录数限速
boolean isRecordLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (isRecordLimit) {
// 总的限速记录数
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
// 获取单个channel的限定的记录数
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
}
// 计算所需要的channel数目
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
}
// 取较小值
this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
needChannelNumberByByte : needChannelNumberByRecord;
// 返回最小值
if (this.needChannelNumber < Integer.MAX_VALUE) {
return;
}
// 是否指定了channel数目
boolean isChannelLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (isChannelLimit) {
this.needChannelNumber = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
}
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"Job运行速度必须设置");
}
https://zhmin.github.io/2018/12/28/datax-job/