1, 每个broker启动的时候都会去注册一个临时节点 /controller, 那个broker先注册这个节点,那个就是所有broker的leader,并将自己的信息写入到这个临时节点里面。如下:
[zk: 10.3.63.204:2181,10.3.63.205:2181(CONNECTED) 3] get /controller
{"version":1,"brokerid":0,"timestamp":"1407310302044"}
cZxid = 0x700000592
ctime = Wed Aug 06 15:32:01 CST 2014
mZxid = 0x700000592
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x700000592
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x147aa389edd0001
dataLength = 54
numChildren = 0
每个broker都会起动kafkaController这个进程, 但只有一个是leader,controller主要是负责删除一些多余的
topic或者其他选举某个topic的pation的leader使用。
2,当关闭的时候,回调用KafkaServer的shutdown方法, 里面会先尝试关闭controller,具体调用代码如下:
CoreUtils.swallow(controlledShutdown())。
代码的逻辑是从zookeeper的controller读出leader的id,并从broker/ids/id读出broker的信息, 然后发送一个
ControlledShutdownRequest的请求到它上面,直到读到成功返回后才说明shutdownSuccessed
3, 具体处理这个请求的逻辑世在KafkaApis中来处理的, 具体的代码如下:
def handleControlledShutdownRequest(request: RequestChannel.Request) { // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) }
里面可以看到,发送到主的leader上面,调用KafkaController的 def shutdownBroker(id: Int),工作的具体内容是循环topic的partittion, 然后判断当前的分区是否是主的, // If the broker leads the topic partition, transition the leader and update isr. Updates zk and // notifies all affected brokers
如果不是的,
// Stop the replica first. The state change below initiates ZK changes which should take some time
// before which the stop replica request should be completed (in most cases)
对应的问题是, 如果关闭controller的时间足够长的话,会导致timeout,然后会重新发送关闭的请求。因为锁的缘故,回导致再次的请求也会超时。这样会导致controller的非正常关闭, 重新启动时会有会滚的操作。 虽然这种情况下不会影响到具体的使用。
https://issues.apache.org/jira/browse/KAFKA-1342
分析启动过程:
1, 设置状态为Starting
2, kafkaScheduler.startup - 主要是后台需要定时执行的一些任务
3, initZk - 初始化和zookeeper的链接
4, logManager.startup - 这个主要是通过上面的scheduler来定时循环执行三个任务:kafka-log-retention kafka-log-flusher kafka-recovery-point-checkpoint,如果配置了清理的话,还会起动
5, socketServer.startup,是个NIO的服务, 线程模型如下
1 Acceptor thread that handles new connections
N Processor threads that each have their own selector and read requests from sockets
M Handler threads that handle requests and produce responses back to the processor threads for writing.
6, replicaManager.startup - 主要是通过调度器定时执行 maybeShrinkIsr方法的线程
7, createOffsetManager - 通过调度器启动定时执行 compact 方法的线程
8, kafkaController.startup - 注册zk session失效事件,竞争leader。如果是leader的话,则会回调
kafkaController的 onControllerFailover 方法。
9, consumerCoordinator.startup - Kafka coordinator handles consumer group and consumer offset management 主要是处理消费组和消费者偏移量的问题
10, start processing requests requestHandlerPool-KafkaApis 主要是通过KafkaRequestHandlerPool
来启动处理请求的线程,每个线程实际最后调用的还是KafkaApis
11, 设置状态 runningAsBroker
12, topicConfigManager.startup - 主要监听 /config/changes,然后 Process the given list of config changes
13, tell everyone we are alive - KafkaHealthcheck.startup,主要是和zk保持心跳连接
14, register broker metrics - 主要是一些统计信息
Broker的状态
broker 有以下几种状态
case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object RunningAsController extends BrokerStates { val state: Byte = 4 } case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
状态之间的流转图如下:
/**
* Broker states are the possible state that a kafka broker can be in.
* A broker should be only in one state at a time.
* The expected state transition with the following defined states is:
*
* +-----------+
* |Not Running|
* +-----+-----+
* |
* v
* +-----+-----+
* |Starting +--+
* +-----+-----+ | +----+------------+
* | +>+RecoveringFrom |
* v |UncleanShutdown |
* +----------+ +-----+-----+ +-------+---------+
* |RunningAs | |RunningAs | |
* |Controller+<--->+Broker +<-----------+
* +----------+ +-----+-----+
* | |
* | v
* | +-----+------------+
* |-----> |PendingControlled |
* |Shutdown |
* +-----+------------+
* |
* v
* +-----+----------+
* |BrokerShutting |
* |Down |
* +-----+----------+
* |
* v
* +-----+-----+
* |Not Running|
* +-----------+
*
* Custom states is also allowed for cases where there are custom kafka states for different scenarios.
*/
相关推荐
kafka-summarykafka学习总结,源码剖析目录一、基础篇开篇说明概念说明配置说明znode分类kafka协议分类Kafka线程日志存储格式kakfa架构设计二、流程篇1、kafka启动过程2、日志初始化和清理过程3、选举controller过程...
KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 ...备注:指定端口号看启动过程中 "-Dhttp.port=7456" 端口可以自己设置 http://hadoop102:7456
简单实现了kafka的搭建过程,以及kafka搭建时的启动及配置说明
在kafka集群中,每个代理节点(Broker)在启动都会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的leader节点。 (1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/...
2.3 Kafka一键启动/关闭脚本 第三章 基础操作 3.1 创建topic 3.2 生产消息到Kafka 3.3 从Kafka消费消息 3.4 使用Kafka Tools操作Kafka 第四章 Kafka基准测试 第五章 Java编程操作Kafka 5.1 同步生产消息到Kafka中 ...
启动过程时,必须将配置文件作为参数传递。 ## Kafka Exporter Properties # HTTP port used for the exporter exporter.port=12340 # Time in seconds that the metrics, once retrieved, will consider as valid...
它包括IoC(Inverse of Control,控制反转)容器、AOP(Aspect-Oriented Programming,面向切面编程)等特性,可以简化开发过程、提高代码的可维护性和可测试性。 2. Spring MVC框架:Spring MVC是基于Spring框架的...
在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立的扩展或修改两边的处理过程, 只要...
##Mac安装kafka 并生产消息和消费消息 安装kafka $ brew install kafka 1、 安装过程将依赖安装 zookeeper 2、 软件位置 /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka ...3、启动kafka服务 kafka-serve
在采用Kafka作为启动过类型的Fabric网络中,configtx.yaml 及 cryto-config.yaml配置文件依然有着重要的地位,但是其中的配置样本与先前的内容会有些不同。 本章将进行基于Kafka集群的部署,其中重要的概念是对前三...
奥乔尼特斯概述您需要设置并快速运行一堆代理... 请注意,我包含了一个相当简单的容器,它将使用本地/var/lib/kafka来存储其日志。 随意挂载一些东西,甚至动态分配一个卷。做吧!第 1 步:在运行门户的情况下在 AWS 上
使用Replicator的Kafka Streams迁移示例关键点如果Kafka Streams exactly-once ,则复制器必须配置为isolation.level=read_committed 迁移的应用程序需要使用auto.offset.reset=latest重新启动为了一个接一个地迁移...
然而,大多数情况下你可能不想手动启动服务器,有可能是你安排MySQL服务器在系统引导时自动启动,作为标准引导过程的一部分,在Unix下,该引导过程由系统的Unix用户root执行,并且任何在该过程中运行的进程均以root...
当应用程序启动时,除了 Zookeeper 和 Kafka 之外,它由四个独立的单元组成。 每个单元都是一个单独的过程。 这些进程不必在同一台计算机上。 这些单位是: 原料药 处理器 订户一 订户二 主API监听主机操作系统的 ...
##下载源码git clonekafka-log-appender:将日志内容写到kafka程序log-kafka-storm:docker-compose脚本和storm程序##准备docker环境###启动dockerdocker-compose环境搭建过程请查看我的进入log-kafka-storm目录,...
即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输支持KafkaServer间的消息分区,及分布式消息消费,同时保证每个partition内的消息顺序传输同时支持离线数据处理和实时数据处理为什么要用Message...
MirrorMaker是事实上的标准工具,用于在不同群集之间镜像Kafka主题。 默认情况下,群集之间的主题名称始终保持相同,但是在某些情况下,有必要在镜像过程中更改主题名称。 该存储库显示了如何通过实现自定义Message...
4 安装过程中需要配置相关数据库 10 5 安装完成,提示成功 11 四、 安装kafka 11 1 下载软件 11 2 修改配置 11 3 启动服务 11 五、 安装Oryx 11 1 下载软件 12 2 下载配置文件oryx.conf 12 3 创建kafka主题(topic)...
153_kafka集群安装与启动4 ^; K& j3 @6 p0 M 154_kafka创建主题以及查看主题结构 155_考察zk中kafka结构9 N: Y8 u4 {# m/ z1 d3 H 156_kafka分区服务器服务方式 157_kafka编程API实现生产者和消费者+ w9 l1 N( D8 E%...