一看必会系列:kafka和zookeeper集群搭建及测试

来源:本站原创 Linux 超过264 views围观 0条评论

——————–注意开启开机启动

——————–三台服务器安装JDK
yum  install  java-1.8.0-openjdk   java-1.8.0-openjdk-devel -y

[root@centos010 ~]# java -v
Unrecognized option: -v
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
[root@centos010 ~]# java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-b10)
OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)

配置环境
vim /etc/profile
     #set java environment 
    JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export JAVA_HOME  CLASSPATH  PATH 

重读变量
[root@centos010 ~]# source /etc/profile

看是否正常
打印java_hosm
[root@centos010 ~]# echo $JAVA_HOME
/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64
打印calsspath
[root@centos010 ~]# echo $CLASSPATH 
.:/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/lib/dt.jar:/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/lib/tools.jar

————————安装zookeeper

#我的目录统一放在/opt下面
mkdir /opt
#首先创建Zookeeper项目目录
#项目目录
mkdir -p /opt/zookeeper
#存放快照日志
mkdir -p /opt/zookeeper/zkdata
#存放事物日志
mkdir -p /opt/zookeeper/zkdatalog

wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.12/
cd /opt/zookeeper/
tar -zxvf zookeeper-3.4.12.tar.gz
cp zookeeper-3.4.12/conf/zoo_sample.cfg zookeeper-3.4.12/conf/zoo.cfg

#三台配置一样
vim zookeeper-3.4.12/conf/zoo.cfg
# example sakes.
dataDir=/opt/zookeeper/zkdata
#新加配置
dataLogDir=/opt/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181
server.1=192.168.142.130:2888:3888
server.2=192.168.142.131:2888:3888
server.3=192.168.142.132:2888:3888
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.142.107为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

创建myid文件

#server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid

三台服务器启动服务
/opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh start

查看三台的状态
[root@centos010 ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
[root@centos011 ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader     ——这个是leader随机生成
[root@centos012 ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower

可以用“jps”查看zk的进程,这个是zk的整个工程的main
[root@centos012 ~]# jps
1442 Jps
1348 QuorumPeerMain

———-启动排错

[root@centos010 ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.  ——–出现这个不要慌,只要配置正确,再一台一台启动就OK了

如果不是不行 如下操作
mv  /opt/zookeeper/zkdata/zookeeper_server.pid  /tmp/
tail -f /root/zookeeper.out  查看日志

java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:558)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:610)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:838)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:957)
2018-07-02 11:19:05,937 [myid:1] – INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumPeer$QuorumServer@184] – Resolved hostname: 192.168.142.132 to address: /192.168.142.132
2018-07-02 11:19:05,937 [myid:1] – INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:FastLeaderElection@847] – Notification time out: 51200
2018-07-02 11:19:57,139 [myid:1] – WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@584] – Cannot open channel to 2 at election address /192.168.142.131:3888
java.net.ConnectException: Connection refused (Connection refused)   ——-

检查防火墙,是否开放端口
检查对端服务器 ZK是否起来

———————————-安装KAFKA

下载包
https://mirrors.cnnic.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz  这个快

mkdir -p /opt/kafka/kafkalogs

tar -zxvf kafka_2.12-1.1.0.tgz
cd /opt/kafka/kafka_2.12-1.1.0/config
cp server.properties server.properties.bak
cp zookeeper.properties zookeeper.properties.bak

vim server.properties

启动kafka
cd /opt/kafka/kafka_2.12-1.1.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
               :::*                    LISTEN      1062/master        

三台服务器查看
[root@centos010 bin]# jps
1526 QuorumPeerMain
1974 Kafka
1997 Jps

[root@centos012 bin]# !net
netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      978/sshd           
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1062/master        
tcp6       0      0 :::2181                 :::*                    LISTEN      1348/java          
tcp6       0      0 :::37797                :::*                    LISTEN      1348/java          
tcp6       0      0 :::36009                :::*                    LISTEN      1756/java          
tcp6       0      0 192.168.142.132:3888    :::*                    LISTEN      1348/java          
tcp6       0      0 :::22                   :::*                    LISTEN      978/sshd           
tcp6       0      0 ::1:25  

创建#创建Topic
./kafka-topics.sh –create –zookeeper 192.168.142.130:2181 –replication-factor 2 –partitions 1 –topic niux
#解释
–replication-factor 2   #复制两份
–partitions 1 #创建1个分区
–topic #主题为shuaige

执行结果
[root@centos010 bin]#
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "niux".   ——表示成功
[root@centos010 bin]#

