ELK Log Analysis Platform

Elasticsearch + Logstash + Kibana: together these three tools form a practical, easy-to-use monitoring stack.

Posted by zhangshun on July 22, 2019

Preface

In an operations environment, administrators typically face a large number of servers, and a key part of maintaining them is reviewing each server's logs. With ELK, all logs can be collected in one place and analyzed graphically and visually, enabling real-time monitoring of business status.

ELK log analysis platform architectures (three options)

  1. datasource -> logstash -> elasticsearch -> kibana
    • Pros: simple to set up and easy to get started with
    • Cons: Logstash is resource-hungry (high CPU and memory usage), and with no message queue acting as a buffer there is a risk of data loss
  2. datasource -> filebeat -> logstash -> elasticsearch -> kibana
    • Pros: Filebeat is a lightweight open-source log shipper; compared with Logstash its resource usage is negligible, which solves the problem of Logstash's high resource consumption
    • Cons: still no message queue buffer, so the risk of data loss remains
  3. datasource -> filebeat/logstash -> mq(redis/kafka) -> logstash -> elasticsearch -> kibana
    • Pros: the message queue evens out network transmission, reducing network congestion and, above all, the chance of losing data.

ELK architecture diagram

Versions of elasticsearch, logstash, kibana, filebeat, kafka, and zookeeper

  • elasticsearch-7.8.1
  • logstash-7.8.1
  • kibana-7.8.1
  • filebeat-7.8.1
  • kafka-2.11-1.1.1
  • zookeeper-3.4.14

Installing elasticsearch (Java must be installed beforehand)

1. Basic environment setup
CentOS:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-x86_64.rpm
rpm -ivh elasticsearch-7.8.1-x86_64.rpm
systemctl daemon-reload
systemctl enable elasticsearch.service

mkdir -p /data/elk_data
chown elasticsearch:elasticsearch /data/elk_data

2. Edit the elasticsearch configuration file (/etc/elasticsearch/elasticsearch.yml)
# Cluster name
cluster.name: "es_cluster"
# Node name (master1)
node.name: master1
# Whether this node is master-eligible
node.master: true
# Whether this node stores data (enabled by default)
node.data: true
# Network binding; 0.0.0.0 here allows access from outside the host
network.host: ["0.0.0.0"]
# HTTP port for client access (default 9200)
http.port: 9200
# Allow cross-origin (CORS) requests
http.cors.enabled: true
http.cors.allow-origin: "*"
# TCP port for inter-node transport (default 9300)
transport.tcp.port: 9300
# Hosts used for cluster discovery
discovery.seed_hosts: ["dc_es1","dc_es2","dc_es3"]
# Explicitly list the names or IPs of all master-eligible nodes; only used for the first election
cluster.initial_master_nodes: ["dc_es1","dc_es2","dc_es3"]
# Data and log storage paths
path.data: /data/elk_data
path.logs: /var/log/elasticsearch
# X-Pack security (user authentication); can be skipped if you do not need it
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
# Whether the ES process locks the JVM heap in memory (disabled here)
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
indices.query.bool.max_clause_count: 8192
search.max_buckets: 100000
If you run a three-node cluster for high availability, the other two nodes only need node.name changed; also, the number of master-eligible nodes listed in cluster.initial_master_nodes should be odd.
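
For instance, a second node's elasticsearch.yml would differ only in the node name (a small sketch; the name master2 is assumed for illustration, everything else stays identical to master1):
node.name: master2
# cluster.name, discovery.seed_hosts, cluster.initial_master_nodes,
# paths, ports and the xpack.* settings are the same as on master1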

3. Certificate setup and generating built-in user passwords with X-Pack
Generate the CA certificate:
/usr/share/elasticsearch/bin/elasticsearch-certutil ca -v
Issue the node certificate:
/usr/share/elasticsearch/bin/elasticsearch-certutil cert -ca /usr/share/elasticsearch/elastic-stack-ca.p12
The resulting keystore file elastic-certificates.p12 contains the node certificate, its private key, and the CA certificate.

cp -a /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch:elasticsearch /etc/elasticsearch/elastic-certificates.p12
chown elasticsearch:elasticsearch /etc/elasticsearch/elastic-stack-ca.p12
Copy the CA and node certificates to the other nodes and update their configuration files accordingly.

systemctl start elasticsearch.service

Run the Elasticsearch password setup tool to generate random passwords for the built-in users:
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto

4. Check the cluster status
Because X-Pack security is enabled, the REST calls below need credentials (curl -u elastic prompts for the password generated above).
curl -u elastic http://127.0.0.1:9200
Shows this node's basic information.

curl -u elastic http://192.168.0.1:9200/_cluster/health?pretty
Shows cluster health (green = healthy; yellow = primaries OK but some replicas unassigned; red = primary shards missing).

curl -u elastic http://192.168.0.1:9200/_cluster/state?pretty
Shows the detailed cluster state.
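
Two more checks that are often useful at this point (the _cat APIs are standard Elasticsearch endpoints; the credentials are the ones generated above):
curl -u elastic "http://127.0.0.1:9200/_cat/nodes?v"     # list the nodes that have joined the cluster
curl -u elastic "http://127.0.0.1:9200/_cat/indices?v"   # list indices and their health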

Installing the elasticsearch-head plugin

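One common way to run the head UI (an assumption for illustration, assuming Docker is available; it may differ from the command originally used here) is the mobz/elasticsearch-head image, which works with the CORS settings enabled above:
docker run -d --name es-head -p 9100:9100 mobz/elasticsearch-head:5
# then open http://<host>:9100 and point it at http://<es-node>:9200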

Installing filebeat

1. curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.8.1-x86_64.rpm
rpm -vi filebeat-7.8.1-x86_64.rpm
2. Edit /etc/hosts
192.168.32.215  kafka-01
192.168.32.216  kafka-02
192.168.32.217  kafka-03
3. Configuration:
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /tmp/antifraud_access.log
  fields:
    log_topic: antifraud_access
  tags: ["antifraud_access"]
- type: log
  enabled: true
  paths:
    - /tmp/antifraud_applog.log
  multiline.pattern: '^[A-Z]'
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: antifraud_applog
  tags: ["antifraud_applog"]
output.kafka:
  hosts: ["kafka-01:9092","kafka-02:9092","kafka-03:9092"]
  topic: '%{[fields.log_topic]}'	# written to the Kafka topic named by the log_topic field defined in each input
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

systemctl enable filebeat
systemctl start filebeat
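
Before shipping real traffic it is worth validating the configuration and the Kafka connectivity; filebeat ships with test subcommands for this:
filebeat test config -c /etc/filebeat/filebeat.yml   # check the syntax of filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml   # check connectivity to the configured Kafka brokers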

Installing kafka

Part 1: Install zookeeper

1. Download and extract the tarball
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz -P /usr/local/src/
tar zxf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/
mv /usr/local/zookeeper-3.4.14 /usr/local/zookeeper
2. Create the data and log directories
mkdir -p /data/zk/data
mkdir -p /data/zk/log
3. Set myid; it must be unique on each node and in the range 1-255
echo "1">/data/zk/data/myid
4. Edit the configuration
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data
dataLogDir=/data/zk/log
clientPort=2181
# Cluster configuration: 2888 is the port followers use to connect to the leader, 3888 is used for leader election
server.1=192.168.32.215:2888:3888
server.2=192.168.32.216:2888:3888
server.3=192.168.32.217:2888:3888
5. Cluster commands (repeat the steps above on the other two nodes, each with its own myid)
Start:        /usr/local/zookeeper/bin/zkServer.sh start
Check status: /usr/local/zookeeper/bin/zkServer.sh status
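
To confirm which node ended up as leader, ZooKeeper's four-letter-word commands can be used (enabled by default in 3.4.x; this sketch assumes nc is installed):
echo ruok | nc 192.168.32.215 2181   # replies "imok" if the server is running
echo stat | nc 192.168.32.215 2181   # shows connection stats and "Mode: leader" or "Mode: follower"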

Part 2: Install kafka

1. Download and extract the tarball
wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz -P /usr/local/src/
tar zxf /usr/local/src/kafka_2.11-1.1.1.tgz -C /usr/local/
mv /usr/local/kafka_2.11-1.1.1 /usr/local/kafka
2. Create the data directory
mkdir -p /data/kafka/data/
3. Edit the configuration file
vim /usr/local/kafka/config/server.properties
# Globally unique broker id; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Socket send buffer size
socket.send.buffer.bytes=102400
# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a request accepted on a socket
socket.request.max.bytes=104857600
# Directory for Kafka's log segments (the message data); matches the directory created above
log.dirs=/data/kafka/data
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data dir used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; older segments are deleted
log.retention.hours=168
# Zookeeper cluster connection string
zookeeper.connect=192.168.32.215:2181,192.168.32.216:2181,192.168.32.217:2181
4. On 192.168.32.216 and 192.168.32.217, edit /usr/local/kafka/config/server.properties and set
broker.id=1 and broker.id=2 respectively. Note: broker.id must not be duplicated.
5. Edit hosts
vim /etc/hosts
192.168.32.215  kafka-01
192.168.32.216  kafka-02
192.168.32.217  kafka-03
6. Start the cluster (run on each broker)
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
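
A quick end-to-end smoke test with the console tools bundled with Kafka 1.1.1 (the topic name test is arbitrary, chosen only for this check):
# create a replicated test topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.32.215:2181 \
  --replication-factor 3 --partitions 3 --topic test
# type a few messages into the producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka-01:9092 --topic test
# read them back from another terminal
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-01:9092 --topic test --from-beginning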

Kafka overview

Installing logstash

1. wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.rpm
2. rpm -ivh logstash-7.8.1.rpm
3. Add JAVA_HOME to the logstash environment
vi /etc/sysconfig/logstash
JAVA_HOME=/opt/jdk1.8.0_102
4. Edit /etc/hosts
192.168.32.215  kafka-01
192.168.32.216  kafka-02
192.168.32.217  kafka-03
5. Adjust the logstash JVM heap settings
vim /etc/logstash/jvm.options
-Xms4g
-Xmx4g
6. Configure the logstash pipeline
vim /etc/logstash/conf.d/project-01.conf
input {
	kafka {
		bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
		group_id => "logstash"	# Kafka consumer group id
		topics => ["antifraud_access"]
		consumer_threads => 2
		client_id => "logstash"
		decorate_events => false
		auto_offset_reset => "earliest"
		request_timeout_ms => "300000"
		session_timeout_ms => "20000"
		max_poll_interval_ms => "600000"
	}
}

filter {
	json {
		source => "message"
	}
}

output {
        if "antifraud_access" in [tags] {
                elasticsearch {
                        user => "elastic"
                        password => "iVqYwbB4pg2bOmrtrwW2"
                        hosts => ["192.168.32.210:9200","192.168.32.211:9200","192.168.32.212:9200"]
                        index => "antifraud_access_%{+YYYY.MM}"
                }
        }
}
7. systemctl daemon-reload
systemctl enable logstash.service
systemctl start logstash.service
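
Before starting the service, the pipeline file can be syntax-checked (a small sketch; --config.test_and_exit only validates the configuration and then exits):
/usr/share/logstash/bin/logstash --path.settings /etc/logstash \
  -f /etc/logstash/conf.d/project-01.conf --config.test_and_exit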

Installing kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.8.1-x86_64.rpm
rpm -ivh  kibana-7.8.1-x86_64.rpm

vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.32.210:9200","http://192.168.32.211:9200","http://192.168.32.212:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
xpack.security.enabled: true
elasticsearch.username: "kibana_system"
elasticsearch.password: "KxxREpLB5K4hFG0mkIpp"

systemctl enable kibana.service
systemctl start kibana

Kibana is now available at http://192.168.0.1:5601
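
To confirm the service is answering before opening a browser, a simple curl probe against Kibana's /api/status endpoint works (this just prints the HTTP status code):
curl -s -o /dev/null -w "%{http_code}\n" http://192.168.0.1:5601/api/status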

Logstash plugins

A logstash pipeline can be broken down into: event -> input -> codec -> filter -> codec -> output
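
The flow is easiest to see with a throwaway pipeline run from the command line (a minimal sketch; -e passes the configuration inline and rubydebug pretty-prints each event):
/usr/share/logstash/bin/logstash -e '
input  { stdin { } }
filter { mutate { add_field => { "seen_by" => "logstash" } } }
output { stdout { codec => rubydebug } }'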

1. logstash_input_file: read events from a file

input {
	file {
		path => "/var/www/zzc/log/localhost_access_log.*.txt"
		type => "tomcat_access"
		start_position => "beginning"	# beginning or end; the default is end
	}
}



2. logstash_output_elasticsearch: write events to Elasticsearch

output {
	if [type] == "tomcat_access" {
		elasticsearch {
			hosts => ["192.168.0.1:9200","192.168.0.2:9200"]
			index => "tomcat_access-%{+YYYY.MM.dd}"
		}
	}
} 



3. logstash multiline codec: merge multi-line log entries into a single event

input {
	file {
		path => "/data/logs/app*.log"
		type => "tomcat_applog"
		start_position => "beginning"
		codec => multiline {
			pattern => "^\["		# a line starting with [ marks the beginning of a new event
			negate => true
			what => 'previous'		# lines that do not match are appended to the previous event
		}
	}
}
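
As an illustration (sample lines, not from the original post), a stack trace like the one below is folded into the single event that starts with the bracketed timestamp, because the follow-on lines do not start with [:
[2019-07-22 10:00:01] ERROR request failed
java.lang.NullPointerException
    at com.example.Demo.handle(Demo.java:42)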



4. logstash_codec_json

If the log lines are in JSON format, the json codec automatically parses the JSON fields as the events are read.

input {
	file {
		path => "/var/www/zzc/log/localhost_access_log.*.txt"
		codec => "json"
		type => "tomcat_access"
	}
}



5. logstash_input_syslog

input {
	syslog {
		type => "system-syslog"
		port => 514
	}
}

Listens on port 514 and accepts syslog messages sent by other machines; rsyslog on those machines needs a small change:

vim /etc/rsyslog.conf
*.*		@@192.168.0.1:514

*.*                 # facility.severity selector: all facilities, all levels
@@192.168.0.1:514   # forward to 192.168.0.1 port 514 (@@ means TCP, a single @ means UDP)



6. logstash_input_beats

input {
    beats {
	port => 5044
    }
}
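
On the shipper side, Filebeat would point its logstash output at this port (a small sketch; the IP address is an assumption):
output.logstash:
  hosts: ["192.168.0.1:5044"]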



7. logstash_filter_json with filebeat tags: apply the filter only to events matching the given condition

filter {   
	if "raptor_access" in [tags] {
		json {
			source => "message"
		}
	} 
}

8. logstash_input_kafka: consume data from Kafka
input {
        kafka {
                bootstrap_servers => "kafka-01:9092,kafka-02:9092,kafka-03:9092"
                group_id => "logstash"
                topics => "antifraud_access"
                consumer_threads => 2
                client_id => "logstash"
                decorate_events => false
                auto_offset_reset => "earliest"
                request_timeout_ms => "300000"
                session_timeout_ms => "20000"
                max_poll_interval_ms => "600000"
        }
}

9. filebeat output.kafka: produce data to the Kafka cluster
output.kafka:
  hosts: ["kafka-01:9092","kafka-02:9092","kafka-03:9092"]
  topic: '%{[fields.log_topic]}'	# routed by the log_topic value defined in each Filebeat input
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000