### Preface
In an operations environment, administrators typically manage a large number of servers, and a key part of maintaining them is reviewing each server's logs. With ELK, all logs can be collected in one place and analyzed graphically and visually, enabling real-time monitoring of business status.
### ELK log analysis platform architectures (three options)
1. datasource -> logstash -> elasticsearch -> kibana
- Pros: simple to set up and easy to get started with
- Cons: Logstash is resource-hungry, with high CPU and memory usage; in addition, there is no message queue acting as a buffer, so there is a risk of data loss
2. datasource -> filebeat -> logstash -> elasticsearch -> kibana
- Pros: Filebeat is a lightweight, open-source log file shipper; compared with Logstash its resource usage is negligible, which solves the problem of Logstash's high system resource consumption
- Cons: there is still no message queue buffer, so the risk of data loss remains
3. datasource -> filebeat/logstash -> mq(redis/kafka) -> logstash -> elasticsearch -> kibana
- Pros: introducing a message queue smooths out network transmission, which reduces network congestion and, above all, the chance of losing data.
### ELK architecture diagram
![elk架构图.png](https://blog.zs-fighting.cn//upload/2021/07/elk%E6%9E%B6%E6%9E%84%E5%9B%BE-8b4bb37393284779b750aa9d6dcf405e.png)
### Elasticsearch, Logstash, Kibana, and Kafka versions
- elasticsearch-7.8.1
- logstash-7.8.1
- kibana-7.8.1
- filebeat-7.8.1
- kafka-2.11-1.1.1
- zookeeper-3.4.14
### Elasticsearch installation (Java must be installed beforehand)
```
1. Base environment setup (CentOS)
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-x86_64.rpm
rpm -ivh elasticsearch-7.8.1-x86_64.rpm
systemctl daemon-reload
systemctl enable elasticsearch.service
mkdir -p /data/elk_data
chown elasticsearch:elasticsearch /data/elk_data
2. Edit the Elasticsearch configuration file (/etc/elasticsearch/elasticsearch.yml)
# Cluster name
cluster.name: "es_cluster"
# Node name: master1
node.name: master1
# Whether this node is eligible to become master
node.master: true
# Whether this node stores data (enabled by default)
node.data: true
# Network binding; 0.0.0.0 here so the node is reachable from other hosts
network.host: ["0.0.0.0"]
# HTTP port for client access (default 9200)
http.port: 9200
# Allow cross-origin (CORS) access
http.cors.enabled: true
http.cors.allow-origin: "*"
# TCP port for inter-node communication (default 9300)
transport.tcp.port: 9300
# Hosts used for cluster discovery
discovery.seed_hosts: ["dc_es1","dc_es2","dc_es3"]
# Master-eligible nodes, by name or IP; only used during the very first election
cluster.initial_master_nodes: ["dc_es1","dc_es2","dc_es3"]
# Data and log paths
path.data: /data/elk_data
path.logs: /var/log/elasticsearch
# X-Pack security settings; skip this section if you do not need authentication
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# Whether Elasticsearch locks the JVM heap in memory
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
indices.query.bool.max_clause_count: 8192
search.max_buckets: 100000
If your cluster has three nodes for high availability, the other two nodes only need a different node.name; also, the number of master-eligible nodes listed in cluster.initial_master_nodes should be odd.
3. Certificates and X-Pack built-in users
Generate the CA certificate:
/usr/share/elasticsearch/bin/elasticsearch-certutil ca -v
Issue the node certificate with it:
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca /usr/share/elasticsearch/elastic-stack-ca.p12
The resulting elastic-certificates.p12 keystore contains the node certificate, its private key, and the CA certificate.
cp -a /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch:elasticsearch /etc/elasticsearch/elastic-certificates.p12
chown elasticsearch:elasticsearch /etc/elasticsearch/elastic-stack-ca.p12
Copy the CA and node certificates to the other nodes and update their configuration files accordingly, then start the service:
systemctl start elasticsearch.service
Run the Elasticsearch password setup tool to generate random passwords for the built-in users:
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto
4. Check the cluster status (with security enabled, add -u elastic and enter the generated password when prompted)
# Basic information about this node
curl -u elastic http://127.0.0.1:9200
# Cluster health: green = healthy, yellow = primary shards OK but some replicas unassigned, red = primary shards missing
curl -u elastic http://192.168.0.1:9200/_cluster/health?pretty
# Full cluster state
curl -u elastic http://192.168.0.1:9200/_cluster/state?pretty
```
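Once all three nodes are up, a quick sanity check is to confirm they have all joined the cluster and that one of them was elected master. A minimal sketch using the standard `_cat` APIs (IP and credentials are the ones from this setup):
```
# List every node in the cluster; the entry marked "*" is the elected master
curl -u elastic http://192.168.0.1:9200/_cat/nodes?v
# List indices along with their health and shard counts
curl -u elastic http://192.168.0.1:9200/_cat/indices?v
```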
**Install the elasticsearch-head plugin**
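This step is left empty in the original; as a hedged sketch, elasticsearch-head can be run either from source (requires Node.js) or as a Docker container. The cluster config above already enables CORS (`http.cors.enabled: true`), which head needs; with X-Pack security on, credentials can be passed as URL parameters.
```
# Option 1: run from source (assumes git and Node.js are installed)
git clone https://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start          # serves the UI on http://localhost:9100

# Option 2: run as a Docker container
docker run -d -p 9100:9100 mobz/elasticsearch-head:5

# With security enabled, open the UI with credentials in the URL, e.g.
# http://<head-host>:9100/?auth_user=elastic&auth_password=<password>
```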
### Filebeat installation
```
1. Download and install
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.8.1-x86_64.rpm
rpm -vi filebeat-7.8.1-x86_64.rpm
2. Add the Kafka brokers to /etc/hosts
192.168.32.215 kafka-01
192.168.32.216 kafka-02
192.168.32.217 kafka-03
3. Configure Filebeat:
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /tmp/antifraud_access.log
  fields:
    log_topic: antifraud_access
  tags: ["antifraud_access"]
- type: log
  enabled: true
  paths:
    - /tmp/antifraud_applog.log
  multiline.pattern: '^[A-Z]'
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: antifraud_applog
  tags: ["antifraud_applog"]
output.kafka:
  hosts: ["kafka-01:9092","kafka-02:9092","kafka-03:9092"]
  topic: '%{[fields.log_topic]}'   # the Kafka topic is taken from the log_topic field defined on each input
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
systemctl enable filebeat
systemctl start filebeat
```
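Filebeat ships with built-in self-tests that are worth running before enabling the service: they confirm the YAML parses and that the configured Kafka output is reachable. A quick sketch using the RPM's default config path:
```
# Validate the configuration file
filebeat test config -c /etc/filebeat/filebeat.yml
# Check connectivity to the configured output (the Kafka brokers)
filebeat test output -c /etc/filebeat/filebeat.yml
```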
### Kafka installation
**1. Install ZooKeeper**
```
1. Download and extract the tarball
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz -P /usr/local/src/
tar zxf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/
mv /usr/local/zookeeper-3.4.14 /usr/local/zookeeper
2. Create the data and log directories
mkdir -p /data/zk/data
mkdir -p /data/zk/log
3. Assign the myid value; it must be unique per node and between 1 and 255
echo "1" > /data/zk/data/myid
4. Edit the configuration
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data
dataLogDir=/data/zk/log
clientPort=2181
# Ensemble members: 2888 is the port followers use to connect to the leader, 3888 is used for leader election
server.1=192.168.32.215:2888:3888
server.2=192.168.32.216:2888:3888
server.3=192.168.32.217:2888:3888
5. ZooKeeper cluster commands
Start:        /usr/local/zookeeper/bin/zkServer.sh start
Check status: /usr/local/zookeeper/bin/zkServer.sh status
```
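After starting all three nodes, one should report itself as the leader and the other two as followers; the four-letter-word commands (enabled by default in ZooKeeper 3.4) give a quick health check. A small sketch, assuming `nc` is installed:
```
# One node should show "Mode: leader", the other two "Mode: follower"
/usr/local/zookeeper/bin/zkServer.sh status
# Four-letter-word checks against a node
echo ruok | nc 192.168.32.215 2181   # returns "imok" if the server is running
echo stat | nc 192.168.32.215 2181   # shows mode, client connections, and znode counts
```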
**2. Install Kafka**
```
1. Download and extract the tarball
wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz -P /usr/local/src/
tar zxf /usr/local/src/kafka_2.11-1.1.1.tgz -C /usr/local/
mv /usr/local/kafka_2.11-1.1.1 /usr/local/kafka
2. Create the data directory
mkdir -p /data/kafka/data/
3. Edit the configuration file
vim /usr/local/kafka/config/server.properties
# Broker ID; must be unique across the cluster
broker.id=0
# Allow topics to be deleted
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Socket send buffer size
socket.send.buffer.bytes=102400
# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka stores its data (log segments); use the directory created above
log.dirs=/data/kafka/data
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a segment is retained before deletion
log.retention.hours=168
# ZooKeeper connection string
zookeeper.connect=192.168.32.215:2181,192.168.32.216:2181,192.168.32.217:2181
4. On 192.168.32.216 and 192.168.32.217, edit /usr/local/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively; broker.id must not be repeated
5. Update /etc/hosts
vim /etc/hosts
192.168.32.215 kafka-01
192.168.32.216 kafka-02
192.168.32.217 kafka-03
6. Start the cluster (run on each broker)
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
```
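To confirm the three brokers have actually formed a cluster, it helps to create a replicated test topic and push a message through it. A minimal sketch using the scripts bundled with kafka_2.11-1.1.1 (the topic name `test` is only an example; in this Kafka version topics are still created via ZooKeeper):
```
# Create a test topic replicated across all three brokers
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.32.215:2181 \
  --replication-factor 3 --partitions 3 --topic test
# Confirm it exists
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.32.215:2181
# Produce one message ...
echo "hello elk" | /usr/local/kafka/bin/kafka-console-producer.sh \
  --broker-list kafka-01:9092 --topic test
# ... and consume it back from a different broker
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-02:9092 \
  --topic test --from-beginning --max-messages 1
```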
[Kafka introduction](https://blog.zs-fighting.cn/2020/08/14/kafka%E4%BB%8E%E5%85%A5%E9%97%A8%E5%88%B0%E6%94%BE%E5%BC%83-%E4%B8%80/)
### Logstash installation
```
1. wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.rpm
2. rpm -ivh logstash-7.8.1.rpm
3. Point Logstash at JAVA_HOME
vi /etc/sysconfig/logstash
JAVA_HOME=/opt/jdk1.8.0_102
4. Add the Kafka brokers to /etc/hosts
192.168.32.215 kafka-01
192.168.32.216 kafka-02
192.168.32.217 kafka-03
5. Adjust the Logstash JVM heap
vim /etc/logstash/jvm.options
-Xms4g
-Xmx4g
6. Configure the pipeline
vim /etc/logstash/conf.d/project-01.conf
input {
  kafka {
    bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
    group_id => "logstash"              # Kafka consumer group id
    topics => ["antifraud_access"]
    consumer_threads => 2
    client_id => "logstash"
    decorate_events => false
    auto_offset_reset => "earliest"
    request_timeout_ms => "300000"
    session_timeout_ms => "20000"
    max_poll_interval_ms => "600000"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  if "antifraud_access" in [tags] {
    elasticsearch {
      user => "elastic"
      password => "iVqYwbB4pg2bOmrtrwW2"
      hosts => ["192.168.32.210:9200","192.168.32.211:9200","192.168.32.212:9200"]
      index => "antifraud_access_%{+YYYY.MM}"
    }
  }
}
7. systemctl daemon-reload
systemctl enable logstash.service
systemctl start logstash.service
```
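Before relying on the service, the pipeline can be syntax-checked, and the Kafka consumer group can be inspected to confirm Logstash is actually consuming. A small sketch (paths are the RPM defaults; the group name `logstash` comes from the config above):
```
# Compile-check the pipeline without starting it
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/project-01.conf
# Describe the consumer group to see per-partition offsets and lag
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-01:9092 \
  --describe --group logstash
```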
### Kibana installation
```
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.8.1-x86_64.rpm
rpm -ivh kibana-7.8.1-x86_64.rpm
vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.32.210:9200","http://192.168.32.211:9200","http://192.168.32.212:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
xpack.security.enabled: true
elasticsearch.username: "kibana_system"
elasticsearch.password: "KxxREpLB5K4hFG0mkIpp"
systemctl enable kibana.service
systemctl start kibana
Browse to http://192.168.0.1:5601 to access Kibana (log in with the elastic user and the password generated earlier)
```
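A quick check that Kibana is up and can talk to Elasticsearch (a sketch; the address matches the example above):
```
# The JSON response contains status.overall.state, which should be "green"
curl http://192.168.0.1:5601/api/status
```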
### Logstash plugins
A Logstash pipeline can be broken down as: event -> input -> codec -> filter -> codec -> output
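To watch that flow in isolation before wiring in Kafka and Elasticsearch, a throwaway pipeline can read events from stdin and print them fully decoded to stdout. This is only an illustrative sketch (the file name /tmp/stdin-test.conf is hypothetical), not part of the setup above:
```
# /tmp/stdin-test.conf
input {
  stdin { }                                                # input: one event per line typed on the console
}
filter {
  mutate { add_field => { "pipeline" => "stdin-test" } }   # filter: enrich each event with a field
}
output {
  stdout { codec => rubydebug }                            # codec + output: pretty-print the whole event
}
# Run it interactively:
#   /usr/share/logstash/bin/logstash -f /tmp/stdin-test.conf
```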
```
1. logstash_input_file: read from files
input {
  file {
    path => "/var/www/zzc/log/localhost_access_log.*.txt"
    type => "tomcat_access"
    start_position => "beginning"   # "beginning" or "end"; default is "end"
  }
}
2. logstash_output_elasticsearch: write to Elasticsearch
output {
  if [type] == "tomcat_access" {
    elasticsearch {
      hosts => ["192.168.0.1:9200","192.168.0.2:9200"]
      index => "tomcat_access-%{+YYYY.MM.dd}"
    }
  }
}
3. logstash_input_multiline: merge multiple lines of a log file into one event (via the multiline codec)
input {
  file {
    path => "/data/logs/app*.log"
    type => "tomcat_applog"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["          # a line starting with "[" marks the boundary of an event
      negate => true
      what => "previous"        # everything before the boundary belongs to the previous event
    }
  }
}
4. logstash_codec_json
If the log lines are in JSON format, the json codec automatically parses the JSON fields into the event
input {
  file {
    path => "/var/www/zzc/log/localhost_access_log.*.txt"
    codec => "json"
    type => "tomcat_access"
  }
}
5. logstash_input_syslog
input {
  syslog {
    type => "system-syslog"
    port => 514
  }
}
Listen on port 514 and accept syslog forwarded from other machines; on those machines adjust rsyslog:
vim /etc/rsyslog.conf
*.* @@192.168.0.1:514
# first *           : facility (the type of system log)
# second *          : severity (the log level)
# @@192.168.0.1:514 : forward to 192.168.0.1 port 514 (@@ = TCP, a single @ = UDP)
6. logstash_input_beats
input {
  beats {
    port => 5044
  }
}
7. logstash_filter_json + filebeat tags: process only logs matching a condition
filter {
  if "raptor_access" in [tags] {
    json {
      source => "message"
    }
  }
}
8. logstash_input_kafka: consume data from Kafka
input {
  kafka {
    bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
    group_id => "logstash"
    topics => ["antifraud_access"]
    consumer_threads => 2
    client_id => "logstash"
    decorate_events => false
    auto_offset_reset => "earliest"
    request_timeout_ms => "300000"
    session_timeout_ms => "20000"
    max_poll_interval_ms => "600000"
  }
}
9. filebeat_output_kafka: produce data to the Kafka cluster
output.kafka:
  hosts: ["kafka-01:9092","kafka-02:9092","kafka-03:9092"]
  topic: '%{[fields.log_topic]}'   # taken from the log_topic field defined on the input
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```