Download
Installation
Unpack the downloaded archive and enter the directory:
cd /opt/kafka-2.8.0-src
Edit the configuration file (vi config/server.properties) and add two lines:
broker.id=1
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
Start Kafka. It fails with an error; as the message hints, this is the source distribution and must be built first (with the matching Scala version):
[mbs@server1 kafka-2.8.0-src]$ bin/kafka-server-start.sh config/server.properties
Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=2.13.5'
[mbs@server1 kafka-2.8.0-src]$ ./gradlew jar -PscalaVersion=2.13.5
……
BUILD SUCCESSFUL in 17m 35s
Start Kafka again:
[mbs@server1 kafka-2.8.0-src]$ bin/kafka-server-start.sh config/server.properties
Testing
Create a topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topictest
replication-factor: number of replicas
partitions: number of partitions
List all topics
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
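The same topic operations can also be done from Java with the AdminClient that ships in kafka-clients; note that it connects to the broker on port 9092 rather than to ZooKeeper. The following is only a rough sketch: the class name AdminTopicDemo and the broker address are assumptions made for illustration.
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminTopicDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address, matching the console commands above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- same settings as the CLI call above;
            // createTopics fails if the topic already exists
            NewTopic topic = new NewTopic("topictest", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();

            // Equivalent of kafka-topics.sh --list
            Set<String> names = admin.listTopics().names().get();
            System.out.println(names);
        }
    }
}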
Start a console producer
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topictest
Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topictest
Type a message in the producer and press Enter; it shows up in the consumer's output.
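The console producer can also be replaced by a small Java producer using the kafka-clients dependency introduced in the next section. This is a minimal sketch; the class name KafkaProducerTest and the message text are made up for illustration.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Assumed broker address, same as the console commands above
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("topictest", "hello from Java"));
        }
    }
}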
Application
- Add the kafka-clients dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
- Consumer code
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaListenerTest {

    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.62:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic created above
        kafkaConsumer.subscribe(Arrays.asList("topictest"));

        // Poll in a loop and print every record value
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
- Configure local hosts
Running the code above produces an error:
11:50:40.536 [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-group-1, groupId=group] Error connecting to node server1:9092 (id: 2147483646 rack: null)
java.net.UnknownHostException: server1: nodename nor servname provided, or not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
……
The broker identifies itself as server1, which the client machine cannot resolve, so add a mapping to the local hosts file:
vi /etc/hosts
Add one line:
192.168.0.62 server1
After that the code runs normally.
- Disable Kafka's logging
Kafka produces a lot of log output of its own; all of it can be turned off with the following logger configuration:
<logger name="org.apache.kafka" level="off" />
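Alternatively, assuming logback-classic is the SLF4J backend in use, the same effect can be achieved programmatically at startup. The class and method names below are only illustrative.
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public class KafkaLogSilencer {

    public static void silenceKafkaLogs() {
        // Works only when logback-classic backs SLF4J; the cast fails otherwise
        Logger kafkaLogger = (Logger) LoggerFactory.getLogger("org.apache.kafka");
        kafkaLogger.setLevel(Level.OFF);
    }
}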