#创建一个broker,发布者
[root@centos011 bin]# ./kafka-console-producer.sh –broker-list 192.168.142.130:9092,192.168.142.131:9092,192.168.142.132:9092 –topic niux
OpenJDK 64-Bit Server VM warning: If the number of procenfigure the number of parallel GC threads appropriately
>d     ———这里生产消息
d

”’在一台服务器上创建一个订阅者”’
[root@centos012 bin]# ./kafka-console-consumer.sh –zookeeper 192.168.142.130:2181,192.168.142.131:2181,192.168.142.132:2181 –topic niux –from-beginning
OpenJDK 64-Bit Server VM warning: If the number of procenfigure the number of parallel GC threads appropriately
Using the ConsoleConsumer with old consumer is deprecateusing the new consumer by passing [bootstrap-server] ins
4    —-这里及以下为收到的消息
d       
d

——正确的测试方案
其中一台服务器运行 做为生产者
./kafka-console-producer.sh –broker-list IP:9092 –topic niux
另两台服务器运行 做为消费者
./kafka-console-consumer.sh –zookeeper IP:2181 –topic niux –from-beginning

生产者进行信息输入的同时 另两台消费者如能同时出现信息即为正常

查看topic

./kafka-topics.sh –list –zookeeper localhost:2181
#就会显示我们创建的所有topic
[root@centos010 bin]# ./kafka-topics.sh –list –zookeeper localhost:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
niux
[root@centos010 bin]#

查看topic状态
./kafka-topics.sh –describe –zookeeper localhost:2181 –topic niux

[root@centos010 bin]# ./kafka-topics.sh –describe –zookeeper localhost:2181 –topic niux
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:niux    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: niux    Partition: 0    Leader: 131    Replicas: 131,132    Isr: 131,132
#分区为为1  复制因子为2   他的  niux的分区为0
#Replicas: 131,132   复制的为131,132
#

——————-配置文件解释  zoo.cfg

#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

重要配置说明

1、myid文件和server.myid  在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。

3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。

——————-配置文件解释  server.properties
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.142.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.142.100:12181,192.168.142.101:12181,192.168.142.107:1218 #设置zookeeper的连接端口

——————-Kafka初识

1、Kafka使用背景

在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:

    我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
    我想对用户的搜索关键词进行统计,分析出当前的流行趋势
    有些数据,存储数据库浪费,直接存储硬盘效率又低

这些场景都有一个共同点:
数据是由上游模块产生,上游模块,使用上游模块的数据计算、统计、分析,这个时候就可以使用消息系统,尤其是分布式消息系统!
2、Kafka的定义
What is Kafka:它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
3、Kafka和其他主流分布式消息系统的对比
定义解释:
1、Java 和 scala都是运行在JVM上的语言。
2、erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言,所以RabbitMQ天生就有很高的并发性能,但是 有RabbitMQ严格按照AMQP进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照AMQP( Advanced Message Queuing Protocol   高级消息队列协议)设计的。
3、事物的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。
对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3个中只有ActiveMQ支持,这个是因为,RabbitMQ和Kafka为了更高的性能,而放弃了对事物的支持 。
4、集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。
5、负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机。
6、动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的。
注:
阿里巴巴的Metal,RocketMQ都有Kafka的影子,他们要么改造了Kafka或者借鉴了Kafka,最后Kafka的动态扩容是通过Zookeeper来实现的。
Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
Kafka相关概念

1、 AMQP协议
Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
上面说的3种比较流行的消息队列协议,要么支持AMQP协议,要么借鉴了AMQP协议的思想进行了开发、实现、设计。
2、 一些基本的概念
1、消费者:(Consumer):从消息队列中请求消息的客户端应用程序
2、生产者:(Producer)  :向broker发布消息的应用程序
3、AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

——————还有一个需要注意
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。

但是可以通过命令去定期的清理

————脚本示例
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2

#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f

#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。

#zk log dir   del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f

参考
https://www.cnblogs.com/zhaojiankai/p/7181910.html?utm_source=itdadao&utm_medium=referral
http://www.cnblogs.com/smartloli/p/4538173.html
https://blog.csdn.net/my_bai/article/details/68490632

文章出自:CCIE那点事 http://www.jdccie.com/ 版权所有。本站文章除注明出处外,皆为作者原创文章,可自由引用,但请注明来源。 禁止全文转载。
本文链接:http://www.jdccie.com/?p=3798转载请注明转自CCIE那点事
如果喜欢:点此订阅本站