微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者


未分类 aide_941 5℃











bootstrap.servers      host1:port1,host2:port2,....

(host配置的是主机名而不是ip时,出现无法连接:Exception in thread “main” org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers





key.serializer            Serializer class for key that implements theSerializer interface.   例如:”org.apache.kafka.common.serialization.StringSerializer”

value.serializer         Serializer class for value that implements theSerializer interface.例如:”org.apache.kafka.common.serialization.StringSerializer”






2.简单的producer往Kafka 集群 broker里面放消息的代码如下;如需从大文件读取数据,产生message,需要考虑大文件读取,不能把整个文件一次性全部读入内存。(http://www.importnew.com/14512.html  Java读取大文件




A Kafka client that publishes records to the Kafka cluster.    Kafka生产者负责向Kafka集群发送消息。



The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.


Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.


 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));




The producer consists of a pool of buffer space that holds records that haven’t yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.



The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.




     Future<RecordMetadata>    send(ProducerRecord<K,V> record)
//Asynchronously send a record to a topic.

    Future<RecordMetadata>    send(ProducerRecord<K,V> record, Callback callback)
//Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.


A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.


The acks config controls the criteria under which requests are considered complete. The “all” setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.



If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won’t. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).



The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).



By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle’s algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn’t fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.

默认情况下,缓存记录可以被立即发送,即使它被加入没有使用的缓存区。然而如果你想要减少请求次数,你可以设置linger.ms为大于0的值。它将使得生产者等待指定毫秒数后再发送一个请求,以便等待更多的记录到达填充到一批数据里面。 这类似于TCP中的Nagel(纳格尔)算法。例如,在以上代码段中,100条记录将在一次请求中发送,当我们设置了linger时间为1毫秒。然而这样设置会导致请求增加1毫秒的延时,为了更多的记录到达,在缓冲区没有填满的情况下。请注意,记录近乎同时达到的,将放在同一批,即使linger.ms设置为0,在这种过负载情况下将会略linger设置。然而大多数情况下,设置linger.ms当在非极限负载的情况下,牺牲一点点时延,可以使请求发送数据变少,请求更高效。


The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.



阻塞max.block.ms 毫秒,超过该阻塞时间,仍然存在records send速度过快,将抛出异常。

buffer.memory    The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent 
faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the 
producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as 
for maintaining in-flight requests.
默认值:long    33554432    [0,...]    high


The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types.

key.serializer和value.serializer设置了如何把用户提供的key和value对象序列化成字节。你可以用ByteArraySerializer 或 StringSerializer 对简单的 字符串对象或者字节数组对象 进行序列化。




The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the 
same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput 
(a batch size of zero will disable batching entirely). A very large batch size may use memory 
a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation
of additional records.

max.block.ms 决定send和partitionsFor方法被阻塞的时间。当缓存区满了或者元数据不可用的时间将产生阻塞。

The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods 
can be blocked either because the buffer is full or metadata unavailable.
Blocking in the user-supplied serializers or partitioner will not 
be counted against this timeout.


min.insync.replicas:When a producer sets acks to “all” (or “-1”),
min.insync.replicas specifies the minimum number of replicas
that must acknowledge a write for the write to be considered successful.

If this minimum cannot be met, then the producer will raise an exception(either NotEnoughReplicas or NotEnoughReplicasAfterAppend).

When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees.


A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2,
and produce with acks of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.







查看所有topic:   ./kafka-topics.sh –list –zookeeper host1:port,host2:port

查看指定topic:  ./kafka-topics.sh –describe –zookeeper localhost:port –topic  kafka_topic

topic消息在磁盘中分布情况:  du -sh /data*/zdh/kafka/data/kafka_topic*



./kafka-console-consumer.sh –zookeeper host:2181 –topic kafka_topic   –from beginning   (–from beginning 是从头开始消费,不加则是消费当前正在发送到该topic的消息)

转载请注明:SuperIT » Kafka实践1–Producer

喜欢 (0)or分享 (0)