Kafka

Jul 26, 2021


Download

Download link

Installation

Extract the downloaded package and change into its directory:

cd /opt/kafka-2.8.0-src

Edit the configuration file with vi config/server.properties and add the following two lines:

broker.id=1
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
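
As a quick sanity check of these two settings, the following is a minimal sketch that parses the same keys with java.util.Properties and splits zookeeper.connect into its host:port entries. The class and method names are hypothetical, and the configuration is inlined as a string here; a real check would read config/server.properties from disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ServerPropertiesCheck {

    // Parses properties-file text (same syntax as config/server.properties).
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits zookeeper.connect on commas and trims each host:port entry.
    public static List<String> zookeeperHosts(Properties props) {
        List<String> hosts = new ArrayList<>();
        String value = props.getProperty("zookeeper.connect", "");
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                hosts.add(trimmed);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        // Inlined for illustration; these are the two lines added above.
        String text = "broker.id=1\n"
                + "zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183\n";
        Properties props = parse(text);
        // Integer.parseInt fails fast if broker.id is missing or not a number.
        System.out.println("broker.id = " + Integer.parseInt(props.getProperty("broker.id")));
        System.out.println("ZooKeeper nodes = " + zookeeperHosts(props));
    }
}
```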

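Start Kafka. It fails because the source package (kafka-2.8.0-src) has not been built yet; build it with Gradle and the Scala version given in the error message: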

[mbs@server1 kafka-2.8.0-src]$ bin/kafka-server-start.sh config/server.properties
Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=2.13.5'
[mbs@server1 kafka-2.8.0-src]$ ./gradlew jar -PscalaVersion=2.13.5
……
BUILD SUCCESSFUL in 17m 35s

Start Kafka again:

[mbs@server1 kafka-2.8.0-src]$ bin/kafka-server-start.sh config/server.properties

Testing

Create a topic

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topictest

replication-factor: number of replicas of each partition
partitions: number of partitions
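
To make the partitions setting a little more concrete, here is a simplified sketch of how a keyed record is mapped to one partition of a topic. It is only an illustration: Kafka's default partitioner hashes the serialized key bytes with murmur2, while this sketch uses String.hashCode() as a stand-in, so the actual partition numbers will differ. The class name, method name, and keys are hypothetical.

```java
public class PartitionSketch {

    // Simplified stand-in for key-based partitioning: hash the key, then take it
    // modulo the partition count. Kafka itself uses murmur2 on the key bytes.
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical; the topic created above uses --partitions 1
        for (String key : new String[] {"order-1", "order-2", "order-3", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

With a single partition, as in the command above, every key maps to partition 0. With more partitions, the same key keeps mapping to the same partition as long as the partition count does not change.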

List all topics

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

Start a producer

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topictest

Start a consumer

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topictest

Type a message in the producer and press Enter; it should appear in the consumer's output.
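
The console producer can also be mirrored from Java. As a rough sketch, the settings it needs are built below using Kafka's documented configuration keys as plain strings, so the class compiles without kafka-clients on the classpath. The class and method names are hypothetical, and the actual send is only outlined in a comment.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer settings for string keys and values.
    public static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Same broker address as the console commands above.
        Properties props = producerProperties("127.0.0.1:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With kafka-clients available, the next step would be roughly:
        //   try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        //       producer.send(new ProducerRecord<>("topictest", "hello"));
        //   }
    }
}
```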

Application

  1. Add the jar dependency

     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>2.4.1</version>
     </dependency>
    
  2. Implementation

     import java.time.Duration;
     import java.util.Arrays;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerConfig;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.common.serialization.StringDeserializer;

     public class KafkaListenerTest {

         public static void main(String[] args) {
             // Kafka consumer configuration
             Properties properties = new Properties();
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.62:9092");
             properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
             properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group");

             // try-with-resources closes the consumer if the loop ends with an exception
             try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
                 kafkaConsumer.subscribe(Arrays.asList("topictest"));
                 while (true) {
                     // poll(long) is deprecated; use the Duration overload instead
                     ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                     for (ConsumerRecord<String, String> record : records) {
                         System.out.println(record.value());
                     }
                 }
             }
         }
     }
    
  3. Configure the local hosts file

    Running the code above fails with:

     11:50:40.536 [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-group-1, groupId=group] Error connecting to node server1:9092 (id: 2147483646 rack: null)
     java.net.UnknownHostException: server1: nodename nor servname provided, or not known
         at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
         at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
         at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
         ……
    

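    The broker reports itself to clients under its hostname (server1), which the client machine cannot resolve. Add an entry to the local hosts file: vi /etc/hosts
    Add the line: 192.168.0.62 server1
    The code then runs normally.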

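  4. Turn off logging

    The Kafka client produces a lot of log output. The following logger configuration (Logback-style, e.g. in logback.xml) turns off all Kafka logging: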

     <logger name="org.apache.kafka" level="off" />
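
For the hosts problem in step 3, a small check like the one below can confirm that the broker's hostname resolves on the client machine before the consumer is started. This is only a sketch using the standard java.net.InetAddress API; the class and method names are hypothetical, and server1 is the hostname taken from the error message above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerHostCheck {

    // Returns true if the local resolver (hosts file or DNS) knows the name.
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the hostname from the error message; pass another name as an argument.
        String host = args.length > 0 ? args[0] : "server1";
        if (canResolve(host)) {
            System.out.println(host + " resolves");
        } else {
            System.out.println(host + " does not resolve; add it to /etc/hosts");
        }
    }
}
```

If the check reports that server1 does not resolve, the consumer will most likely hit the same UnknownHostException shown in step 3.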
    

Further reading

  1. Here