kafka

Overview

http://kafka.apache.org/

http://kafka.apache.org/intro

Kafka is run as a cluster on one or more servers that can span multiple datacenters.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.

Kafka has five core APIs: Producer API, Consumer API, Streams API, Connector API, and Admin API.

Single-node installation

###################################################### Install JDK ######################################################
core-services >> jdk.md

#################################### Download and extract ########################################################################
mkdir -p /xdata && cd /xdata && ls
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
OR http://192.168.1.8/chfs/shared/linux-pkg/kafka_2.12-2.5.0.tgz

tar xf kafka_2.12-2.5.0.tgz
cd /xdata/kafka_2.12-2.5.0

######################################## Start ZooKeeper and Kafka ############################################################
bin/zookeeper-server-start.sh
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties OR
/xdata/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh -daemon /xdata/kafka_2.12-2.5.0/config/zookeeper.properties

bin/kafka-server-start.sh
bin/kafka-server-start.sh -daemon config/server.properties OR
/xdata/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /xdata/kafka_2.12-2.5.0/config/server.properties

######################################## Verify by producing and consuming data ####################################################################
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
This is a message
This is another message
Open a second terminal window to consume:
cd /xdata/kafka_2.12-2.5.0
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

#################################################### Stop services #############################################################

Stop Kafka first, then ZooKeeper.
/xdata/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
Kafka takes a few seconds to stop; use jps to confirm it has exited before stopping ZooKeeper.
/xdata/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh

####################################################### Log files ########################################################
[root@node1 logs]# ls -rlht --full-time /xdata/kafka_2.12-2.5.0/logs
total 992K
-rw-r--r-- 1 root root    0 2020-05-25 18:02:21.376252853 +0800 kafka-request.log
-rw-r--r-- 1 root root    0 2020-05-25 18:02:21.377252792 +0800 kafka-authorizer.log
-rw-r--r-- 1 root root  168 2020-05-25 18:02:21.415250500 +0800 state-change.log
-rw-r--r-- 1 root root  12K 2020-05-25 18:03:02.443775639 +0800 zookeeper.out
-rw-r--r-- 1 root root 1.7K 2020-05-25 18:03:02.628764480 +0800 zookeeper-gc.log.0.current
-rw-r--r-- 1 root root  172 2020-05-25 18:03:03.531710014 +0800 log-cleaner.log
-rw-r--r-- 1 root root 5.5K 2020-05-25 18:03:06.353539800 +0800 kafkaServer-gc.log.0.current
-rw-r--r-- 1 root root 143K 2020-05-25 18:03:55.521573959 +0800 server.log
-rw-r--r-- 1 root root 132K 2020-05-25 18:03:55.521573959 +0800 kafkaServer.out
-rw-r--r-- 1 root root 287K 2020-05-25 18:03:55.521573959 +0800 controller.log

################################################### Supplementary information ################################################################
[root@node1 kafka_2.12-2.5.0]# ps -ef | grep kafka
root      2446     1  0 18:02 pts/2    00:00:01 /opt/jdk1.8.0_231//bin/java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/xdata/kafka_2.12-2.5.0/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/xdata/kafka_2.12-2.5.0/bin/../logs -Dlog4j.configuration=file:/xdata/kafka_2.12-2.5.0/bin/../config/log4j.properties -cp /opt/jdk1.8.0_231//lib:/opt/jdk1.8.0_231//jre/lib::/xdata/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/argparse4j-0.7.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/commons-cli-1.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-api-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-file-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-json-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-mirror-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-mirror-client-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-runtime-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-transforms-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-api-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-locator-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-utils-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-annotations-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-core-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-datab
ind-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.inject-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javassist-3.22.0-CR2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javassist-3.26.0-GA.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javax.servlet-api-3.1.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jaxb-api-2.3.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-client-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-common-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-hk2-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-media-jaxb-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-server-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-server-9.4.24.
v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-servlet-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jopt-simple-5.0.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0-sources.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-clients-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-log4j-appender-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-examples-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-scala_2.12-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-test-utils-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-tools-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/log4j-1.2.17.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/lz4-java-1.7.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/maven-artifact-3.6.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/metrics-core-2.2.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-buffer-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-codec-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-common-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-handler-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-resolver-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-epoll-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-unix-common-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/paranamer-2.8.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/plexus-utils-3.2.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/reflections-0.9.12.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/rocksdbjni-5.18.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-
collection-compat_2.12-2.1.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-library-2.12.10.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-reflect-2.12.10.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/slf4j-api-1.7.30.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/snappy-java-1.1.7.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/validation-api-2.0.1.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zookeeper-3.5.7.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zookeeper-jute-3.5.7.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zstd-jni-1.4.4-7.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /xdata/kafka_2.12-2.5.0/config/zookeeper.properties

root      2808     1  4 18:02 pts/2    00:00:08 /opt/jdk1.8.0_231//bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/xdata/kafka_2.12-2.5.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/xdata/kafka_2.12-2.5.0/bin/../logs -Dlog4j.configuration=file:/xdata/kafka_2.12-2.5.0/bin/../config/log4j.properties -cp /opt/jdk1.8.0_231//lib:/opt/jdk1.8.0_231//jre/lib::/xdata/kafka_2.12-2.5.0/bin/../libs/activation-1.1.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/aopalliance-repackaged-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/argparse4j-0.7.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/audience-annotations-0.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/commons-cli-1.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/commons-lang3-3.8.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-api-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-basic-auth-extension-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-file-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-json-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-mirror-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-mirror-client-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-runtime-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/connect-transforms-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-api-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-locator-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/hk2-utils-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-annotations-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-core-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-databin
d-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-dataformat-csv-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-datatype-jdk8-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-base-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-jaxrs-json-provider-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-jaxb-annotations-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-paranamer-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jackson-module-scala_2.12-2.10.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.annotation-api-1.3.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.inject-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javassist-3.22.0-CR2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javassist-3.26.0-GA.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javax.servlet-api-3.1.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jaxb-api-2.3.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-client-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-common-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-container-servlet-core-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-hk2-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-media-jaxb-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jersey-server-2.28.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-client-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-continuation-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-http-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-io-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-security-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-server-9.4.24.v2
0191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-servlet-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-servlets-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jetty-util-9.4.24.v20191120.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/jopt-simple-5.0.4.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka_2.12-2.5.0-sources.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-clients-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-log4j-appender-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-examples-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-scala_2.12-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-streams-test-utils-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/kafka-tools-2.5.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/log4j-1.2.17.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/lz4-java-1.7.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/maven-artifact-3.6.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/metrics-core-2.2.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-buffer-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-codec-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-common-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-handler-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-resolver-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-epoll-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/netty-transport-native-unix-common-4.1.45.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/paranamer-2.8.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/plexus-utils-3.2.1.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/reflections-0.9.12.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/rocksdbjni-5.18.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-co
llection-compat_2.12-2.1.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-library-2.12.10.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-logging_2.12-3.9.2.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/scala-reflect-2.12.10.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/slf4j-api-1.7.30.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/snappy-java-1.1.7.3.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/validation-api-2.0.1.Final.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zookeeper-3.5.7.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zookeeper-jute-3.5.7.jar:/xdata/kafka_2.12-2.5.0/bin/../libs/zstd-jni-1.4.4-7.jar kafka.Kafka /xdata/kafka_2.12-2.5.0/config/server.properties

[root@node3 kafka_2.12-2.5.0]# netstat -lntup | grep java
tcp6       0      0 :::45166                :::*                    LISTEN      5243/java
tcp6       0      0 :::32771                :::*                    LISTEN      4881/java
tcp6       0      0 :::9092                 :::*                    LISTEN      5243/java
tcp6       0      0 :::2181                 :::*                    LISTEN      4881/java
[root@node3 kafka_2.12-2.5.0]# jps
4881 QuorumPeerMain
5243 Kafka

Cluster installation

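A cluster provides high availability. Use an odd number of machines: under the majority rule, 3 machines tolerate the loss of 1. The steps below use 3 machines as an example.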
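
The majority rule can be sketched as simple arithmetic. A majority quorum of n nodes needs floor(n/2)+1 live nodes, so it tolerates floor((n-1)/2) failures. The function name below is hypothetical and only illustrates the calculation.

```shell
#!/bin/sh
# Hypothetical helper: how many node failures an n-node majority quorum tolerates.
# A majority needs floor(n/2)+1 live nodes, so (n-1)/2 nodes (integer division) may fail.
tolerated_failures() {
    echo $(( ($1 - 1) / 2 ))
}

for n in 3 4 5; do
    echo "$n nodes tolerate $(tolerated_failures "$n") failure(s)"
done
```

Four machines tolerate no more failures than three, which is why odd sizes are preferred.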

###################################################### Machine IP addresses ######################################################
192.168.1.113 node1
192.168.1.114 node2
192.168.1.118 node3

When the cluster configuration below asks for an IP or alias, prefer the hostname, and make sure that hostname resolves to the corresponding IP.

[root@node1 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.1.113 node1
192.168.1.114 node2
192.168.1.118 node3
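
As a quick check before continuing, the sketch below reports which hostnames have no entry in a hosts file. The function name is hypothetical, and it only inspects the file; it does not test DNS resolution.

```shell
#!/bin/sh
# Hypothetical helper: print each hostname that has no entry in a hosts file.
# Usage: missing_hosts FILE HOST...
missing_hosts() (
    file=$1; shift
    for host in "$@"; do
        # Skip comment lines, then look for the hostname in fields 2..NF.
        awk -v h="$host" '
            /^[ \t]*#/ { next }
            { for (i = 2; i <= NF; i++) if ($i == h) found = 1 }
            END { exit found ? 0 : 1 }
        ' "$file" || echo "$host"
    done
)

# Example (prints nothing when all three names are present):
# missing_hosts /etc/hosts node1 node2 node3
```

Run it on each of the 3 machines; any name it prints still needs a line in /etc/hosts.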

###################################################### Install JDK (all 3 machines) ######################################################
core-services >> jdk.md

###################################################### Download and extract the binaries (all 3 machines) ###########################################
mkdir -p /xdata && cd /xdata && ls
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
OR http://192.168.1.8/chfs/shared/linux-pkg/kafka_2.12-2.5.0.tgz

tar xf kafka_2.12-2.5.0.tgz
cd /xdata/kafka_2.12-2.5.0

###################################################### Configure the ZooKeeper cluster ######################################################
Run on each of the 3 machines:
cd /xdata/kafka_2.12-2.5.0
cp config/zookeeper.properties{,.bak}
egrep -v "^$|^#" config/zookeeper.properties.bak > config/zookeeper.properties

Machine 1
mkdir -p /xdata/kzookeeper-data; echo 1 > /xdata/kzookeeper-data/myid
Machine 2
mkdir -p /xdata/kzookeeper-data; echo 2 > /xdata/kzookeeper-data/myid
Machine 3
mkdir -p /xdata/kzookeeper-data; echo 3 > /xdata/kzookeeper-data/myid

Run on each of the 3 machines:
cat << 'EOF' > config/zookeeper.properties
dataDir=/xdata/kzookeeper-data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
tickTime=2000
initLimit=10
syncLimit=5
EOF
echo; cat config/zookeeper.properties
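
The same file can also be generated from a host list, which keeps the server.N entries in step with the myid values written above. This is a minimal sketch: the function name is hypothetical, and the ports and tuning values are simply the ones used in these notes.

```shell
#!/bin/sh
# Hypothetical helper: print zookeeper.properties for an ensemble.
# Usage: render_zk_config DATA_DIR HOST...
# Server ids are assigned 1, 2, 3, ... in argument order.
render_zk_config() (
    data_dir=$1; shift
    printf 'dataDir=%s\n' "$data_dir"
    printf 'clientPort=2181\nmaxClientCnxns=0\nadmin.enableServer=false\n'
    id=1
    for host in "$@"; do
        printf 'server.%d=%s:2888:3888\n' "$id" "$host"
        id=$((id + 1))
    done
    printf 'tickTime=2000\ninitLimit=10\nsyncLimit=5\n'
)

render_zk_config /xdata/kzookeeper-data node1 node2 node3
```

Redirect the output to config/zookeeper.properties on every machine. The number in each machine's myid file must match its server.N line.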

###################################################### Start the ZooKeeper cluster ######################################################
/xdata/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh -daemon /xdata/kafka_2.12-2.5.0/config/zookeeper.properties

###################################################### Verify the ZooKeeper cluster ######################################################
ls -lrht --full-time /xdata/kafka_2.12-2.5.0/logs/
tail -f /xdata/kafka_2.12-2.5.0/logs/server.log
/xdata/kafka_2.12-2.5.0/bin/zookeeper-shell.sh
/xdata/kafka_2.12-2.5.0/bin/zookeeper-shell.sh localhost:2181 create /zk_test my_data
/xdata/kafka_2.12-2.5.0/bin/zookeeper-shell.sh localhost:2181 ls /
/xdata/kafka_2.12-2.5.0/bin/zookeeper-shell.sh localhost:2181 delete /zk_test

###################################################### Configure the Kafka cluster ######################################################
Run on each of the 3 machines:
cd /xdata/kafka_2.12-2.5.0
cp config/server.properties{,.bak}
egrep -v "^$|^#" config/server.properties.bak > config/server.properties



Machine 1
cat << 'EOF' > config/server.properties
broker.id=111
listeners=PLAINTEXT://node1:9092
log.dirs=/xdata/kafka-data
zookeeper.connect=node1:2181,node2:2181,node3:2181
auto.create.topics.enable=false
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

Machine 2
cat << 'EOF' > config/server.properties
broker.id=222
listeners=PLAINTEXT://node2:9092
log.dirs=/xdata/kafka-data
zookeeper.connect=node1:2181,node2:2181,node3:2181
auto.create.topics.enable=false
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

Machine 3
cat << 'EOF' > config/server.properties
broker.id=333
listeners=PLAINTEXT://node3:9092
log.dirs=/xdata/kafka-data
zookeeper.connect=node1:2181,node2:2181,node3:2181
auto.create.topics.enable=false
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

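To add a machine to the cluster, write a similar config file for it (with its own unique broker.id and its own hostname in listeners) and start the broker. Scaling out is that simple.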
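
Because the three files differ only in broker.id and the listener hostname, they can be produced from one template. This is a minimal sketch with a hypothetical function name; every other value is copied from the configs above.

```shell
#!/bin/sh
# Hypothetical helper: print server.properties for one broker.
# Usage: render_broker_config BROKER_ID HOSTNAME
render_broker_config() {
    cat << EOF
broker.id=$1
listeners=PLAINTEXT://$2:9092
log.dirs=/xdata/kafka-data
zookeeper.connect=node1:2181,node2:2181,node3:2181
auto.create.topics.enable=false
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF
}

render_broker_config 111 node1
```

On each machine, redirect the output to config/server.properties, for example `render_broker_config 222 node2 > config/server.properties` on machine 2.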

###################################################### Start the Kafka cluster ######################################################
/xdata/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /xdata/kafka_2.12-2.5.0/config/server.properties

###################################################### Verify the Kafka cluster ######################################################
ls -lrht --full-time /xdata/kafka_2.12-2.5.0/logs/
tail -f /xdata/kafka_2.12-2.5.0/logs/server.log

bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server node1:9092

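These commands must reach the Kafka server through node1, node2, or node3. Using localhost or 127.0.0.1 fails with a "server not found" error, because each broker listens only on the hostname given in its listeners setting.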
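
To avoid guessing the address, it can be read from the broker's own config. This is a minimal sketch with a hypothetical function name. It assumes the listeners line has the PROTOCOL://host:port form used in these notes and returns only the first listener.

```shell
#!/bin/sh
# Hypothetical helper: print host:port of the first listener in a server.properties file.
# Usage: listener_address FILE
listener_address() {
    # Strip "listeners=<PROTOCOL>://" and anything after the first comma.
    sed -n 's|^listeners=[^:]*://\([^,]*\).*|\1|p' "$1" | head -n 1
}

# Example (run from /xdata/kafka_2.12-2.5.0 on a cluster node):
# bin/kafka-topics.sh --list --bootstrap-server "$(listener_address config/server.properties)"
```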

###################################################### Supplementary information ######################################################
[root@node1 kafka_2.12-2.5.0]# ls /xdata/
kafka_2.12-2.5.0  kafka_2.12-2.5.0.tgz  kafka-data  kzookeeper-data

[root@node1 kafka_2.12-2.5.0]# jps
6755 Jps
2152 QuorumPeerMain
6651 Kafka

[root@node1 kafka_2.12-2.5.0]# netstat -lntup | grep java
tcp6       0      0 :::42154                :::*                    LISTEN      2152/java           
tcp6       0      0 192.168.1.113:3888      :::*                    LISTEN      2152/java           
tcp6       0      0 :::41504                :::*                    LISTEN      6651/java           
tcp6       0      0 192.168.1.113:9092      :::*                    LISTEN      6651/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      2152/java    

[root@node1 kafka_2.12-2.5.0]# ls /xdata/kafka-data/
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint
[root@node1 kafka_2.12-2.5.0]# ls /xdata/kzookeeper-data/
myid  version-2
[root@node1 kafka_2.12-2.5.0]# ls /xdata/kzookeeper-data/version-2/
acceptedEpoch  currentEpoch  log.100000001  snapshot.0

Consuming from Kafka

docker exec -it schema_registry kafka-avro-console-consumer --bootstrap-server kafka1:9092 --timeout-ms 30000 --from-beginning --topic v2_dptask_426.42_moka_production.assignments.171541 &> /tmp/tmp.txt

docker exec $(docker ps | grep schema | awk '{print $1}') kafka-avro-console-consumer --topic dptask_123.crh.gt_interface_analyse --bootstrap-server kafka1:9092 --partition 0 --offset 70727 &> /tmp/tmp.txt

docker exec $(docker ps | grep schema | awk '{print $1}') kafka-avro-console-consumer --topic dptask_123.crh.gt_interface_analyse --bootstrap-server kafka1:9092 --from-beginning --timeout-ms 120000 &> /tmp/tmp.txt

Change a topic's retention time to free disk space quickly

docker exec -it zk1 kafka-configs --zookeeper zk1:2181 --entity-type topics --entity-name topic-name --describe
docker exec -it zk1 kafka-configs --zookeeper zk1:2181 --entity-type topics --entity-name topic-name --alter --add-config retention.ms=2000
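
retention.ms is in milliseconds, so the value 2000 above keeps data for only 2 seconds. Once the space is freed, the topic needs a normal retention again. The sketch below converts hours to milliseconds for that step; the function name is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: convert hours to milliseconds for retention.ms.
hours_to_ms() {
    echo $(( $1 * 60 * 60 * 1000 ))
}

# 168 hours matches log.retention.hours=168 used in the broker configs in these notes.
echo "retention.ms=$(hours_to_ms 168)"
```

Pass the printed value to `--add-config`, or remove the override with `--alter --delete-config retention.ms` so the topic falls back to the broker default.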

Manually and completely delete a topic

https://www.cnblogs.com/musen/p/11475695.html


############################ Delete the topic with the kafka command ############################
docker exec zk1 kafka-topics --delete --zookeeper zk1:2181 --topic topic-name

############################ Delete the ZooKeeper registration paths ############################
docker exec  zk1  zookeeper-shell  zk1:2181 rmr /config/topics/topic-name
docker exec  zk1  zookeeper-shell  zk1:2181 rmr /brokers/topics/topic-name
docker exec  zk1  zookeeper-shell zk1:2181  rmr /admin/delete_topics/topic-name

############################ Delete the actual directories under Kafka log.dirs ############################
du -sh * | sort -rhk1 | head
du -sh * | fgrep "v2_dptask_1093.orcl.PRESALES.dp_person_lan0021"
rm -rf dir
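
Before removing anything, it helps to list exactly which directories belong to the topic. Kafka names partition directories <topic>-<partition>. The sketch below uses a hypothetical function name and takes the log directory as a parameter.

```shell
#!/bin/sh
# Hypothetical helper: list the partition directories of one topic under a Kafka log dir.
# Usage: topic_partition_dirs LOG_DIR TOPIC
# Caveat: the pattern also matches a different topic named "<TOPIC>-<digit>...".
topic_partition_dirs() {
    find "$1" -maxdepth 1 -type d -name "$2-[0-9]*" | sort
}

# Example: review the list first, then delete the listed directories by hand.
# topic_partition_dirs /xdata/kafka-data topic-name
```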

Skip dirty data in Kafka

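A customer's task failed with an error saying the data could not be Avro-serialized. Consume the topic with a non-Avro consumer and redirect the output with &> /tmp/tmp.txt to locate the problem. The consumed data is binary; running the strings command on it extracts the readable text, where the dirty data is visible.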

The fix is to skip the bad offset directly. Reference commands follow.

The consumer group must be inactive while its offsets are reset.

1. Describe the consumer group, find the problematic topic, and note its offset.
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:9092 --group connect-dp-tidb-connector-dptask_6_1 --describe
2. Consume starting from the problematic offset to find out how many dirty records there are.
3. Pass the number of offsets to skip forward after --shift-by. Without --execute the command only prints the planned result and changes nothing; confirm that the result is correct.
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:9092 --group connect-dp-tidb-connector-dptask_6_1 --topic dptask_6.dp_test.UPDATE3123:0 --reset-offsets --shift-by 1
4. Add --execute to apply the change.
docker exec -it kafka1 kafka-consumer-groups --bootstrap-server kafka1:9092 --group connect-dp-tidb-connector-dptask_6_1 --topic dptask_6.dp_test.UPDATE3123:0 --reset-offsets --shift-by 1 --execute
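
The dry-run and the applying command differ only by --execute, so building both from the same arguments avoids a typo between them. This is a minimal sketch: the function name is hypothetical, the broker address kafka1:9092 is the one used in these notes, and the function only prints the commands.

```shell
#!/bin/sh
# Hypothetical helper: print the dry-run and the applying command for skipping COUNT records.
# Usage: skip_offsets_commands GROUP TOPIC PARTITION COUNT
skip_offsets_commands() (
    base="kafka-consumer-groups --bootstrap-server kafka1:9092 --group $1 --topic $2:$3 --reset-offsets --shift-by $4"
    echo "$base"            # dry run: prints the planned offsets only
    echo "$base --execute"  # applies the new offsets
)

skip_offsets_commands connect-dp-tidb-connector-dptask_6_1 dptask_6.dp_test.UPDATE3123 0 1
```

Run the first printed line inside the kafka1 container, check the planned offsets, then run the second.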

References

http://kafka.apache.org/quickstart

http://kafka.apache.org/documentation/#prodconfig
