kafka默认拉取数量 kafka消费者拉取数据频率

2025-04-18 09:51

Flink是如何从kafka中拉取数据的

首先来看一下 FlinkKafkaConsumerBase.run方法,相当于是Flink 从kafka中拉取数据的入口方法:

kafka默认拉取数量 kafka消费者拉取数据频率kafka默认拉取数量 kafka消费者拉取数据频率


kafka默认拉取数量 kafka消费者拉取数据频率


createFetcher方法

返回了一个 KafkaFetcher对象,我们点进去看一下

KafkaFetcher的构造器里面创建了一个 KafkaConsumerThread对象

至此为止createFetch就介绍完了,也可以看作是拉取数据的准备工作,接下来看一下kafkaFetcher.runFetchLoop();

KafkaFetch中的runFetchLoop方法,正式开始从kafka中拉取message

既然consumerThread.start()开始了实际的kafka consumer,我们一起来看一下consumerThread中的方法

至此如何从kafka中拉取数据,已经介绍完了

2、kafka如何选定分区数量

来自《kafka权威指南》第2章

num.partitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 num.partitions 指定的值,需要手动创建该主题。

Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。当然,这并不是说,在存在多个主题的情况下(它们分布在多个 broker 上),为了能让分区分布到所有 broker 上,主题分区的个数必须要大于 broker 的个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。

为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。

kafka配置参数详解

kafka的配置分为 broker、producter、consumer三个不同的配置

一 BROKER 的全局配置

为核心的三个配置 broker.id、log.dir、zookeeper.connect 。

------------------------------------------- 系统 相关 -------------------------------------------

broker.id =1

log.dirs = /tmp/kafka-logs

port =6667

message.max.bytes =3

num.io.threads =8

background.threads =4

queued.max.requests =500

host.name

aertised.host.name

aertised.port

socket.send.buffer.bytes =1001024

socket.receive.buffer.bytes =1001024

socket.request.max.bytes =100 1024 1024

------------------------------------------- LOG 相关 -------------------------------------------

log.segment.bytes =1024 1024 1024

log.roll.hours =247

log.cleanup.policy = delete

log.retention.minutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟

log.cleanup.interval.mins=1

log.retention.bytes=-1

log.retention.check.interval.ms=5minutes

log.cleaner.enable=false

log.cleaner.threads =1

log.cleaner.io.max.bytes.per.second=None

log.cleaner.dedupe.buffer.size=500 1024 1024

log.cleaner.io.buffer.size=5121024

log.cleaner.io.buffer.load.factor =0.9

log.cleaner.backoff.ms =15000

log.cleaner.min.cleanable.ratio=0.5

log.cleaner.delete.retention.ms =1day

log.index.size.max.bytes =10 1024 1024

log.index.interval.bytes =4096

log.flush.interval.messages=None

log.flush.scheduler.interval.ms =3000

log.flush.interval.ms = None

log.delete.delay.ms =60000

log.flush.offset.checkpoint.interval.ms =60000

------------------------------------------- TOPIC 相关 =true

default.replication.factor =1

num.partitions =1

实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被到三个broker上。

----------------------------------(Leader、replicas) 相关 ----------------------------------

controller.socket.timeout.ms =30000

controller.message.queue.size=10

replica.lag.time.max.ms =10000

replica.lag.max.messages =4000

replica.socket.timeout.ms=301000

replica.socket.receive.buffer.bytes=641024

replica.fetch.max.bytes =10241024

replica.fetch.wait.max.ms =500

replica.fetch.min.bytes =1

num.replica.fetchers=1

replica.high.watermark.checkpoint.interval.ms =5000

controlled.shutdown.enable =false

controlled.shutdown.max.retries =3

controlled.shutdown.retry.backoff.ms =5000

auto.leader.rebalance.enable =false

leader.imbalance.per.broker.percentage =10

leader.imbalance.check.interval.seconds =300

offset.metadata.max.bytes

----------------------------------ZooKeeper 相关----------------------------------

zookeeper.connect = localhost:2181

zookeeper.session.timeout.ms=6000

zookeeper.connection.timeout.ms =6000

zookeeper.sync.time.ms =2000

配置的修改

其中一部分配置是可以被每个topic自身的配置所代替,例如

新增配置

bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1

修改配置

bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000

删除配置 :

bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes

二 CONSUMER 配置

为核心的配置是group.id、zookeeper.connect

group.id

consumer.id

client.id = group id value

zookeeper.connect=localhost:2182

zookeeper.session.timeout.ms =6000

zookeeper.connection.timeout.ms =6000

zookeeper.sync.time.ms =2000

auto.offset.reset = largest

socket.timeout.ms=301000

socket.receive.buffer.bytes=641024

fetch.message.max.bytes =601000

queued.max.message.cks =10

rebalance.max.retries =4

rebalance.backoff.ms =2000

refresh.leader.backoff.ms

fetch.min.bytes =1

fetch.wait.max.ms =100

consumer.timeout.ms = -1

三 PRODUCER 的配置

比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

metadata.broker.list

request.required.acks =0

request.timeout.ms =10000

send.buffer.bytes=1001024

key.serializer.class

partitioner.class=kafka.producer.DefaultPartitioner

compression.codec = =3

retry.backoff.ms =100

topic.metadata.refresh.interval.ms =6001000

client.id=""

------------------------------------------- 消息模式 相关 -------------------------------------------

producer.type=sync

queue.buffering.max.ms =5000

queue.buffering.max.messages =10000

queue.enqueue.timeout.ms = -1

batch.num.messages=200

serializer.class= kafka.serializer.DefaultEncoder

下一篇:lol一把加28分会跳段吗 lol1把加25分能跳段吗
上一篇:
相关文章
返回顶部小火箭