kafka

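Queue (topic) -> partitions -> replicas

Replicas: leader election, leader-follower interaction, leader-follower data synchronization

Partition replicas are split into a leader and followers.

The broker cluster is centralized and also has a leader: the leading broker is called the controller (it manages cluster membership, maintains topics, operates on metadata, and so on) and is elected through ZooKeeper.

Network communication model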

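(1) Acceptor: one acceptor thread per listener. It listens for new connection requests, registers the OP_ACCEPT event, and hands each new connection to a Processor thread in round-robin order.
(2) Processor: N processor threads, each with its own selector. A Processor registers OP_READ on the SocketChannels the Acceptor assigns to it. N is set by num.network.threads.
(3) KafkaRequestHandler: M request-handler threads inside the thread pool KafkaRequestHandlerPool. They take requests from RequestChannel's global request queue, requestQueue, and pass them to KafkaApis. M is set by num.io.threads.
(4) RequestChannel: the broker's request channel, where Processors exchange data with the KafkaRequestHandler threads and KafkaApis. It holds one global request queue, requestQueue, plus one response queue, responseQueue, per Processor (in the code version below, each Processor owns its own responseQueue).
(5) NetworkClient: a wrapper around Java NIO in Kafka's client-side network layer. Records passed to KafkaProducer.send are ultimately sent to the brokers by the producer's Sender thread through NetworkClient.
(6) SocketServer: an NIO server that starts the Acceptor thread and the Processor threads. It follows the typical multi-threaded Reactor pattern, separating accepting client connections from processing requests.
(7) KafkaServer: represents one Kafka broker instance; its startup method is the entry point for starting the broker.
(8) KafkaApis: Kafka's request-handling business logic, which dispatches each request type, such as producing messages, fetching offsets, or handling heartbeats.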

acceptor

  def run() {
    // register interest in OP_ACCEPT on the server socket channel
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {

          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>

                    // Assign the channel to the next processor (using round-robin) to which the
                    // channel can be added without blocking. If newConnections queue is full on
                    // all processors, block until the last one is able to accept a connection.
                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        // pick the processor thread round-robin
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                      // hand the connection to the chosen processor; try the next one if its queue is full
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }


  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
      debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
        s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
        s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]" +
        s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
      true
    } else
      false
  }

processor

kafka.network.Processor#accept

private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()


  def accept(socketChannel: SocketChannel,
             mayBlock: Boolean,
             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
    val accepted = {
      // enqueue into newConnections (an ArrayBlockingQueue); offer() returns false instead of blocking when it is full
      if (newConnections.offer(socketChannel))
        true
      else if (mayBlock) {
        val startNs = time.nanoseconds
        newConnections.put(socketChannel)
        acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
        true
      } else
        false
    }
    if (accepted)
      // wake up selector.poll() right away so the processor picks up the new connection from newConnections
      wakeup()
    accepted
  }
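The Acceptor loop and Processor#accept together implement a round-robin hand-off onto bounded per-processor queues: each processor is tried once with a non-blocking offer, and only the last attempt is allowed to block with put. The following is a minimal sketch of that rule in plain Java, assuming connections are plain strings and each processor is reduced to its newConnections queue; RoundRobinAcceptor, assign and queued are hypothetical names, not Kafka's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical stand-in for Kafka's Acceptor hand-off: connections are plain strings and each
// Processor is represented only by its bounded newConnections queue.
class RoundRobinAcceptor {
    private final List<ArrayBlockingQueue<String>> processors = new ArrayList<>();
    private int currentProcessorIndex = 0;

    RoundRobinAcceptor(int numProcessors, int queueSize) {
        for (int i = 0; i < numProcessors; i++) {
            processors.add(new ArrayBlockingQueue<>(queueSize));
        }
    }

    // Mirrors Processor#accept: non-blocking offer first, blocking put only when mayBlock is true.
    private static boolean accept(ArrayBlockingQueue<String> queue, String conn, boolean mayBlock) {
        if (queue.offer(conn)) return true;
        if (!mayBlock) return false;
        try {
            queue.put(conn); // waits until the processor drains its queue
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Mirrors the do/while loop in Acceptor#run: try each processor once in round-robin order and
    // block only on the last attempt. Returns the chosen processor index, or -1 if interrupted.
    int assign(String conn) {
        int retriesLeft = processors.size();
        while (true) {
            retriesLeft--;
            int chosen = currentProcessorIndex % processors.size();
            currentProcessorIndex = chosen + 1;
            if (accept(processors.get(chosen), conn, retriesLeft == 0)) return chosen;
            if (retriesLeft == 0) return -1;
        }
    }

    // Number of connections still waiting in a processor's queue.
    int queued(int processor) {
        return processors.get(processor).size();
    }
}
```

Blocking only on the final attempt keeps one slow processor from stalling the Acceptor while others still have room, and still applies back-pressure when every processor is saturated.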

kafka.network.Processor#run
 override def run() {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          // newConnections.poll(): take accepted sockets from the queue and register them (OP_READ) with the NIO selector
          configureNewConnections()
          // register any new responses for writing
          // drain responseQueue and register OP_WRITE on the NIO selector for each response
          processNewResponses()
          // selector.poll(): wait for I/O events
          poll()
          // read events: for each entry in selector.completedReceives, add the Request to requestChannel's global requestQueue, where it waits for a KafkaRequestHandler
          processCompletedReceives()
          // write events: for each entry in selector.completedSends (the response has reached the client), remove it from inflightResponses and call selector.unmute to re-register OP_READ on that connection
          processCompletedSends()
          // disconnect events: for each entry in selector.disconnected.keySet, remove its response from inflightResponses and decrement the connectionQuotas count
          processDisconnected()
          closeExcessConnections()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. These exceptions are caught and
          // processed by the individual methods above which close the failing channel and continue processing other
          // channels. So this catch block should only ever see ControlThrowables.
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }

  // `protected` for test usage
  protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
    val connectionId = response.request.context.connectionId
    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
    // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
    if (channel(connectionId).isEmpty) {
      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
      response.request.updateRequestMetrics(0L, response)
    }
    // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
    // removed from the Selector after discarding any pending staged receives.
    // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
    if (openOrClosingChannel(connectionId).isDefined) {
      // selector.send registers OP_WRITE for this connection
      selector.send(responseSend)
      // the response, already taken off responseQueue, is tracked in inflightResponses until the send completes
      inflightResponses += (connectionId -> response)
    }
  }


requestChannel

RequestChannel provides the buffer through which Processor threads and KafkaRequestHandler threads exchange data.

class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
  import RequestChannel._
  val metrics = new RequestChannel.Metrics
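  // global request queue shared by all processors
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  // processor threads registered with this channel, keyed by processor id
  private val processors = new ConcurrentHashMap[Int, Processor]()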
  val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
  val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
}
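RequestChannel decouples the two thread groups: every Processor pushes into one shared, bounded request queue, and each response is routed back to the Processor that owns the connection. A minimal sketch under simplifying assumptions: requests and responses carry only a processor id and a payload string, and SimpleRequestChannel, Msg and their methods are hypothetical names. For simplicity the response queues are kept in the channel, although in the code version above each Processor holds its own responseQueue.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical message: the processor that owns the connection, plus a payload.
final class Msg {
    final int processorId;
    final String payload;
    Msg(int processorId, String payload) { this.processorId = processorId; this.payload = payload; }
}

// Hypothetical sketch of RequestChannel's role: one shared bounded request queue,
// one unbounded response queue per processor.
class SimpleRequestChannel {
    private final BlockingQueue<Msg> requestQueue;
    private final Map<Integer, BlockingQueue<Msg>> responseQueues = new ConcurrentHashMap<>();

    SimpleRequestChannel(int queueSize) { requestQueue = new ArrayBlockingQueue<>(queueSize); }

    void addProcessor(int id) { responseQueues.put(id, new LinkedBlockingDeque<>()); }

    // Processor side: blocks while the shared queue is full (back-pressure on the network threads).
    void sendRequest(Msg request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueueing a request", e);
        }
    }

    // Handler side: returns null if no request arrives within timeoutMs.
    Msg receiveRequest(long timeoutMs) {
        try {
            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Handler side: route the response to the queue of the processor that owns the connection.
    void sendResponse(Msg response) { responseQueues.get(response.processorId).add(response); }

    // Processor side: polled once per loop iteration (compare processNewResponses).
    Msg pollResponse(int processorId) { return responseQueues.get(processorId).poll(); }
}
```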

KafkaRequestHandler

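KafkaRequestHandler is also a thread class. When a KafkaServer instance starts, it creates a thread pool, KafkaRequestHandlerPool, containing several KafkaRequestHandler threads that run in the background as daemon threads. In its run method, each KafkaRequestHandler repeatedly reads a request from RequestChannel (blocking for at most 300 ms) and hands it to KafkaApis for processing.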

  def run() {
    while (!stopped) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val startSelectTime = time.nanoseconds

      // wait at most 300 ms for a request; null is returned on timeout
      val req = requestChannel.receiveRequest(300)
      val endTime = time.nanoseconds
      val idleTime = endTime - startSelectTime
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

      req match {
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            // ordinary I/O request: let KafkaApis handle it
            apis.handle(request)
          } catch {
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    // create and start KafkaRequestHandler thread i
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
}
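The handler loop and pool above amount to a classic worker pool: M daemon threads poll a shared queue with a timeout, pass each request to a dispatcher, survive handler exceptions, and exit on a sentinel shutdown message. A minimal sketch under those assumptions, where HandlerPool is hypothetical, a String SHUTDOWN sentinel stands in for RequestChannel.ShutdownRequest, and a Consumer<String> stands in for KafkaApis.handle:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical worker pool modeled on KafkaRequestHandler / KafkaRequestHandlerPool.
class HandlerPool {
    // Sentinel playing the role of RequestChannel.ShutdownRequest (compared by identity).
    static final String SHUTDOWN = new String("__shutdown__");

    private final BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final CountDownLatch shutdownComplete;
    private final int numThreads;

    HandlerPool(int numThreads, Consumer<String> apis) {
        this.numThreads = numThreads;
        this.shutdownComplete = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> runHandler(apis), "kafka-request-handler-" + i);
            t.setDaemon(true); // like KafkaThread.daemon(...)
            t.start();
        }
    }

    private void runHandler(Consumer<String> apis) {
        try {
            while (true) {
                // Wait at most 300 ms, as in KafkaRequestHandler#run; null just means "loop again".
                String req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
                if (req == null) continue;
                if (req == SHUTDOWN) return;
                try {
                    apis.accept(req);
                } catch (RuntimeException e) {
                    // log and keep the thread alive, like `case e: Throwable => error(...)`
                    System.err.println("Exception when handling request: " + e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownComplete.countDown();
        }
    }

    void submit(String request) { requestQueue.add(request); }

    // Enqueue one sentinel per thread after all pending requests, then wait for every handler to exit.
    boolean shutdown(long timeoutMs) {
        for (int i = 0; i < numThreads; i++) requestQueue.add(SHUTDOWN);
        try {
            return shutdownComplete.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the queue is FIFO and the sentinels are enqueued last, every request submitted before shutdown is handled before the last thread exits.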

KafkaApis

  def handle(request: RequestChannel.Request) {
    try {
      trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
        s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
        case ApiKeys.END_TXN => handleEndTxnRequest(request)
        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
        case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request)
        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
      }
    } catch {
      case e: FatalExitError => throw e
      case e: Throwable => handleError(request, e)
    } finally {
      request.apiLocalCompleteTimeNanos = time.nanoseconds
    }
  }

Topics

A topic has multiple partitions.

Creating a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

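The command specifies the replication factor and the number of partitions. Creating a topic involves partition and replica state transitions, assigning a leader to each partition, and creating each partition's log on disk. (Kafka 2.2+ also accepts --bootstrap-server instead of --zookeeper, and the --zookeeper option was removed in Kafka 3.0.)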

Replicas

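  1. When a partition is created, one of its replicas is chosen as the leader replica; the remaining replicas automatically become follower replicas.
  2. Follower replicas do not serve client requests (until Kafka 2.4 added follower fetching for consumers). Their only job is to pull messages asynchronously from the leader and write them to their own commit log, thereby staying in sync with the leader.
  3. When the leader replica fails, or the broker hosting it goes down, Kafka detects this in real time through ZooKeeper's watch mechanism and immediately starts a new leader election, choosing one of the followers as the new leader. When the old leader's broker restarts, its replica can only rejoin the cluster as a follower.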

In-sync Replicas(ISR)

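The set of replicas that are in sync with the leader, including the leader itself.

What counts as in sync:

This is governed by the broker parameter replica.lag.time.max.ms, the longest time a follower may lag behind the leader. Its default was 10 seconds in the versions discussed here (Kafka 2.5 raised it to 30 seconds). As long as a follower has not been continuously behind the leader for more than 10 seconds, Kafka considers it in sync, even if it currently holds noticeably fewer messages than the leader.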
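The in-sync rule is a time check rather than a message-count check: the leader remembers when each follower last caught up to its log end offset and drops the follower from the ISR once that moment lies more than replica.lag.time.max.ms in the past. A minimal sketch, assuming each follower is described only by a hypothetical lastCaughtUpTimeMs value; IsrCheck is a hypothetical class, not the broker's Partition code, which tracks more state per replica.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ISR lag rule.
class IsrCheck {
    static final long REPLICA_LAG_TIME_MAX_MS = 10_000L; // the 10 s default described above

    // In sync if the follower caught up with the leader recently enough,
    // regardless of how many messages it is behind right now.
    static boolean isInSync(long lastCaughtUpTimeMs, long nowMs, long lagTimeMaxMs) {
        return nowMs - lastCaughtUpTimeMs <= lagTimeMaxMs;
    }

    // The new ISR: the leader itself plus every follower that passes the time check.
    static List<Integer> computeIsr(int leaderId, Map<Integer, Long> lastCaughtUpByFollower, long nowMs) {
        List<Integer> isr = new ArrayList<>();
        isr.add(leaderId);
        for (Map.Entry<Integer, Long> e : lastCaughtUpByFollower.entrySet()) {
            if (isInSync(e.getValue(), nowMs, REPLICA_LAG_TIME_MAX_MS)) isr.add(e.getKey());
        }
        return isr;
    }
}
```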

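Leader election

Kafka calls every live replica that is not in the ISR an out-of-sync replica. Out-of-sync replicas usually lag far behind the leader, so electing one of them as the new leader can lose data: the messages they hold are well behind the old leader's. Electing such a replica is called an unclean leader election, and the broker parameter unclean.leader.election.enable controls whether it is allowed.

It is disabled by default (since Kafka 0.11.0).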
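The clean/unclean distinction can be written as a two-pass search over the partition's assigned replicas: first the first replica that is alive and in the ISR; only if there is none, and unclean election is enabled, the first live replica at all. This is a simplified sketch of that rule under the assumption that broker liveness and the ISR are given as sets; LeaderElection.electLeader is a hypothetical helper, and the real controller handles more cases.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified sketch of choosing a new leader for a partition whose leader went offline.
class LeaderElection {
    static Optional<Integer> electLeader(List<Integer> assignment, Set<Integer> isr,
                                         Set<Integer> liveBrokers, boolean uncleanEnabled) {
        // Clean election: first replica, in assignment order, that is alive and in the ISR.
        for (int replica : assignment) {
            if (liveBrokers.contains(replica) && isr.contains(replica)) return Optional.of(replica);
        }
        // Unclean election: any live replica, even an out-of-sync one (messages may be lost).
        if (uncleanEnabled) {
            for (int replica : assignment) {
                if (liveBrokers.contains(replica)) return Optional.of(replica);
            }
        }
        // No eligible replica: the partition stays offline until an ISR member comes back.
        return Optional.empty();
    }
}
```

With the default (unclean election disabled), the partition trades availability for consistency: it stays unavailable rather than promoting a lagging replica.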

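How leaders are chosen

Kafka uses a round-robin algorithm to spread replicas evenly across brokers. The first replica in each partition's replica list is its preferred leader, so leadership is spread evenly across brokers as well.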
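A simplified sketch of rack-unaware round-robin placement, modeled from memory on the assignment in Kafka's AdminUtils (treat the details as approximate). The first replica of partition p goes to broker (p + startIndex) mod n, and the followers are placed at a shifted distance that grows after every full round, so that replica pairs do not repeat. Kafka chooses startIndex and the initial shift at random; here they are parameters so the result is deterministic. ReplicaAssignment.assign is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, deterministic sketch of rack-unaware round-robin replica placement.
class ReplicaAssignment {
    static List<List<Integer>> assign(int[] brokers, int nPartitions, int replicationFactor,
                                      int startIndex, int initialShift) {
        int n = brokers.length;
        if (replicationFactor > n) throw new IllegalArgumentException("replication factor > number of brokers");
        List<List<Integer>> result = new ArrayList<>();
        int shift = initialShift;
        for (int p = 0; p < nPartitions; p++) {
            // After each full round over the brokers, shift the followers so pairs don't repeat.
            if (p > 0 && p % n == 0) shift++;
            // The first replica (the preferred leader) moves round-robin across brokers.
            int first = (p + startIndex) % n;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers[first]);
            for (int j = 0; j < replicationFactor - 1; j++) {
                int distance = 1 + (shift + j) % (n - 1); // in [1, n-1], so never `first` itself
                replicas.add(brokers[(first + distance) % n]);
            }
            result.add(replicas);
        }
        return result;
    }
}
```

For example, three brokers, four partitions and replication factor 2 (startIndex 0, shift 0) give [0, 1], [1, 2], [2, 0], [0, 2]: each broker leads at least one partition, and the fourth partition pairs broker 0 with a different follower than the first.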

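Storage

Partition data files

Each message in a partition is identified by its offset within that partition. The offset is not the message's physical location in the partition's data file but a logical value that uniquely identifies one message in the partition, so it can be thought of as the message's ID within the partition. Each message in a partition contains the following three attributes: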

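Consider what would happen if a partition had only one data file:

  1. New data is appended to the end of the file (FileMessageSet.append), which is always O(1) no matter how large the file is.
  2. Finding the message with a given offset (FileMessageSet.searchFor) is a sequential scan, so lookups get slow when the data file is large.

How does Kafka make lookups efficient? It relies on two techniques: 1) segmenting and 2) indexing.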

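Indexing the data files

Splitting the data into segments means a lookup only has to search one smaller data file, but finding the message with a given offset still requires a sequential scan. To speed this up further, Kafka creates an index file for each segment data file, with the same name as the data file and the extension .index. The index file holds a number of index entries, each pointing to one message in the data file. An entry has two parts, both 4-byte numbers: the relative offset (the message's offset minus the segment's base offset) and the message's position in the data file.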
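Each index entry is therefore 8 bytes. Storing the offset relative to the segment's base offset is what lets it fit in 4 bytes instead of 8. A minimal ByteBuffer sketch of that layout, with hypothetical helper names; the real offset index additionally memory-maps the file and appends entries only every so many bytes of log data.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the 8-byte index entry: relative offset (int) + position (int).
class IndexEntryCodec {
    static final int ENTRY_SIZE = 8;

    // Writes the entry for an absolute offset into entry slot `slot`.
    static void write(ByteBuffer index, int slot, long baseOffset, long absoluteOffset, int position) {
        long relative = absoluteOffset - baseOffset;
        if (relative < 0 || relative > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("offset does not belong to this segment");
        }
        index.putInt(slot * ENTRY_SIZE, (int) relative);  // absolute put: buffer position unchanged
        index.putInt(slot * ENTRY_SIZE + 4, position);
    }

    // Absolute offset stored in `slot` = segment base offset + relative offset.
    static long readOffset(ByteBuffer index, int slot, long baseOffset) {
        return baseOffset + index.getInt(slot * ENTRY_SIZE);
    }

    // Byte position of the indexed message in the segment's data file.
    static int readPosition(ByteBuffer index, int slot) {
        return index.getInt(slot * ENTRY_SIZE + 4);
    }
}
```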

A partition is split into segments, each called a LogSegment, consisting of one data file and one index file. The figure below shows the files in one partition directory; this partition has 4 LogSegments.

[Figure: files in a partition directory, showing 4 LogSegments]

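For example, to find the message with absolute offset 7:

  1. Binary search over the segments to find which LogSegment contains it; here it is the first segment.
  2. Open that segment's index file and binary search for the entry with the largest offset less than or equal to the target offset. Here that is the entry for offset 6, which says the message with offset 6 is at position 9807 in the data file.
  3. Open the data file and scan sequentially from position 9807 until the message with offset 7 is found.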

This scheme relies on offsets being ordered. Index files are memory-mapped, so lookups remain fast.
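The three-step lookup can be sketched with a TreeMap of segments keyed by base offset (floorEntry performs the first binary search), a sorted sparse index per segment (a second binary search), and a final sequential scan. PartitionLog and its Segment class are hypothetical in-memory stand-ins for the .index and data files, where arrays of offsets and byte positions replace the real file contents.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a partition: segments keyed by base offset.
class PartitionLog {
    static final class Segment {
        final long[] indexOffsets;   // sparse index: absolute offsets, ascending
        final int[] indexPositions;  // byte position of each indexed message
        final long[] recordOffsets;  // every record's offset, in file order
        final int[] recordPositions; // every record's byte position

        Segment(long[] indexOffsets, int[] indexPositions, long[] recordOffsets, int[] recordPositions) {
            this.indexOffsets = indexOffsets;
            this.indexPositions = indexPositions;
            this.recordOffsets = recordOffsets;
            this.recordPositions = recordPositions;
        }
    }

    private final TreeMap<Long, Segment> segments = new TreeMap<>();

    void addSegment(long baseOffset, Segment s) { segments.put(baseOffset, s); }

    // Returns the byte position of `target` inside its segment, or -1 if it is not present.
    int find(long target) {
        // Step 1: the segment with the largest base offset <= target.
        Map.Entry<Long, Segment> e = segments.floorEntry(target);
        if (e == null) return -1;
        Segment s = e.getValue();
        // Step 2: binary search the sparse index for the largest indexed offset <= target.
        int lo = 0, hi = s.indexOffsets.length - 1, startPos = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (s.indexOffsets[mid] <= target) { startPos = s.indexPositions[mid]; lo = mid + 1; }
            else hi = mid - 1;
        }
        // Step 3: scan the data sequentially, starting at the position the index pointed to.
        for (int i = 0; i < s.recordOffsets.length; i++) {
            if (s.recordPositions[i] < startPos) continue;
            if (s.recordOffsets[i] == target) return s.recordPositions[i];
            if (s.recordOffsets[i] > target) break;
        }
        return -1;
    }
}
```

Because the index is sparse, step 3 only ever scans the short stretch between two indexed entries, which is why a small index can still give fast lookups.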

In short, Kafka's message storage achieves its efficiency through partitions, segments (LogSegment), and sparse indexes.

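Producer

When a KafkaProducer instance is created, the producer starts a background thread named Sender. When the Sender thread starts running, it first opens connections to the brokers: all the brokers listed in bootstrap.servers.

The producer forces a metadata refresh every 5 minutes (metadata.max.age.ms, default 300000 ms) so that its metadata stays current.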

The Sender task takes messages from the RecordAccumulator and runs on its own I/O thread:
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
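Between send() and the Sender sits the RecordAccumulator: records are appended to a per-partition deque of batches, and the Sender is woken up when a batch fills or a new one is opened. A minimal sketch of that append rule, assuming a batch is just a list of byte arrays capped by batchSize bytes; MiniAccumulator and its members are hypothetical, and the real accumulator also handles linger.ms, compression, and the shared BufferPool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of RecordAccumulator.append.
class MiniAccumulator {
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        int sizeInBytes = 0;
    }

    // Mirrors the two RecordAppendResult flags that decide whether to wake up the Sender.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;
        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    AppendResult append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) { // many application threads may append to the same partition
            Batch last = dq.peekLast();
            boolean created = false;
            // Use the in-progress batch if it has room; otherwise open a new one.
            if (last == null || last.sizeInBytes + value.length > batchSize) {
                last = new Batch();
                dq.addLast(last);
                created = true;
            }
            last.records.add(value);
            last.sizeInBytes += value.length;
            // An older batch waiting in the deque, or a filled last batch, means data is ready to send.
            boolean full = dq.size() > 1 || last.sizeInBytes >= batchSize;
            return new AppendResult(full, created);
        }
    }

    // What the Sender would take: the oldest batch of a partition, or null.
    Batch drainOne(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null) return null;
        synchronized (dq) { return dq.pollFirst(); }
    }
}
```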

Avoiding message loss

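  1. Don't use producer.send(msg); use producer.send(msg, callback). Always use the send method that takes a callback.
  2. Set acks = all. acks is a producer parameter that defines what counts as a "committed" message. With all, a message counts as committed only after every replica in the ISR has received it. This is the strictest definition of "committed".
  3. Set retries to a large value. retries is also a producer parameter and controls the producer's automatic retries. When a send fails because of a transient network glitch, a producer configured with retries > 0 retries automatically, avoiding message loss.
  4. Set unclean.leader.election.enable = false. This broker parameter controls which brokers may compete to become a partition's leader. If a broker that lags far behind the old leader becomes the new leader, messages will inevitably be lost, so this parameter should normally be false.
  5. Set replication.factor >= 3. This is also a broker-side setting (set per topic at creation, or through default.replication.factor on the broker). The point is to keep more copies of each message, since redundancy is the main mechanism for preventing message loss.
  6. Set min.insync.replicas > 1. This is another broker-side parameter; it controls how many replicas a message must be written to before it counts as "committed" (it takes effect together with acks = all). Setting it above 1 improves durability. Never use the default of 1 in production.
  7. Make sure replication.factor > min.insync.replicas. If they are equal, losing a single replica stops the whole partition from working. The goal is to improve durability and prevent data loss without reducing availability; a recommended setting is replication.factor = min.insync.replicas + 1.
  8. Commit offsets only after messages have been fully processed. The consumer parameter enable.auto.commit is best set to false, with offsets committed manually. This is crucial when a single consumer processes messages with multiple threads.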
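The checklist can be turned into a sanity check run before deployment. A minimal sketch using java.util.Properties: the configuration key names are the real Kafka names, but NoLossConfig and check are hypothetical helpers, the broker and topic values are passed in as plain arguments, and Integer.MAX_VALUE for retries is just one choice of "a large value".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical checker for the no-message-loss checklist above.
class NoLossConfig {
    // Producer-side settings from items 2 and 3.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return p;
    }

    // Returns the violated rules; an empty list means the settings follow the checklist.
    static List<String> check(Properties producer, boolean uncleanLeaderElectionEnable,
                              int replicationFactor, int minInsyncReplicas, boolean enableAutoCommit) {
        List<String> problems = new ArrayList<>();
        String acks = producer.getProperty("acks");
        if (!"all".equals(acks) && !"-1".equals(acks)) problems.add("acks should be all"); // -1 is a synonym for all
        String retries = producer.getProperty("retries"); // the checklist asks for an explicit large value
        if (retries == null || Integer.parseInt(retries) <= 0) problems.add("retries should be a large positive value");
        if (uncleanLeaderElectionEnable) problems.add("unclean.leader.election.enable should be false");
        if (replicationFactor < 3) problems.add("replication.factor should be >= 3");
        if (minInsyncReplicas <= 1) problems.add("min.insync.replicas should be > 1");
        if (replicationFactor <= minInsyncReplicas) problems.add("replication.factor should exceed min.insync.replicas");
        if (enableAutoCommit) problems.add("enable.auto.commit should be false");
        return problems;
    }
}
```

For example, replication.factor = 3 with min.insync.replicas = 2 passes, while 2 and 2 fails two rules at once: too few copies, and no headroom to lose a replica.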

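Message resending

At most once: messages may be lost, but are never delivered more than once.

At least once: messages are never lost, but may be delivered more than once.

Exactly once: messages are neither lost nor duplicated.

If a message is successfully "committed" but the broker's acknowledgement never reaches the producer (for example, because of a transient network glitch), the producer cannot tell whether the commit actually succeeded. Its only option is to retry, sending the same message again. This is why Kafka provides at-least-once delivery by default, at the cost of possible duplicates.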

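Idempotence

Idempotence lets Kafka discard the duplicates created by resends.

Enabling an idempotent producer is simple: set a single parameter, either props.put("enable.idempotence", true) or props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true). Once enable.idempotence is true, the producer automatically becomes an idempotent producer and no other code needs to change; Kafka de-duplicates the messages for you.

The guarantee has two limitations: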

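  1. It only guarantees idempotence within a single partition: an idempotent producer ensures there are no duplicate messages in one partition of a topic, but it cannot provide idempotence across multiple partitions.
  2. It only guarantees idempotence within a single session, not across sessions. A session here can be understood as one run of the producer process; once you restart the producer process, the guarantee is lost.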
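A toy model makes both the duplicate and the two limitations concrete. An idempotent producer receives a producer ID (PID) and numbers its messages per partition with a sequence number, and the broker drops a message whose sequence number it has already seen from that PID. The sketch below is simplified under those assumptions (the real broker keeps the last few batches per producer and also rejects out-of-order sequences), and ToyPartitionLog and RetryDemo are hypothetical names. A restarted producer gets a new PID, so a resent message is no longer recognized.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy partition log with optional producer-ID + sequence-number de-duplication.
class ToyPartitionLog {
    final List<String> messages = new ArrayList<>();
    private final boolean idempotent;
    // Highest sequence number written so far, per producer ID (PID).
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    ToyPartitionLog(boolean idempotent) { this.idempotent = idempotent; }

    // Returns true if the message was written, false if it was dropped as a duplicate
    // (the producer is not handed an error for a duplicate, so it still sees success).
    boolean append(long producerId, int seq, String msg) {
        if (idempotent) {
            Integer last = lastSeqByProducer.get(producerId);
            if (last != null && seq <= last) return false;
            lastSeqByProducer.put(producerId, seq);
        }
        messages.add(msg);
        return true;
    }
}

class RetryDemo {
    // One message whose acknowledgement is lost, then a retry with the same PID and sequence number.
    static List<String> sendWithLostAck(ToyPartitionLog log, long producerId) {
        log.append(producerId, 0, "order-42"); // written, but the ack never reaches the producer
        log.append(producerId, 0, "order-42"); // the producer cannot tell, so it resends
        return log.messages;
    }
}
```

Without idempotence the log ends up with two copies of "order-42"; with it, one. Sequence numbers are tracked per partition and per PID, which is exactly why the guarantee stops at one partition and one session.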

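Transactions

  1. As with the idempotent producer, set enable.idempotence = true.
  2. Set the producer parameter transactional.id, preferably to a meaningful name.
  3. On the consumer side, set isolation.level to one of:
    1. read_uncommitted: the default. The consumer can read every message Kafka has written, whether the transactional producer committed or aborted the transaction. If you use a transactional producer, its consumers should obviously not use this value.
    2. read_committed: the consumer reads only messages from transactions that the transactional producer committed successfully. It still sees every message written by non-transactional producers.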
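The effect of isolation.level can be modeled by tagging each record with the state of the transaction that wrote it. A simplified sketch, assuming a hypothetical TxnState enum and that the reader already knows each transaction's outcome. The real consumer instead reads only up to the last stable offset and filters aborted data using the aborted-transaction index, which is why under read_committed it stops at the first record of a still-open transaction rather than skipping past it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of what a consumer sees under each isolation.level value.
class IsolationDemo {
    enum TxnState { NON_TRANSACTIONAL, COMMITTED, ABORTED, OPEN }

    static final class Rec {
        final String value;
        final TxnState state;
        Rec(String value, TxnState state) { this.value = value; this.state = state; }
    }

    static List<String> visible(List<Rec> log, String isolationLevel) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (isolationLevel.equals("read_committed")) {
                // Stop at the first record of a still-open transaction (roughly the last stable offset).
                if (r.state == TxnState.OPEN) break;
                // Records from aborted transactions are filtered out.
                if (r.state == TxnState.ABORTED) continue;
            }
            out.add(r.value);
        }
        return out;
    }
}
```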

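A transactional producer also survives process restarts: after the producer restarts with the same transactional.id, Kafka still guarantees exactly-once processing of the messages it sends.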

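producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // unrecoverable (e.g. another producer with the same transactional.id took over): close the producer
    producer.close();
} catch (KafkaException e) {
    // any other error: abort; the records can then be resent in a new transaction
    producer.abortTransaction();
}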

Transactions guarantee idempotence across partitions and across sessions.

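Client-side producer

Many threads, one queue: any number of application threads can call send(), which appends records to the shared RecordAccumulator, while a single Sender thread drains it.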

KafkaProducer instantiation

    KafkaProducer(Map<String, Object> configs,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;

            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.transactionManager = configureTransactionState(config, logContext, log);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                // initialize the metadata with the bootstrap.servers addresses
                this.metadata.bootstrap(addresses, time.milliseconds());
            }
            this.errors = this.metrics.sensor("errors");
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

Sending a message

org.apache.kafka.clients.producer.KafkaProducer#doSend

   private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // get the topic's metadata; if it is not cached yet, request it and wait up to maxBlockTimeMs
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            // choose the partition for this record
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            // append the record to the RecordAccumulator (a wrapper around per-partition queues of batches)
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
          ...
        }
    }
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // find (or create) the batch deque for this partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
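A stripped-down model of what append does may help: one deque of batches per partition, a record goes into the last batch if it still has room, otherwise a new batch is opened, and the return value says whether the sender should be woken. This is a hypothetical sketch (MiniAccumulator, Batch and plain byte-count sizing are simplifications, not Kafka classes); the real accumulator also uses a BufferPool, compression estimates and callbacks. The wake-up rule mirrors the `dq.size() > 1 || batch.isFull()` / new-batch logic in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of RecordAccumulator: one deque of batches per partition.
class MiniAccumulator {
    // A batch collects records until its byte budget is used up.
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        final int capacity;
        int used;

        Batch(int capacity) { this.capacity = capacity; }

        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) return false; // no room left in this batch
            records.add(value);
            used += value.length;
            return true;
        }

        boolean isFull() { return used >= capacity; }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    /** Appends a record; returns true if the sender should be woken up. */
    boolean append(String partition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) { // per-partition lock, like synchronized (dq) in the real code
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) {
                // An older batch is waiting, or this one just filled up: something is ready to send.
                return dq.size() > 1 || last.isFull();
            }
            // No open batch or not enough room: open a batch at least as large as the record.
            Batch batch = new Batch(Math.max(batchSize, value.length));
            batch.tryAppend(value);
            dq.addLast(batch);
            return true; // a new batch was created
        }
    }

    int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        return dq == null ? 0 : dq.size();
    }
}
```

With a 10-byte batch size, two 4-byte records share one batch (the second append does not wake the sender), and a third 4-byte record opens a second batch.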
public final class RecordAccumulator {

    private final Logger log;
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // one batch deque per partition
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
    // ... (rest of RecordAccumulator omitted)
}

// KafkaProducer#waitOnMetadata
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // flag that a metadata update is needed
            int version = metadata.requestUpdate();
            // wake up the sender thread, which sends the metadata request
            sender.wakeup();
            try {
                // block until the metadata update completes
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowException();
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
            // do-while: keep requesting metadata until the partition count is known (bounded by maxWaitMs)
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
 public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
        long currentTimeMs = time.milliseconds();
        long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        time.waitObject(this, () -> {
            maybeThrowException();
            return updateVersion() > lastVersion || isClosed();
        }, deadlineMs);

        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }
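The requestUpdate/awaitUpdate pair above follows a version handshake: the caller records the current version and flags that an update is needed, the sender thread applies a metadata response and bumps the version, and the caller waits until the version increases or a deadline passes. Below is a hypothetical plain-Java sketch of that pattern with wait/notifyAll; VersionedMetadata is not a Kafka class, and Kafka itself goes through its own Time.waitObject helper as shown above.

```java
// Hypothetical sketch of the requestUpdate / awaitUpdate version handshake.
class VersionedMetadata {
    private int updateVersion = 0;
    private boolean needUpdate = false;

    /** Caller: flag that an update is needed and return the version to wait past. */
    synchronized int requestUpdate() {
        needUpdate = true;
        return updateVersion;
    }

    /** Sender thread: called after a metadata response has been applied. */
    synchronized void update() {
        updateVersion++;
        needUpdate = false;
        notifyAll(); // wake every thread blocked in awaitUpdate
    }

    /** Blocks until the version moves past lastVersion; returns false on timeout or interrupt. */
    synchronized boolean awaitUpdate(int lastVersion, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (updateVersion <= lastVersion) { // loop also guards against spurious wakeups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) return false;
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    synchronized boolean updateRequested() { return needUpdate; }
}
```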

Sender thread

org.apache.kafka.clients.producer.internals.Sender#run
org.apache.kafka.clients.producer.internals.Sender#runOnce
org.apache.kafka.clients.NetworkClient#poll

void runOnce() {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();

            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(
                    new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();
    // drain ready batches from the accumulator and hand them to client.send
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);
    return pollTimeout;
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // send the request
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), destination);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
        }
    }
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    // stage the Send on the channel; it is actually written out during a later poll()
    selector.send(send);
}

    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
        // if a metadata update is due, send a metadata request
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }
        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

            if (metadataTimeout > 0) {
                return metadataTimeout;
            }

            // Beware that the behavior of this method and the computation of timeouts for poll() are
            // highly dependent on the behavior of leastLoadedNode.
            Node node = leastLoadedNode(now);
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                return reconnectBackoffMs;
            }

            return maybeUpdate(now, node);
        }

        private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId, now)) {
                // a connection is already established and can take another request
                Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
                this.inProgressRequestVersion = requestAndVersion.requestVersion;
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                // send the metadata request; like a producer send, this ends up in selector.send
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                return defaultRequestTimeoutMs;
            }

            // If there's any connection establishment underway, wait until it completes. This prevents
            // the client from unnecessarily connecting to additional nodes while a previous connection
            // attempt has not been completed.
            if (isAnyNodeConnecting()) {
                // Strictly the timeout we should return here is "connect timeout", but as we don't
                // have such application level configuration, using reconnect backoff instead.
                return reconnectBackoffMs;
            }

            if (connectionStates.canConnect(nodeConnectionId, now)) {
                // We don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                // no connection yet: initiate one
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            return Long.MAX_VALUE;
        }
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
            log.debug("Initiating connection to node {} using address {}", node, address);
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
        }
    }
    public void send(Send send) {
        String connectionId = send.destination();
        // look up the KafkaChannel (which wraps the SocketChannel) for this destination
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            this.failedSends.add(connectionId);
        } else {
            try {
                // attach the Send to the channel and register interest in OP_WRITE
                channel.setSend(send);
            } catch (Exception e) {
                // update the state for consistency, the channel will be discarded after `close`
                channel.state(ChannelState.FAILED_SEND);
                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                            connectionId, e);
                    throw e;
                }
            }
        }
    }
    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
        clear();

        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
            timeout = 0;

        if (!memoryPool.isOutOfMemory() && outOfMemory) {
            //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
            log.trace("Broker no longer low on memory - unmuting incoming sockets");
            for (KafkaChannel channel : channels.values()) {
                if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                    channel.maybeUnmute();
                }
            }
            outOfMemory = false;
        }

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }

            // Poll from channels where the underlying socket has more data
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();

            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // Close channels that were delayed and are now ready to be closed
        completeDelayedChannelClose(endIo);

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);

        // Add to completedReceives after closing expired connections to avoid removing
        // channels with completed receives until all staged receives are completed.
        addToCompletedReceives();
    }

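Every request and response starts with a fixed-length 4-byte field giving the size of the complete packet; the receiver uses it to split the byte stream back into messages (handling TCP fragmentation, where one read returns half a packet, and coalescing, where one read returns several).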
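A minimal sketch of how a 4-byte size prefix lets a receiver cut a TCP byte stream back into messages, whether a read delivers half a message or several at once. SizePrefixedFramer is a hypothetical name; as far as I know, Kafka does the equivalent in NetworkReceive, which first fills a 4-byte size buffer and then a payload buffer of exactly that size.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of 4-byte size-prefixed framing (big-endian, ByteBuffer's default order).
class SizePrefixedFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Sender side: prefix the payload with its length. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Receiver side: feed raw bytes as they arrive; returns every frame completed so far. */
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int size = buf.getInt();
            if (buf.remaining() < size) { // half a packet: rewind and wait for more bytes
                buf.reset();
                break;
            }
            byte[] payload = new byte[size];
            buf.get(payload);
            frames.add(payload);
        }
        // Keep the unconsumed tail (possibly a partial size field) for the next call.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return frames;
    }
}
```

Feeding the first 11 bytes of two concatenated frames ("hello" then "kafka") yields only "hello"; the remaining bytes complete "kafka" on the next call.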

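InFlightRequests holds every request that has been sent but whose response has not yet come back: a request is enqueued when it is sent, and the matching request is dequeued when its response arrives.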

final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
    /** Thread safe total number of in flight requests. */
    private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
    // ... (methods omitted)
}
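The idea of InFlightRequests can be sketched as a per-connection deque with a cap. This is a simplified, hypothetical model (InFlightTracker, with correlation ids as plain ints). To my understanding it mirrors Kafka's layout, where new requests are added at the head and the oldest is completed from the tail; the cap corresponds to the producer setting max.in.flight.requests.per.connection. The real canSendMore additionally requires the previous request to be fully written to the socket.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-connection FIFO of requests that are still awaiting a response.
class InFlightTracker {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<Integer>> requests = new HashMap<>(); // node id -> correlation ids

    InFlightTracker(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    boolean canSendMore(String node) {
        Deque<Integer> q = requests.get(node);
        return q == null || q.size() < maxInFlightPerConnection;
    }

    /** Enqueue when a request is sent; the newest request goes to the head. */
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(correlationId);
    }

    /** Dequeue the oldest request when a response arrives (responses come back in send order). */
    int completeNext(String node) {
        Deque<Integer> q = requests.get(node);
        if (q == null || q.isEmpty()) throw new IllegalStateException("No in-flight request for " + node);
        return q.pollLast();
    }

    int count(String node) {
        Deque<Integer> q = requests.get(node);
        return q == null ? 0 : q.size();
    }
}
```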

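On the server side, request order is preserved per connection: as soon as a channel receives a request, that channel is muted, and it is unmuted only after the response has been sent. This guarantees that at most one request per connection is being processed at any time.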
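The mute/unmute rule can be modeled with a set of muted connection ids. Hypothetical sketch (MutingServer is not a Kafka class): in the real broker, muting a channel removes its OP_READ interest, so further bytes simply wait in the socket; here that is modeled by refusing the request until the connection is unmuted.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of per-connection muting on the server side.
class MutingServer {
    private final Set<String> muted = new HashSet<>();
    private final Queue<String[]> requestQueue = new ArrayDeque<>(); // entries: {connectionId, payload}

    /** Network thread: a complete request was read. Returns false if the channel is muted. */
    boolean onRequestReceived(String connectionId, String payload) {
        if (muted.contains(connectionId)) return false; // a muted channel is not read from
        muted.add(connectionId);                         // mute until the response has been sent
        requestQueue.add(new String[] {connectionId, payload});
        return true;
    }

    /** Request handler threads take work from the shared queue. */
    String[] nextRequest() { return requestQueue.poll(); }

    /** Called once the response for this connection has been fully written. */
    void onResponseSent(String connectionId) { muted.remove(connectionId); }

    boolean isMuted(String connectionId) { return muted.contains(connectionId); }
}
```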

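Consumer

Single-threaded (a KafkaConsumer instance is not thread-safe and must be used from one thread at a time)

Consumer groups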

Every KafkaServer has a GroupCoordinator instance that manages multiple consumer groups; it is mainly responsible for offset management and consumer rebalancing.

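Like the controller (KafkaController), a GroupCoordinator is started on every broker. Kafka assigns each consumer group, by group name, to one GroupCoordinator; each GroupCoordinator manages only a subset of the groups rather than every group in the cluster, and each group has exactly one coordinator.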

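The coordinator manages and coordinates all consumers in a consumer group. It is itself a broker-side service that happens to serve consumer groups: it manages partition balancing among the consumers, tracks their consumption progress (committed offsets), monitors whether consumers have died, decides whether a rebalance is needed, and so on.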

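The coordinator has two main functions:

  1. It stores the group's offsets (offsets are no longer kept in ZooKeeper but on the coordinator), recording each consumer's progress on each partition.
  2. It manages partition balancing. Consumers send heartbeats periodically to show they are alive; based on the group's live members, the coordinator triggers rebalances so that partitions are spread evenly according to the partition assignment strategy.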
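Liveness detection by heartbeats comes down to comparing each member's last heartbeat time with a session timeout. A hypothetical sketch (HeartbeatTracker is not a Kafka class); on a real consumer the relevant settings are heartbeat.interval.ms and session.timeout.ms.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: expire members whose last heartbeat is older than the session timeout.
class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeat = new HashMap<>(); // member id -> last heartbeat time

    HeartbeatTracker(long sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }

    void heartbeat(String memberId, long nowMs) { lastHeartbeat.put(memberId, nowMs); }

    /** Removes and returns timed-out members; a non-empty result means a rebalance is needed. */
    List<String> expire(long nowMs) {
        List<String> dead = lastHeartbeat.entrySet().stream()
                .filter(e -> nowMs - e.getValue() > sessionTimeoutMs)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
        dead.forEach(lastHeartbeat::remove);
        return dead;
    }

    int liveMembers() { return lastHeartbeat.size(); }
}
```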

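Each consumer group has exactly one coordinator: the broker that hosts the group's offsets (the leader of the group's __consumer_offsets partition) is chosen as that group's coordinator. The coordinator keeps the group's metadata; consumers commit their partition offsets to it, and it updates the group metadata accordingly. Besides appending the offsets to its log, it also caches them in memory for faster lookups.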

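While consuming, a consumer records its consumer offset (not to be confused with a partition's log offset; always keep the two contexts apart). Consumer offsets are stored in a special internal topic, __consumer_offsets, whose only job is to hold the offsets of consumer groups. By default it is created with 50 partitions (set by offsets.topic.num.partitions) and with the replication factor given by offsets.topic.replication.factor (default 3); spreading the 50 partitions across 50 servers greatly relieves the load of offset commits. The topic is created automatically the first time consumers need it.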

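Which partition stores a consumer's offsets? Kafka decides by group.id, computing the partition number as Math.abs(groupId.hashCode()) % 50 (50 being the number of __consumer_offsets partitions). This fixes which partition holds the offsets of all consumers in a group.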
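The partition formula can be checked directly. One subtlety: Math.abs(Integer.MIN_VALUE) is still negative, so Math.abs(h) % 50 would give -48 for such a hash. The sketch below maps that case to 0, which as far as I know is what the broker-side helper does. OffsetsPartition is a hypothetical name, and the partition count should come from offsets.topic.num.partitions rather than being hard-coded.

```java
// Hypothetical sketch of mapping a group.id to a __consumer_offsets partition.
class OffsetsPartition {
    /** Non-negative abs: maps Integer.MIN_VALUE to 0 instead of leaving it negative. */
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return safeAbs(groupId.hashCode()) % offsetsTopicPartitions;
    }
}
```

For example, "ab".hashCode() is 3105, so partitionFor("ab", 50) is 5.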

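That raises another question: if all consumers in a group commit their offsets to the same partition of __consumer_offsets, how are their offsets told apart? Each message written to that partition has groupId + topic + partition number as its key and the consumer offset as its value. The partition number in the key is the partition of the consumed topic, not of __consumer_offsets. Since, within a group, each partition of a topic is consumed by only one consumer, offsets cannot get mixed up.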
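The key/value layout can be modeled as a map from (groupId, topic, partition) to the latest offset. Hypothetical sketch (OffsetStore is not a Kafka class): the real records use a binary schema with more fields, but because __consumer_offsets is a log-compacted topic, only the latest value per key survives in the long run, which is what the map reproduces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the __consumer_offsets key/value layout.
class OffsetStore {
    // Key: group + consumed topic + partition of that topic (not a __consumer_offsets partition).
    static final class OffsetKey {
        final String groupId;
        final String topic;
        final int partition;

        OffsetKey(String groupId, String topic, int partition) {
            this.groupId = groupId;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof OffsetKey)) return false;
            OffsetKey k = (OffsetKey) o;
            return partition == k.partition && groupId.equals(k.groupId) && topic.equals(k.topic);
        }

        @Override
        public int hashCode() { return Objects.hash(groupId, topic, partition); }
    }

    // Like a compacted topic: a later commit for the same key replaces the earlier one.
    private final Map<OffsetKey, Long> latest = new HashMap<>();

    void commit(String groupId, String topic, int partition, long offset) {
        latest.put(new OffsetKey(groupId, topic, partition), offset);
    }

    /** Returns the committed offset, or null if the group never committed for this partition. */
    Long fetch(String groupId, String topic, int partition) {
        return latest.get(new OffsetKey(groupId, topic, partition));
    }
}
```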

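In fact, spreading a topic's partitions evenly across the consumers of a group is orchestrated by the coordinator. It monitors the consumers, and when a consumer crashes or a new one joins, it triggers a rebalance that reassigns the partitions according to a strategy. The broker holding the group's offsets is chosen as the group's coordinator; it tracks each consumer's health through heartbeats and makes sure the topic's (leader) partitions are distributed as evenly as possible among the group's consumers. Any change in membership, such as a new consumer joining, makes the coordinator start a rebalance.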

A consumer group is Kafka's scalable, fault-tolerant consumer mechanism.

A group can contain multiple consumers (consumer instances) that share a common ID, called the group ID. All consumers in the group work together to consume all partitions of the subscribed topics.

Within one consumer group, each partition is consumed by only one consumer instance.

The server stores the mapping (topic, partition, consumer_group_id) → offset.

Joining a group

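  1. For each consumer group, the Kafka cluster selects one broker as its coordinator, so the first step is to find that coordinator.

  2. Once the coordinator is found, send a JoinGroup request.

    The first member to send a JoinGroup request automatically becomes the leader.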

  3. After JoinGroup returns, send a SyncGroup request to receive the partitions assigned to this consumer.
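
The JoinGroup/SyncGroup steps can be modeled synchronously. Hypothetical sketch (GroupCoordinatorModel is not a Kafka class): it ignores generations and timeouts, and assumes the leader calls SyncGroup first, whereas real followers' SyncGroup requests block until the leader's assignment arrives.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, synchronous model of one JoinGroup + SyncGroup round.
class GroupCoordinatorModel {
    private final List<String> members = new ArrayList<>();
    private String leader;
    private Map<String, List<Integer>> assignment = new HashMap<>();

    /** JoinGroup: the first member to join becomes the leader; every caller learns the leader. */
    String join(String memberId) {
        if (members.isEmpty()) leader = memberId;
        if (!members.contains(memberId)) members.add(memberId);
        return leader;
    }

    boolean isLeader(String memberId) { return memberId.equals(leader); }

    /** The leader receives the member list so it can compute the assignment. */
    List<String> members() { return new ArrayList<>(members); }

    /** SyncGroup: the leader's request carries the full assignment; followers send none. */
    List<Integer> sync(String memberId, Map<String, List<Integer>> leaderAssignment) {
        if (isLeader(memberId)) assignment = new HashMap<>(leaderAssignment);
        return assignment.getOrDefault(memberId, Collections.emptyList());
    }
}
```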

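Why elect a leader among the consumers to compute the assignment, instead of letting the coordinator assign partitions directly? The official Kafka documentation discusses this in detail. One important reason is flexibility: if the server did the assignment, adopting a new assignment strategy would require redeploying the broker cluster, which is costly for a cluster already running in production; with client-side assignment, the brokers do not need to be redeployed.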
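Client-side assignment means the strategy is just code in the consumer. Below is a hypothetical strategy interface with a range-style implementation for a single topic, modeled on the idea behind Kafka's RangeAssignor: sort the members, give each one a contiguous range, and let the first few take one extra partition. In the real client the strategy is chosen with partition.assignment.strategy; the interface here is a simplification, not Kafka's actual assignor interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side strategy interface: swapping implementations needs no broker change.
interface PartitionAssignorSketch {
    Map<String, List<Integer>> assign(List<String> members, int numPartitions);
}

// Range-style assignment for one topic.
class RangeAssignorSketch implements PartitionAssignorSketch {
    @Override
    public Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) return result;
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // deterministic order so every run gives the same plan
        int perMember = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size(); // the first `withExtra` members get one more
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, withExtra);
            int length = perMember + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + length; p++) parts.add(p);
            result.put(sorted.get(i), parts);
        }
        return result;
    }
}
```

With members c2, c1, c3 and 7 partitions, c1 gets [0, 1, 2], c2 gets [3, 4] and c3 gets [5, 6].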

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        // ensure the consumer is accessed by only one thread at a time
        acquireAndEnsureOpen();
        try {
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            do {
                client.maybeTriggerWakeup();

                if (includeMetadataInTimeout) {
                    if (!updateAssignmentMetadataIfNeeded(timer)) {
                        return ConsumerRecords.empty();
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

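Offset commit

Old-version consumers managed offsets in Apache ZooKeeper, but ZooKeeper is not well suited to such frequent writes.

Newer versions store offsets in an internal topic on the Kafka brokers named __consumer_offsets.

There are two ways to classify offset commits:

  1. From the user's perspective, commits are automatic or manual.
  2. From the Consumer's perspective, commits are synchronous or asynchronous.

Manual commit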

while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofSeconds(1));
    process(records); // process the records
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        handle(e); // handle the commit failure
    }
}

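commitSync() blocks the consumer until the remote Broker returns the commit result, and it retries automatically on retriable errors. commitAsync() does not block, but it has no retry mechanism.

Automatic commit

Set enable.auto.commit to true.

Set auto.commit.interval.ms. Its default is 5 seconds, meaning Kafka automatically commits offsets for you every 5 seconds.

The poll method first commits the offsets of the previous batch and then returns the next batch for processing.

Automatic commit can lead to duplicate consumption. If a rebalance or crash happens after records are processed but before the next automatic commit, those records are consumed again.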
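The following toy model shows why automatic commit can cause duplicate consumption. For determinism it assumes a commit after every commitEvery records, rather than every auto.commit.interval.ms milliseconds. duplicatesAfterCrash is a hypothetical helper. After a crash the consumer resumes from the last committed offset, so everything processed since that commit is delivered again.

```java
final class AutoCommitReplay {
    // Offsets 0..crashAfter have been processed when the consumer crashes.
    // A commit is assumed after every `commitEvery` processed records.
    static int duplicatesAfterCrash(int crashAfter, int commitEvery) {
        long committed = 0; // offset the consumer resumes from after a restart
        for (int offset = 0; offset <= crashAfter; offset++) {
            // process(offset) would run here
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1; // periodic commit of the next offset to read
            }
        }
        long processedEnd = crashAfter + 1L;      // exclusive end of processed offsets
        return (int) (processedEnd - committed);  // offsets [committed, crashAfter] get replayed
    }
}
```

For example, with a commit every 5 records and a crash right after offset 12, offsets 10 to 12 are processed twice. A shorter interval narrows this window but never closes it.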

Combining the two commit strategies:

try {
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(1));
        process(records); // process the records
        consumer.commitAsync(); // asynchronous commit avoids blocking
    }
} catch (Exception e) {
    handle(e); // handle the exception
} finally {
    try {
        consumer.commitSync(); // the last commit uses a blocking synchronous commit
    } finally {
        consumer.close();
    }
}

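Consumption models

One KafkaConsumer per thread

image-20200617162011854

A single KafkaConsumer instance + multiple worker threads

image-20200617162143949

With multiple worker threads, consumption order is no longer guaranteed, so queues are introduced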

image-20200617162402169
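The queue idea can be sketched minimally, assuming a single polling thread hands each record to a worker chosen by partition number. PartitionOrderedDispatcher is hypothetical, and the (partition, offset) pairs stand in for ConsumerRecord. Each worker is a single-threaded executor with a FIFO task queue. All records of one partition therefore go through the same queue and keep their order, while different partitions run in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher: one polling thread feeds N single-threaded workers.
final class PartitionOrderedDispatcher {
    private final ExecutorService[] workers;
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    PartitionOrderedDispatcher(int nWorkers) {
        workers = new ExecutorService[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Called by the polling thread for each fetched record. A partition always maps to the
    // same worker, whose task queue is FIFO, so records of one partition stay in order.
    void dispatch(int partition, long offset) {
        ExecutorService worker = workers[Math.floorMod(partition, workers.length)];
        worker.submit(() -> {
            processed.add(partition + ":" + offset); // stand-in for real processing
        });
    }

    // Stops the workers after they drain their queues and returns the processing log.
    List<String> shutdownAndGetLog() {
        for (ExecutorService w : workers) w.shutdown();
        try {
            for (ExecutorService w : workers) w.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (processed) {
            return new ArrayList<>(processed);
        }
    }
}
```

Parallelism is capped by the number of workers. Offsets should only be committed once the worker has finished the corresponding records, which this sketch does not handle.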

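Suppose the consumer group rebalances during consumption, and the partition is reassigned to another consumer while some of its messages are still unconsumed. Kafka lets you implement the ConsumerRebalanceListener interface to commit offsets proactively before the new rebalance round. However, that does not seem to rule out those unconsumed messages being processed out of order.

Therefore, before consuming, the consumer also has to check whether the partition has been assigned to another consumer. It must also lock the partition so that it cannot be reassigned while it is being consumed, which Kafka currently cannot do.

RocketMQ's approach, for reference:

Before consuming, it calls ProcessQueue#isDropped to check whether the queue has expired, and locks the queue by asking the broker to lock it.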

RocketMQ

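RocketMQ is not as "bare" as Kafka and already covers this need. Its consumption model is itself the single consumer instance + multiple worker threads model. Interested readers can study RocketMQ's consumption logic starting from the following method:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

RocketMQ assigns a PullRequest to each queue and puts it into pullRequestQueue. The PullMessageService thread keeps taking PullRequests from pullRequestQueue to pull messages, then hands the pulled messages to ConsumeMessageService for processing. ConsumeMessageService has two implementations: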

// Concurrent message consumption implementation
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// Orderly message consumption implementation
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

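ConsumeMessageConcurrentlyService has an internal thread pool for concurrent consumption. For ordered consumption, RocketMQ provides ConsumeMessageOrderlyService instead.

Having thought through Kafka's consumer threading model, the ConsumeMessageOrderlyService source shows that RocketMQ achieves local (per-queue) consumption order. There are two main reasons:

1) RocketMQ creates an object lock for each message queue. While a thread in the pool is processing a queue, the next consumption of that queue must wait until it finishes. This guarantees that, within the current Consumer, messages of the same queue are consumed serially.

2) It asks the Broker to lock the queue being consumed in order. This prevents the queue from being reassigned to another consumer mid-consumption, which would break the order.

Messages are pulled in order, just periodically, and placed into ProcessQueue#msgTreeMap to await consumption. Each time, the messages handed to MessageListenerOrderly are the earliest ones, taken via ProcessQueue#takeMessages.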
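The client-side half of this (point 1 plus the msgTreeMap ordering) can be sketched with a simplified, hypothetical stand-in for ProcessQueue; this is not RocketMQ's real API. A TreeMap keyed by queue offset keeps pulled messages sorted. A per-queue lock serializes consumption, and a dropped flag stops consumption once the queue is reassigned. The broker-side lock from point 2 is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical stand-in for RocketMQ's ProcessQueue; not its real API.
final class OrderedQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>(); // queue offset -> message
    private final Object consumeLock = new Object();                  // one lock per queue
    private volatile boolean dropped = false;                         // set when the queue is reassigned

    // Pulled messages may arrive in batches; the TreeMap keeps them sorted by offset.
    synchronized void putMessages(Map<Long, String> pulled) {
        msgTreeMap.putAll(pulled);
    }

    // Removes and returns up to n messages with the smallest offsets.
    synchronized List<String> takeMessages(int n) {
        List<String> out = new ArrayList<>();
        while (out.size() < n && !msgTreeMap.isEmpty()) {
            out.add(msgTreeMap.pollFirstEntry().getValue());
        }
        return out;
    }

    void markDropped() {
        dropped = true;
    }

    // One consumption step. Holding the per-queue lock means only one worker thread
    // consumes this queue at a time; a dropped queue is no longer consumed.
    List<String> consumeOnce(int batchSize) {
        synchronized (consumeLock) {
            if (dropped) return new ArrayList<>();
            return takeMessages(batchSize);
        }
    }
}
```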

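Rebalance

Rebalance is essentially a protocol. It specifies how all Consumers in a Consumer Group agree on the assignment of every partition of the subscribed Topics. A rebalance is triggered in three cases:

  1. The group membership changes. For example, a new Consumer instance joins or leaves the group, or a Consumer instance crashes and is "kicked out" of the group.
  2. The number of subscribed topics changes. A Consumer Group can subscribe to topics with a regular expression. For example, consumer.subscribe(Pattern.compile("t.*c")) subscribes the Group to all topics whose names start with t and end with c. If you create a new topic matching this pattern while the Group is running, the Group rebalances.
  3. The number of partitions of a subscribed topic changes. Kafka currently only allows increasing a topic's partition count. When it increases, every Group subscribed to the topic starts a Rebalance.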
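Case 2 can be illustrated with plain java.util.regex. This is a sketch under the assumption that the whole topic name must match the pattern, i.e. matcher().matches() rather than find(). PatternSubscription is a hypothetical helper. A rebalance is needed exactly when the set of matched topics changes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.regex.Pattern;

final class PatternSubscription {
    // Topics matched by the subscription, assuming the whole topic name must match.
    static List<String> matched(Pattern pattern, List<String> topics) {
        List<String> out = new ArrayList<>();
        for (String t : topics) {
            if (pattern.matcher(t).matches()) out.add(t);
        }
        return out;
    }

    // Case 2 above: a rebalance is needed when the set of matched topics changes.
    static boolean needsRebalance(Pattern pattern, List<String> before, List<String> after) {
        return !new HashSet<>(matched(pattern, before)).equals(new HashSet<>(matched(pattern, after)));
    }
}
```

Creating a topic named "traffic" changes the matched set for "t.*c", while creating "test" does not.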

image-20191108114145781

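During a Rebalance, all Consumer instances stop consuming and wait for it to complete. This describes the classic eager rebalance protocol.

Controller

Each broker is a Kafka instance, i.e. a Kafka server process. The controller is itself a broker, also called the leader broker. Its responsibilities are:

  1. Topic management (create, delete, add partitions)
  2. Partition reassignment
  3. Preferred leader election
  4. Cluster membership management (new brokers, broker shutdown, broker crash)
  5. Data service (it holds the most complete cluster metadata and sends metadata updates to the other brokers)

Cluster membership management: the Controller manages all members of the cluster. This includes automatically discovering new Brokers, automatically handling offline Brokers, and responding promptly to changes in Broker data.

Topic management: the Controller manages all topics in the cluster, including creating, modifying and deleting topics. For topic deletion, the actual work is done by the underlying TopicDeletionManager.

Data held by the controller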

image-20200806174018908

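Controller election

A broker becomes the leader broker, i.e. the controller, by creating an ephemeral node (/controller) in ZooKeeper. The first broker to create it wins.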
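The first-writer-wins election can be sketched with an in-memory stand-in for ZooKeeper. FakeZk and ControllerElection are hypothetical, not a real ZooKeeper client. ConcurrentHashMap.putIfAbsent is atomic, like a znode create that fails when the path already exists. expireSession models ephemeral nodes vanishing when their owner's session ends.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for ZooKeeper, not a real client API.
final class FakeZk {
    private final ConcurrentMap<String, Integer> ephemeralOwners = new ConcurrentHashMap<>();

    // Like creating an ephemeral znode: fails if the path already exists.
    boolean createEphemeral(String path, int brokerId) {
        return ephemeralOwners.putIfAbsent(path, brokerId) == null;
    }

    Integer read(String path) {
        return ephemeralOwners.get(path);
    }

    // When a broker's session ends, every ephemeral node it owns disappears.
    void expireSession(int brokerId) {
        ephemeralOwners.values().removeIf(owner -> owner == brokerId);
    }
}

final class ControllerElection {
    // Every broker races to create /controller; exactly one create succeeds.
    static boolean tryElect(FakeZk zk, int brokerId) {
        return zk.createEphemeral("/controller", brokerId);
    }
}
```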

image-20191211114800350

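Triggering an election

image-20200806083941360

  1. When the cluster starts from scratch;

  2. When a Broker detects that the /controller node has disappeared;

  3. When a Broker detects that the data of the /controller node has changed.

    If the Broker was previously the Controller, it must first resign and then try to run for election. If it was not, it runs for the new Controller directly.

Election

image-20200806085146288

Partition leader election

The new leader is the first replica in the AR (assigned replicas) list that is alive and also in the ISR.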
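Here is a minimal sketch of this rule; PartitionLeaderElection is a hypothetical helper, not Kafka's actual code. It walks the AR in order and returns the first replica that is both alive and in the ISR. With Replicas 7,3, broker 7 wins whenever it is alive and in sync. Otherwise broker 3 takes over. If no replica qualifies, the sketch elects nobody, and unclean election is not modeled.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class PartitionLeaderElection {
    // Walks the AR in order and returns the first replica that is alive and in the ISR.
    static Optional<Integer> electLeader(List<Integer> assignedReplicas, Set<Integer> isr,
                                         Set<Integer> liveBrokers) {
        for (int replica : assignedReplicas) {
            if (liveBrokers.contains(replica) && isr.contains(replica)) {
                return Optional.of(replica);
            }
        }
        return Optional.empty(); // no eligible replica; unclean election is not modeled
    }
}
```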

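When a broker stops or crashes, leadership of every partition it led moves to another broker. So when the broker restarts, it no longer leads any partition, and Kafka clients do not read from it, which wastes resources. In the example below, broker 7 crashed and restarted. Partition 1 has data on broker 7, but because broker 7 went down, Kafka made broker 3 the partition's leader. Broker 3 is already the leader of Partition 6.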

[iteblog@www.iteblog.com ~]$ kafka-topics.sh --topic iteblog   \
--describe --zookeeper www.iteblog.com:2181
 
Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 1 Replicas: 1,4 Isr: 1,4
  Topic: iteblog  Partition: 1  Leader: 3 Replicas: 7,3 Isr: 3,7
  Topic: iteblog  Partition: 2  Leader: 5 Replicas: 5,7 Isr: 5,7
  Topic: iteblog  Partition: 3  Leader: 6 Replicas: 6,1 Isr: 1,6
  Topic: iteblog  Partition: 4  Leader: 4 Replicas: 4,2 Isr: 4,2
  Topic: iteblog  Partition: 5  Leader: 2 Replicas: 2,5 Isr: 5,2
  Topic: iteblog  Partition: 6  Leader: 3 Replicas: 3,6 Isr: 3,6
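Kafka treats the first replica in the Replicas list as the preferred leader. Preferred leader election (one of the controller duties listed earlier, also driven periodically when auto.leader.rebalance.enable is on) moves leadership back to it. Below is a small sketch, assuming the describe output format shown above, that flags partitions whose current leader is not the preferred replica. PreferredLeaderCheck is a hypothetical parser, not a Kafka tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PreferredLeaderCheck {
    // Matches "Partition: <p>  Leader: <l> Replicas: <r1,r2,...>"; the header line has no "Partition:".
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(\\d+)\\s+Replicas:\\s*([\\d,]+)");

    // Returns partitions whose current leader is not the preferred (first-listed) replica.
    static List<Integer> nonPreferred(List<String> describeLines) {
        List<Integer> out = new ArrayList<>();
        for (String line : describeLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) continue; // skip header and unrelated lines
            int partition = Integer.parseInt(m.group(1));
            int leader = Integer.parseInt(m.group(2));
            int preferred = Integer.parseInt(m.group(3).split(",")[0]);
            if (leader != preferred) out.add(partition);
        }
        return out;
    }
}
```

Run over the full output above, it flags only Partition 1, whose preferred leader is broker 7.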

https://blog.csdn.net/qq_29493353/article/details/88532089

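Controller failover

image-20191210174046139

Initially, Broker 0 is the controller. When Broker 0 goes down, its ZooKeeper session expires and the /controller ephemeral node is deleted. The surviving Brokers learn of this through the Watch mechanism and all compete to become the new controller. Broker 3 wins the election and re-creates the /controller node in ZooKeeper. Broker 3 then reads the cluster metadata from ZooKeeper and loads it into its own cache.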

private def elect(): Unit = {
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
      // If no exception is thrown, this broker won the election and becomes the controller
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
        s"and epoch zk version is now ${controllerContext.epochZkVersion}")

      onControllerFailover()
    }
  ...
}
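registerControllerAndIncrementControllerEpoch bumps the controller epoch on every successful election. The following sketch shows why that matters, using a simplified model. ControllerEpochState and EpochFence are hypothetical, not KafkaZkClient. Brokers remember the highest epoch they have seen and ignore requests from a controller with an older one. This way a deposed controller that is still running cannot override the new controller's decisions.

```java
// Hypothetical model of the epoch stored alongside /controller; not Kafka's KafkaZkClient.
final class ControllerEpochState {
    private Integer controllerId = null;
    private int epoch = 0;

    // Succeeds only if no controller is registered; returns the new, incremented epoch.
    synchronized int registerAndIncrementEpoch(int brokerId) {
        if (controllerId != null) {
            throw new IllegalStateException("controller already elected: " + controllerId);
        }
        controllerId = brokerId;
        return ++epoch;
    }

    // The controller's ephemeral node disappears (e.g. its session expired).
    synchronized void controllerGone() {
        controllerId = null;
    }
}

// Kept by every broker: requests stamped with an older epoch come from a stale controller.
final class EpochFence {
    private int highestSeen = 0;

    synchronized boolean accept(int requestEpoch) {
        if (requestEpoch < highestSeen) return false;
        highestSeen = requestEpoch;
        return true;
    }
}
```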

image-20191210181215208

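Messages below the partition's high watermark are considered committed; those at or above it are uncommitted. Consumers can only consume committed messages.

The high watermark (HW) and the LEO (Log End Offset) are two important attributes of a replica object. Every Kafka replica has its own high watermark and LEO, not only the Leader replica. The Leader replica is special, though: Kafka uses the Leader replica's high watermark as the partition's high watermark. In other words, a partition's high watermark is its Leader replica's high watermark.

image-20191211093647072

Broker 0 stores the LEO values of the partition's Leader replica and of all its Follower replicas (the remote replicas). Broker 1 stores only one Follower replica of the partition.

The Leader's logic for handling a producer request is:

i. Get all remote replica LEO values stored on the Leader replica's Broker (LEO-1, LEO-2, …, LEO-n)

ii. Get the Leader replica's high watermark: currentHW

iii. Update currentHW = max{currentHW, min(LEO-1, LEO-2, …, LEO-n)}

The Leader's logic for handling a Follower's fetch request is:

  1. Read the message data from disk (or the page cache)

  2. Update the remote replica's LEO with the offset carried in the Follower's fetch request
  3. Update the partition's high watermark (same steps as for a producer request)

The Follower's logic after fetching messages from the Leader is:

  1. Write the messages to the local disk

  2. Update the LEO value, then update the high watermark:

    i. Get the high watermark sent by the Leader: currentHW

    ii. Get the LEO value updated in step 2: currentLEO

    iii. Update the high watermark to min(currentHW, currentLEO)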
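The two fetch rounds can be made concrete with a toy model of one leader and one follower that follows the update rules above. HwLeader and HwFollower are hypothetical. There is only one remote replica, so the min over remote LEOs is just that replica's LEO. After the producer writes one message, the follower's first fetch copies the message, but both HWs stay at 0. Only the second fetch, which carries the follower's new LEO, lets the leader and then the follower advance the HW to 1.

```java
// Toy model: one leader, one follower, offsets only (no message payloads).
final class HwLeader {
    long leo = 0;        // leader's log end offset
    long hw = 0;         // leader HW = partition HW
    long remoteLeo = 0;  // follower's LEO as recorded on the leader's broker

    // Producer request: append n messages, then recompute the HW.
    void append(int n) {
        leo += n;
        updateHw();
    }

    // Follower fetch: the fetch offset becomes the remote LEO, the HW is recomputed,
    // and the response carries {number of messages returned, leader HW}.
    long[] handleFetch(long fetchOffset) {
        remoteLeo = fetchOffset;
        updateHw();
        return new long[] {leo - fetchOffset, hw};
    }

    // currentHW = max{currentHW, min(remote LEOs)}; with one follower the min is its LEO.
    private void updateHw() {
        hw = Math.max(hw, remoteLeo);
    }
}

final class HwFollower {
    long leo = 0;
    long hw = 0;

    void fetchFrom(HwLeader leader) {
        long[] response = leader.handleFetch(leo);
        leo += response[0];               // write the returned messages locally
        hw = Math.min(response[1], leo);  // HW = min(leader HW, own LEO)
    }
}
```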

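Every Broker has its own Coordinator component serving Consumer Groups. It runs Rebalances for groups and handles offset management and group membership management.

Kafka determines which Broker hosts a given Consumer Group's Coordinator in 2 steps.

Step 1: determine which partition of the offsets topic stores the Group's data: partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount).

Step 2: find the Broker hosting that partition's Leader replica; that Broker is the Group's Coordinator.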
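Here are the two steps as a minimal sketch. leaderByPartition is a hypothetical map standing in for the cluster metadata. Math.abs(h % n) always lands in [0, n), because the remainder's magnitude is below n. For every hashCode except Integer.MIN_VALUE it equals Math.abs(h) % n, the form used earlier in these notes.

```java
import java.util.Map;

final class CoordinatorLookup {
    // Step 1: which __consumer_offsets partition holds this group's data.
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
    }

    // Step 2: the broker hosting that partition's leader replica is the coordinator.
    // leaderByPartition is a hypothetical stand-in for cluster metadata.
    static int coordinatorFor(String groupId, int offsetsTopicPartitionCount,
                              Map<Integer, Integer> leaderByPartition) {
        return leaderByPartition.get(partitionFor(groupId, offsetsTopicPartitionCount));
    }
}
```

For example, "a".hashCode() is 97, so group "a" maps to partition 97 % 50 = 47 with the default 50 partitions.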

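The first layer is the topic layer. Each topic can be configured with M partitions, and each partition with N replicas.

The second layer is the partition layer. Only one of a partition's N replicas acts as the leader and serves clients. The other N-1 replicas are followers that only provide data redundancy.

The third layer is the message layer. A partition holds a number of messages, whose offsets start at 0 and increase one by one.

Finally, client programs interact only with the partition's leader replica. This is the default; since Kafka 2.4, consumers can be configured to fetch from a follower replica.

image-20191025140100157

Partition replica assignment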

def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                              nPartitions: Int,
                              replicationFactor: Int,
                              fixedStartIndex: Int = -1,
                              startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
    if (nPartitions <= 0)
      throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
    if (replicationFactor <= 0)
      throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
    if (replicationFactor > brokerMetadatas.size)
      throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
    if (brokerMetadatas.forall(_.rack.isEmpty))
      assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
        startPartitionId)
    else {
      if (brokerMetadatas.exists(_.rack.isEmpty))
        throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
      assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
        startPartitionId)
    }
  }


  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }
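Here is a Java port of assignReplicasToBrokersRackUnaware for experimenting with the layout. It assumes that the replicaIndex helper, which the excerpt calls but does not show, computes shift = 1 + (nextReplicaShift + j) % (nBrokers - 1) and returns (firstReplicaIndex + shift) % nBrokers. Treat that as an assumption. Passing a non-negative fixedStartIndex makes the output deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class RackUnawareAssignment {
    private static final Random RAND = new Random();

    // Assumed to match the replicaIndex helper that the Scala excerpt calls but does not show.
    static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor, List<Integer> brokers,
                                              int fixedStartIndex, int startPartitionId) {
        int n = brokers.size();
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(n);
        int currentPartitionId = Math.max(0, startPartitionId);
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(n);
        Map<Integer, List<Integer>> ret = new LinkedHashMap<>();
        for (int i = 0; i < nPartitions; i++) {
            // Each time partition ids wrap around the broker list, shift followers one step further.
            if (currentPartitionId > 0 && currentPartitionId % n == 0) nextReplicaShift += 1;
            int firstReplicaIndex = (currentPartitionId + startIndex) % n;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(firstReplicaIndex)); // first replica = preferred leader
            for (int j = 0; j < replicationFactor - 1; j++) {
                replicas.add(brokers.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, n)));
            }
            ret.put(currentPartitionId, replicas);
            currentPartitionId += 1;
        }
        return ret;
    }
}
```

Leaders rotate across all brokers. After the partition ids wrap around, the follower shift grows by one, so partition 5 on 5 brokers gets [0, 2, 3] rather than repeating partition 0's [0, 1, 2]. This spreads followers as well.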

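Implementation approach

image-20200806174825895

Data synchronization

Coordinators

  1. Consumer coordinator (ConsumerCoordinator, on the client side)
  2. Group coordinator (GroupCoordinator, on the broker side)
  3. Task management coordinator (WorkerCoordinator, in Kafka Connect)

Open questions

  1. What is the broker leader (controller) responsible for?
  2. What is the partition leader responsible for?

Election mechanism

Timing wheel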

image-20220208170647146

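So how can timer tasks with large spans be supported?

To support timer tasks of hundreds of thousands of milliseconds, do we have to enlarge the timing wheel's array? There are actually two solutions. Kafka uses hierarchical (multi-level) timing wheels: a task that does not fit in one wheel goes into a coarser wheel above it.

How are the timing wheels linked in Kafka, and how is this higher-level relationship represented?

It is simple: each wheel holds an internal pointer to the timing wheel one level above it.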
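Here is a minimal sketch of the hierarchical idea, assuming Kafka-style parameters (tickMs = 1, wheelSize = 20). The class only records expirations in buckets and reports which level a task lands on. It leaves out task execution and the re-insertion that happens when a higher-level bucket expires. overflowWheel is the pointer to the next level. It is created lazily, with its tickMs equal to this level's whole interval.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified hierarchical timing wheel (no task execution); names are illustrative.
final class TimingWheel {
    final long tickMs;          // width of one bucket
    final int wheelSize;        // number of buckets
    final long interval;        // tickMs * wheelSize: span this level covers
    long currentTime;           // always a multiple of tickMs
    final List<List<Long>> buckets = new ArrayList<>();
    TimingWheel overflowWheel;  // pointer to the coarser wheel one level up

    TimingWheel(long tickMs, int wheelSize, long startMs) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.currentTime = startMs - (startMs % tickMs);
        for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayList<>());
    }

    // Returns the level the task was stored on (0 = this wheel), or -1 if it is already due.
    int add(long expirationMs) {
        if (expirationMs < currentTime + tickMs) {
            return -1;                                 // due now: the caller should run it
        }
        if (expirationMs < currentTime + interval) {
            long virtualId = expirationMs / tickMs;    // index of the tick it expires in
            buckets.get((int) (virtualId % wheelSize)).add(expirationMs);
            return 0;
        }
        if (overflowWheel == null) {
            overflowWheel = new TimingWheel(interval, wheelSize, currentTime);
        }
        return 1 + overflowWheel.add(expirationMs);    // too far out: delegate upward
    }

    // Moves the clock forward and propagates it to the upper levels.
    void advanceClock(long timeMs) {
        if (timeMs >= currentTime + tickMs) {
            currentTime = timeMs - (timeMs % tickMs);
            if (overflowWheel != null) overflowWheel.advanceClock(currentTime);
        }
    }
}
```

With these parameters a 350 ms task lands on the second level (20 ms × 20 buckets) and a 450 ms task on the third (400 ms × 20 buckets). The array never needs to grow.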

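Another question is how to advance the timing wheel, i.e. how to make its time move forward.

Kafka makes a trade-off and uses a DelayQueue where it fits. The DelayQueue holds only the TimerTaskList buckets, not every TimerTask, so it contains relatively few elements. Compared with the cost of "empty advances" (ticking through empty slots), the benefits outweigh the drawbacks.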
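Below is a small sketch of the bucket-in-DelayQueue idea using java.util.concurrent.DelayQueue. Bucket is a hypothetical stand-in for TimerTaskList, and BucketClock is a hypothetical driver. Many tasks share one queue entry. The driving thread blocks in take() until the earliest non-empty bucket is due, instead of ticking through empty slots.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical bucket standing in for TimerTaskList: many tasks, one DelayQueue entry.
final class Bucket implements Delayed {
    final long deadlineNanos;                 // absolute System.nanoTime() deadline
    final List<String> tasks = new ArrayList<>();

    Bucket(long delayMs) {
        deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class BucketClock {
    // Blocks until each bucket expires, earliest first, and returns the tasks in firing order.
    static List<String> drain(DelayQueue<Bucket> queue) {
        List<String> fired = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                fired.addAll(queue.take().tasks);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fired;
    }
}
```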

Delay queue

image-20220208171725418

image-20220208171741184

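RocketMQ

namesrv vs ZK

1. Compare how Kafka and RocketMQ handle coordination: Kafka coordinates through ZooKeeper, while RocketMQ coordinates through its own namesrv.

2. Kafka has an election mechanism. In Kafka, Master/Slave election takes 2 steps. First, ZK is used to elect one KafkaController among all machines. Second, the Controller decides which replica of each partition is the Master and which are Slaves. Thanks to election, when a partition's master fails, one of its slaves is promoted to master and serves clients.

3. RocketMQ in its classic master/slave deployment (before the DLedger mode added in 4.5) has no election, and Master/Slave roles are fixed. When a Master fails, you can write to other Masters, but a Slave cannot be switched to Master.

   How does RocketMQ achieve high availability, then? All broker nodes play the same role and are assigned the same topics and the same number of queues. When a broker fails, RocketMQ moves the requests that were going to it over to other brokers, rather than promoting that broker's slave to master.

4. RocketMQ's coordination design is lighter-weight and solves high availability in a different way, which is also worth learning from.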

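Message storage

image-20191211173947135

On throughput

1. The figures here come from articles shared online, namely the ones listed in my references.

2. Kafka creates physical files according to the number of topics and partitions. If we create a topic with 3 partitions, there are 3 physical file directories; partitions map one-to-one to physical files.

3. RocketMQ stores messages in a single log, the well-known commitLog, which is physically split into fixed-size segment files. RocketMQ's queues exist only in the consumeQueue. The commitLog that actually stores the messages is one log shared by all topics.

4. Kafka writes many files concurrently, while RocketMQ writes a single file, so one would expect Kafka to win clearly on performance.

5. Storing data in many files causes a problem for Kafka. With very many partitions, disk access becomes a serious bottleneck: each file is written by appending, but switching among many files inevitably causes disk seeks.

| | RocketMQ | Kafka |
|---|---|---|
| HA | master, slave | replica count configurable per topic partition |
| Message storage | one file per broker | one file per partition |
| Topic creation | message queues are created on the brokers | partitions are assigned by the controller (leader broker) |
| MessageQueue (partition) | every broker has the same number of message queues | partitions are spread evenly across brokers |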

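Redis Cluster

Redis Cluster replicates at the machine level: a replica is a whole node. Kafka does not use the machine as the unit of replication; replicas are per partition.

Partitions (logically) follow the same master/slave structure as Redis Cluster nodes (physically).

The broker cluster looks like a multi-master structure, but it also has a controller role.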

image-20210506125824890

References

https://time.geekbang.org/column/article/105112

https://www.jianshu.com/p/a6b9e5342878

https://www.jianshu.com/p/c474ca9f9430

https://matt33.com/2017/07/08/kafka-producer-metadata/

https://blog.csdn.net/chunlongyu/article/details/52651960

http://objcoding.com/2020/04/26/multi-threaded-consumption/

http://zhengjianglong.cn/2018/04/21/kafka/kafka源码解析(5)–协调者实现原理/

https://www.cnblogs.com/dorothychai/p/6181058.html