librdkafka配置参数整理。
全局配置参数
Property | C/P | Range | Default | Description |
---|---|---|---|---|
builtin.features | * | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins | 标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性. Type: CSV flags |
|
client.id | * | rdkafka | 客户端标识. Type: string |
|
metadata.broker.list | * | 初始化的broker列表,host:port格式, 应用程序可以使用rd_kafka_brokers_add()动态添加broker. Type: string |
||
bootstrap.servers | * | 参考 metadata.broker.list | ||
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | 消息发送最大字节数. Type: integer |
message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | 消息缓冲区最大字节数,多出的消息将通过引用传递,但是会消耗更多的内存(struct iovec). Type: integer |
receive.message.max.bytes | * | 1000 .. 1000000000 | 100000000 | kafka消息接收最大字节数,这事一种安全机制,防止在最坏的情况耗尽内存问题。建议值:fetch.message.max.bytes * 分区数 + 消息的最大字节数. Type: integer |
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | broker连接数最大值,针对每个broker配置。主要用于发送消息,但是其他机制将会限制每个broker未处理消息消费数量为1个. Type: integer |
max.in.flight | * | 参考 max.in.flight.requests.per.connection | ||
metadata.request.timeout.ms | * | 10 .. 900000 | 60000 | 没有数据操作的超时时间,单位毫秒. Type: integer |
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | 数据刷新时间间隔,单位毫秒,自动刷新错误和连接,设置为-1则关闭刷新时间间隔. Type: integer |
metadata.max.age.ms | * | 1 .. 86400000 | -1 | 元数据缓存最大生命周期. 默认值为数据刷新时间间隔 metadata.refresh.interval.ms * 3 Type: integer |
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | topic主题失去leader领导者时,元数据请求发送间隔. 用户快速恢复broker leader. Type: integer |
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | 保留,未启用. * Type: integer |
topic.metadata.refresh.sparse | * | true, false | true | 极少的元数据请求,消费者的网络带宽很小 Type: boolean |
topic.blacklist | * | topic内名单,逗号分割增则表达式列表. Type: pattern list |
||
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, all | 一个逗号分割的调试上下文列表,包括:生产者:broker、topic、message; 消费者:cgroup、topic、tetch等 Type: CSV flags |
|
socket.timeout.ms | * | 10 .. 300000 | 60000 | 默认的网络请求超时时间, Producer:ProduceRequests将使用批处理中第一条消息的socket.timeout.ms和其余message.timeout.ms的较小值。 使用者:FetchRequests将使用fetch.wait.max.ms + socket.timeout.ms. Type: integer |
socket.blocking.max.ms | * | 1 .. 60000 | 1000 | socket套接字可能阻塞的最大时间,比较小的值提高了响应速度,但是CPU负载比较大. 已废弃. Deprecated Type: integer |
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | broker端发送缓冲区大小,0则使用系统默认值. Type: integer |
socket.receive.buffer.bytes | * | 0 .. 100000000 | 0 | broker端接收缓冲区大小,0则使用系统默认值. Type: integer |
socket.keepalive.enable | * | true, false | false | 启用TCP keep-alives (SO_KEEPALIVE) on broker sockets Type: boolean |
socket.nagle.disable | * | true, false | false | 禁用nagle 算法TCP_NODELAY Type: boolean |
socket.max.fails | * | 0 .. 1000000 | 1 | 发送失败的最大次数,超过该次数后断开与broker的连接,0 禁用;注意:连接会自动重连. Type: integer |
broker.address.ttl | * | 0 .. 86400000 | 1000 | broker地址解析结果缓存值(毫秒). Type: integer |
broker.address.family | * | any, v4, v6 | any | any ipv4 ipv6 Type: enum value |
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 500 | 通过该配置参数+-50%调整broker重连. Type: integer |
statistics.interval.ms | * | 0 .. 86400000 | 0 | librdkafka统计时间间隔,应用程序也应该通过注册回调函数来实现对统计指标的监控,0禁用,单位1000ms. Type: integer |
enabled_events | * | 0 .. 2147483647 | 0 | See rd_kafka_conf_set_events() Type: integer |
error_cb | * | 发送错误的回调函数, (set with rd_kafka_conf_set_error_cb()) Type: pointer |
||
throttle_cb | * | 调整回调函数 (set with rd_kafka_conf_set_throttle_cb()) Type: pointer |
||
stats_cb | * | 统计回调函数 (set with rd_kafka_conf_set_stats_cb()) Type: pointer |
||
log_cb | * | 日志回调函数 (set with rd_kafka_conf_set_log_cb()) Type: pointer |
||
log_level | * | 0 .. 7 | 6 | 日志级别 (syslog(3) levels) Type: integer |
log.queue | * | true, false | false | 禁用内部librdkafka线程中的自发log_cb,而是使用rd_kafka_set_log_queue()将队列中的日志消息排入队列,并通过标准轮询API提供日志回调或事件。 注意:日志消息将一直存在于临时队列中,直到日志队列被设置. Type: boolean |
log.thread.name | * | true, false | true | 在日志中记录线程名称,经常用于调试librdkafka内部问题 Type: boolean |
log.connection.close | * | true, false | true | 记录broker关闭事件,受0.9版本的connection.max.idle.ms参数影响,一般建议关闭该参数. Type: boolean |
socket_cb | * | 为socket套接字创建CLOEXEC提供回调函数 Type: pointer |
||
connect_cb | * | socket连接的回调函数 Type: pointer |
||
closesocket_cb | * | socket关闭的回调函数 Type: pointer |
||
open_cb | * | 打开文件时CLOEXEC的回调函数 Type: pointer |
||
opaque | * | 应用程序设置的上下文参数,一般用与消息发送后的回调、librdkafka注册函数回调的上下文,主要用于参数传递,由c++到c在转会c++指针寻址上 (set with rd_kafka_conf_set_opaque()) Type: pointer |
||
default_topic_conf | * | 自动订阅主题的默认配置参数 Type: pointer |
||
internal.termination.signal | * | 0 .. 128 | 0 | 370/5000librdkafka将用于快速终止rd_kafka_destroy()的信号。 如果未设置此信号,则会在rd_kafka_wait_destroyed()返回true之前发生延迟,因为内部线程正在超时执行其系统调用。 如果这个信号被设置,但延迟将是最小的。 应用程序应该在安装内部信号处理程序时屏蔽此信号。 Type: integer |
api.version.request | * | true, false | true | 请求broker支持的API版本以调整可用协议功能的功能。 如果设置为false,或者ApiVersionRequest失败,则将使用备用版本broker.version.fallback。 注意:取决于broker版本> = 0.10.0。 如果(较旧的)代理不支持请求,则使用broker.version.fallback回退. Type: boolean |
api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | broker的api版本请求超时时间. Type: integer |
api.version.fallback.ms | * | 0 .. 604800000 | 1200000 | Dictates how long the broker.version.fallback fallback is used in the case the ApiVersionRequest fails. NOTE: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). Type: integer |
broker.version.fallback | * | 0.9.0 | Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see api.version.request) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for api.version.fallback.ms. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, such as 0.10.2.1, enables ApiVersionRequests. Type: string |
|
security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | 用于和broker通信的协议,默认plaintext. Type: enum value |
ssl.cipher.suites | * | 根据SSL/TLS规范,客户端提交使用的加密算法. See manual page for ciphers(1) and SSL_CTX_set_cipher_list(3). Type: string |
||
ssl.key.location | * | 客户端私钥的路径. Type: string |
||
ssl.key.password | * | 私钥密码 Type: string |
||
ssl.certificate.location | * | 公钥地址. Type: string |
||
ssl.ca.location | * | CA证书路径. Type: string |
||
ssl.crl.location | * | CRL 路径,用于 broker 的证书校验. Type: string |
||
ssl.keystore.location | * | keystore存储路径. Type: string |
||
ssl.keystore.password | * | keystore 密码. Type: string |
||
sasl.mechanisms | * | GSSAPI | 使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名. Type: string |
|
sasl.mechanism | * | Alias for sasl.mechanisms | ||
sasl.kerberos.service.name | * | kafka | Kafka 运行的 Kerberos 首要名, not including /hostname@REALM Type: string |
|
sasl.kerberos.principal | * | kafkaclient | 客户端的 Kerberos 首要名. (Not supported on Windows, will use the logon user’s principal). Type: string |
|
sasl.kerberos.kinit.cmd | * | 完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名. Type: string |
||
sasl.kerberos.keytab | * | Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板 … -t %{sasl.kerberos.keytab}. Type: string |
||
sasl.kerberos.min.time.before.relogin | * | 1 .. 86400000 | 60000 | Key 恢复尝试的最小时间,毫秒. Type: integer |
sasl.username | * | 使用 PLAIN 机制时,SASL 用户名 Type: string |
||
sasl.password | * | 使用 PLAIN 机制时,SASL 密码 Type: string |
||
plugin.library.paths | * | 使用;分割的插件库列表,如果没有标明库文件后缀,则根据平台自动配置dll或者so后缀名. Type: string |
||
interceptors | * | 通过rd_kafka_conf_interceptoes添加拦截器. *Type: * |
||
group.id | * | 客户端分组编号id Type: string |
||
partition.assignment.strategy | * | range,roundrobin | 分区策略名称(轮询、范围). Type: string |
|
session.timeout.ms | * | 1 .. 3600000 | 30000 | 客户端会话超时时间. Type: integer |
heartbeat.interval.ms | * | 1 .. 3600000 | 1000 | 组会话超时时间. Type: integer |
group.protocol.type | * | consumer | 组协议类型 Type: string |
|
coordinator.query.interval.ms | * | 1 .. 3600000 | 600000 | 多久查询一次当前的客户端组协调器。 如果当前分配的协调器已关闭,则在协调器重新分配的情况下,配置的查询时间间隔将除以10以更快地恢复. Type: integer |
enable.auto.commit | C | true, false | true | 在后台周期性的自动提交偏移量. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). Type: boolean |
auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | 消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) . (0 = disable). This setting is used by the high-level consumer. Type: integer |
enable.auto.offset.store | C | true, false | true | Automatically store offset of last message provided to application. Type: boolean |
queued.min.messages | C | 1 .. 10000000 | 100000 | Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue. Type: integer |
queued.max.messages.kbytes | C | 1 .. 2097151 | 1048576 | Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages. Type: integer |
fetch.wait.max.ms | C | 0 .. 300000 | 100 | Maximum time the broker may wait to fill the response with fetch.min.bytes. Type: integer |
fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. Type: integer |
max.partition.fetch.bytes | C | Alias for fetch.message.max.bytes | ||
fetch.min.bytes | C | 1 .. 100000000 | 1 | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting. Type: integer |
fetch.error.backoff.ms | C | 0 .. 300000 | 500 | How long to postpone the next fetch request for a topic+partition in case of a fetch error. Type: integer |
offset.store.method | C | none, file, broker | broker | Offset commit store method: ‘file’ - local file store (offset.store.path, et.al), ‘broker’ - broker commit store (requires Apache Kafka 0.8.2 or later on the broker). Type: enum value |
consume_cb | C | Message consume callback (set with rd_kafka_conf_set_consume_cb()) Type: pointer |
||
rebalance_cb | C | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb()) Type: pointer |
||
offset_commit_cb | C | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) Type: pointer |
||
enable.partition.eof | C | true, false | true | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. Type: boolean |
check.crcs | C | true, false | false | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. Type: boolean |
queue.buffering.max.messages | P | 1 .. 10000000 | 100000 | Maximum number of messages allowed on the producer queue. Type: integer |
queue.buffering.max.kbytes | P | 1 .. 2097151 | 1048576 | Maximum total message size sum allowed on the producer queue. This property has higher priority than queue.buffering.max.messages. Type: integer |
queue.buffering.max.ms | P | 0 .. 900000 | 0 | Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency. Type: integer |
linger.ms | P | Alias for queue.buffering.max.ms | ||
message.send.max.retries | P | 0 .. 10000000 | 2 | How many times to retry sending a failing MessageSet. Note: retrying may cause reordering. Type: integer |
retries | P | Alias for message.send.max.retries | ||
retry.backoff.ms | P | 1 .. 300000 | 100 | The backoff time in milliseconds before retrying a protocol request. Type: integer |
queue.buffering.backpressure.threshold | P | 0 .. 1000000 | 10 | The threshold of outstanding not yet transmitted requests needed to backpressure the producer’s message accumulator. A lower number yields larger and more effective batches. Type: integer |
compression.codec | P | none, gzip, snappy, lz4 | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property compression.codec. Type: enum value |
compression.type | P | Alias for compression.codec | ||
batch.num.messages | P | 1 .. 1000000 | 10000 | Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes. Type: integer |
delivery.report.only.error | P | true, false | false | Only provide delivery reports for failed messages. Type: boolean |
dr_cb | P | Delivery report callback (set with rd_kafka_conf_set_dr_cb()) Type: pointer |
||
dr_msg_cb | P | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) Type: pointer |
Topic configuration properties
Property | C/P | Range | Default | Description |
---|---|---|---|---|
request.required.acks | P | -1 .. 1000 | 1 | This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, 1=Only the leader broker will need to ack the message, -1 or all=broker will block until message is committed by all in sync replicas (ISRs) or broker’s min.insync.replicas setting before sending response. Type: integer |
acks | P | Alias for request.required.acks | ||
request.timeout.ms | P | 1 .. 900000 | 5000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0. Type: integer |
message.timeout.ms | P | 0 .. 900000 | 300000 | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. Type: integer |
queuing.strategy | P | fifo, lifo | fifo | Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages. WARNING: lifo is experimental and subject to change or removal. Type: enum value |
produce.offset.report | P | true, false | false | Report offset of produced message back to application. The application must be use the dr_msg_cb to retrieve the offset from rd_kafka_message_t.offset. Type: boolean |
partitioner | P | consistent_random | Partitioner: random - random distribution, consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition), consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned), murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.). Type: string |
|
partitioner_cb | P | Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb()) Type: pointer |
||
msg_order_cmp | P | Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see queuing.strategy. Type: pointer |
||
opaque | * | Application opaque (set with rd_kafka_topic_conf_set_opaque()) Type: pointer |
||
compression.codec | P | none, gzip, snappy, lz4, inherit | inherit | Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration. Type: enum value |
compression.type | P | Alias for compression.codec | ||
auto.commit.enable | C | true, false | true | If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call rd_kafka_offset_store() to store an offset (optional). NOTE: This property should only be used with the simple legacy consumer, when using the high-level KafkaConsumer the global enable.auto.commit property must be used instead. NOTE: There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method. Type: boolean |
enable.auto.commit | C | Alias for auto.commit.enable | ||
auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. This setting is used by the low-level legacy consumer. Type: integer |
auto.offset.reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | Action to take when there is no initial offset in offset store or the desired offset is out of range: ‘smallest’,’earliest’ - automatically reset the offset to the smallest offset, ‘largest’,’latest’ - automatically reset the offset to the largest offset, ‘error’ - trigger an error which is retrieved by consuming messages and checking ‘message->err’. Type: enum value |
offset.store.path | C | . | Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. Type: string |
|
offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 | fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write. Type: integer |
offset.store.method | C | file, broker | broker | Offset commit store method: ‘file’ - local file store (offset.store.path, et.al), ‘broker’ - broker commit store (requires “group.id” to be configured and Apache Kafka 0.8.2 or later on the broker.). Type: enum value |
consume.callback.max.messages | C | 0 .. 1000000 | 0 | Maximum number of messages to dispatch in one rd_kafka_consume_callback*() call (0 = unlimited) Type: integer |