王者荣耀云缨到底有多高 王者荣耀云缨漂
1 2024-11-10 09:54:32
首先来看一下 FlinkKafkaConsumerBase.run方法,相当于是Flink 从kafka中拉取数据的入口方法:
kafka默认拉取数量 kafka消费者拉取数据频率
kafka默认拉取数量 kafka消费者拉取数据频率
createFetcher方法
返回了一个 KafkaFetcher对象,我们点进去看一下
KafkaFetcher的构造器里面创建了一个 KafkaConsumerThread对象
至此为止createFetch就介绍完了,也可以看作是拉取数据的准备工作,接下来看一下kafkaFetcher.runFetchLoop();
KafkaFetch中的runFetchLoop方法,正式开始从kafka中拉取message
既然consumerThread.start()开始了实际的kafka consumer,我们一起来看一下consumerThread中的方法
至此如何从kafka中拉取数据,已经介绍完了
来自《kafka权威指南》第2章
num.partitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 num.partitions 指定的值,需要手动创建该主题。
Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。当然,这并不是说,在存在多个主题的情况下(它们分布在多个 broker 上),为了能让分区分布到所有 broker 上,主题分区的个数必须要大于 broker 的个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。
为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。
kafka的配置分为 broker、producter、consumer三个不同的配置
一 BROKER 的全局配置
为核心的三个配置 broker.id、log.dir、zookeeper.connect 。
------------------------------------------- 系统 相关 -------------------------------------------
broker.id =1
log.dirs = /tmp/kafka-logs
port =6667
message.max.bytes =3
num.io.threads =8
background.threads =4
queued.max.requests =500
host.name
aertised.host.name
aertised.port
socket.send.buffer.bytes =1001024
socket.receive.buffer.bytes =1001024
socket.request.max.bytes =100 1024 1024
------------------------------------------- LOG 相关 -------------------------------------------
log.segment.bytes =1024 1024 1024
log.roll.hours =247
log.cleanup.policy = delete
log.retention.minutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
log.retention.bytes=-1
log.retention.check.interval.ms=5minutes
log.cleaner.enable=false
log.cleaner.threads =1
log.cleaner.io.max.bytes.per.second=None
log.cleaner.dedupe.buffer.size=500 1024 1024
log.cleaner.io.buffer.size=5121024
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.backoff.ms =15000
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms =1day
log.index.size.max.bytes =10 1024 1024
log.index.interval.bytes =4096
log.flush.interval.messages=None
log.flush.scheduler.interval.ms =3000
log.flush.interval.ms = None
log.delete.delay.ms =60000
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC 相关 =true
default.replication.factor =1
num.partitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被到三个broker上。
----------------------------------(Leader、replicas) 相关 ----------------------------------
controller.socket.timeout.ms =30000
controller.message.queue.size=10
replica.lag.time.max.ms =10000
replica.lag.max.messages =4000
replica.socket.timeout.ms=301000
replica.socket.receive.buffer.bytes=641024
replica.fetch.max.bytes =10241024
replica.fetch.wait.max.ms =500
replica.fetch.min.bytes =1
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms =5000
controlled.shutdown.enable =false
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
auto.leader.rebalance.enable =false
leader.imbalance.per.broker.percentage =10
leader.imbalance.check.interval.seconds =300
offset.metadata.max.bytes
----------------------------------ZooKeeper 相关----------------------------------
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
为核心的配置是group.id、zookeeper.connect
group.id
consumer.id
client.id = group id value
zookeeper.connect=localhost:2182
zookeeper.session.timeout.ms =6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
auto.offset.reset = largest
socket.timeout.ms=301000
socket.receive.buffer.bytes=641024
fetch.message.max.bytes =601000
queued.max.message.cks =10
rebalance.max.retries =4
rebalance.backoff.ms =2000
refresh.leader.backoff.ms
fetch.min.bytes =1
fetch.wait.max.ms =100
consumer.timeout.ms = -1
三 PRODUCER 的配置
比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class
metadata.broker.list
request.required.acks =0
request.timeout.ms =10000
send.buffer.bytes=1001024
key.serializer.class
partitioner.class=kafka.producer.DefaultPartitioner
compression.codec = =3
retry.backoff.ms =100
topic.metadata.refresh.interval.ms =6001000
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
producer.type=sync
queue.buffering.max.ms =5000
queue.buffering.max.messages =10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
serializer.class= kafka.serializer.DefaultEncoder