🌀 技术人生
凡事有交代,件件有着落,事事有回音
Kafka生产者Java API

准备工作:

maven工程,zookeeper集群

1.开启Kafka集群,这里可以参考我之前的文章,里面有详细的教程

2.Java API编程

maven的pom.xml文件

1
2
3
4
5
6
7
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>

ProducerTest.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.ysjh;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
    public static void main(String args[]) {
        //1.配置生产者属性 
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号,可以是多个 
        props.put("bootstrap.servers", "172.17.0.3:9092");
        //配置发送的消息是否等待应答 
        props.put("acks", "all");
        //配置消息发送失败的重试 
        props.put("retries", 0);
        // 批量处理数据的大小:16kb 
        props.put("batch.size", 16384);
        // 设置批量处理数据的延迟,单位:ms 
        props.put("linger.ms", 1);
        // 设置内存缓冲区的大小 
        props.put("buffer.memory", 33554432);
        //数据在发送之前一定要序列化 
        // key序列化 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2.实例化KafkaProducer 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            //3.调用Producer的send方法,进行消息的发送,每条待发送的消息,都必须封装为一个Record对象 
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
        }
        //4.close释放资源 
        producer.close();
    }
}

3.在Kafka集群中开启一个消费者

bin/kafka-console-consumer.sh –bootstrap-server cdh0:9092 –topic test

我之前按照这个步骤一直不能成功写入数据,一直没有找到原因,后来才发现原因,是这里需要进行额外一个配置文件的配置

修改 server.properties中的

这里要改为ip地址,改过之后就能成功写入数据了


最后修改于 2022-09-22

知识共享许可协议
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。