本文记录日志收集与分析小项目的架构和流程
介绍
一条埋点数据经 nginx
网关分发到对应的 webServer
,webServer
处理埋点数据后以文本的形式存储在本地磁盘;离线处理时,Flume
会收集本机的处理后的埋点日志并存储到 HDFS
,此时 Hive/Spark
进行数据的处理和分析;实时计算,对接 Flume
的是 Kafka
组件,然后实时处理框架 Spark/Flink/storm
直接从 Kafka
中读取数据处理。至此,一个典型的大数据处理框架就出来了。
本案例处理流程如下
- 数据生成:日志产生器请求服务器接口
- 数据采集:
SpringBoot
服务存储日志到本地,Flume
采集新增的日志信息到HDFS
- 数据存储:
HDFS
- 数据处理(ETL)
- 数据清洗:删除不符合规范的脏数据
- 解析字段:
time
解析成年、月、日;ip
解析成具体省市 - 把后续统计分析要用到的所有字段都补齐
- 数据分析
- 基于
ETL
结果生成的大宽表来分析
- 基于
- 数据展示
日志生成
nginx
nginx
安装步骤网上参考资料很多,这里只看下配置文件,默认是在 /usr/local/nginx/conf/nginx.conf
...
upstream danner {
server danner000:7777;
}
# other server listen
server {
listen 80;
server_name localhost;
location / {
#root html;
#index index.html index.htm;
# 此处不要有 "_",被坑了很久,不知道为什么
proxy_pass http://danner;
}
}
...
每个 server
都是单独配置,location
是本机接收的请求,proxy_pass
是转发到何处。本例中是将 http://localhost:80/
转发到 http://danner000:7777/
(这里是同个机子,生产上肯定是不同机子)。
nginx
在本例中就一个配置,它规定两个东西:客户端请求的端口是 80
,web Server
必须有 7777
端口来响应。
日志生成器
普通的 java
代码随机生成一些日志,通过 nginx
机子的 80
端口数据直达到 web server
的 7777
端口。
Spring Boot
web Server
是用 Spring Boot
服务来实现,它就实现一个 sendAction
接口,并将接口收到的内容通过 log4j
存到本地文件。很简单都是常规操作,就贴下配置
application.yml
# 定义端口
server:
port: 7777
servlet:
context-path: /log
log4j.properties
### set log levels ###
log4j.rootLogger = INFO,D
### log file ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.file =../log/access.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = INFO
log4j.appender.D.DatePattern='.'yyyy-MM-dd
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %m%n
接口
@RequestMapping(value = "sendAction", method = RequestMethod.POST)
@ResponseBody
public String sendAction(@RequestBody AccessLog accessLog){
// 接收 accesslog 对象
logger.info(JSON.toJSONString(accessLog));
return accessLog.toString();
}
配置成每日创建 access.log
文件,若不是当日的文件会被修改为 access.log.2019-09-16
后缀名是日期。
日志采集
在日志生成部分,通过 web server
已经将数据保存到日志文件中,接下来就是通过 Flume
将日志文件采集到 HDFS
,目录 /project/hadoop/access/log/%Y%m%d/
taildir-hdfs-ng.sources = r1
taildir-hdfs-ng.sinks = k1
taildir-hdfs-ng.channels = c1
taildir-hdfs-ng.sources.r1.type = TAILDIR
taildir-hdfs-ng.sources.r1.positionFile = /home/hadoop/tmp/data/flume/p.json
taildir-hdfs-ng.sources.r1.filegroups = f1
taildir-hdfs-ng.sources.r1.filegroups.f1 = /home/hadoop/project/hadoop/log/access.log
taildir-hdfs-ng.channels.c1.type = memory
taildir-hdfs-ng.sinks.k1.type = hdfs
taildir-hdfs-ng.sinks.k1.hdfs.fileType = DataStream
taildir-hdfs-ng.sinks.k1.hdfs.writeFormat = Text
taildir-hdfs-ng.sinks.k1.hdfs.useLocalTimeStamp = true
taildir-hdfs-ng.sinks.k1.hdfs.path = /project/hadoop/access/log/%Y%m%d/
# 1000 行为一个文件
taildir-hdfs-ng.sinks.k1.hdfs.rollCount = 1000
taildir-hdfs-ng.sinks.k1.hdfs.rollSize = 0
taildir-hdfs-ng.sinks.k1.hdfs.rollInterval = 0
# 文件压缩 bzip2
taildir-hdfs-ng.sinks.k1.hdfs.codeC = bzip2
taildir-hdfs-ng.sinks.k1.hdfs.fileType = CompressedStream
taildir-hdfs-ng.sources.r1.channels = c1
taildir-hdfs-ng.sinks.k1.channel = c1
执行命令
flume-ng agent -c $FLUME_HOME/conf -f taildir-hdfs.conf -n taildir-hdfs-ng -Dflume.root.logger=INFO,console
至此,日志文件按天归档为一个目录存储在 HDFS
ETL
做两件事情:删除没用的脏数据、扩展字段;简单的 MR
程序(只涉及 Mapper
无 Reducer
),省略。
Hive
将 MR
的输出数据加载到 Hive
表供后续分析。首先创建外部表
create external table danner_db.access_wide(
> user string,
> platform string,
> version string,
> ip string,
> traffic bigint,
> time string,
> duration bigint,
> appid string,
> province string,
> city string,
> isp string,
> y string,
> m string,
> d string
> ) partitioned by(day string)
> row format delimited fields terminated by '\t'
> location "hdfs://192.168.22.147:9000/project/hadoop/access/wide";
将 MR
输出数据载入表
hive -e “ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=20190916);”
执行成功则 access_wide
表有数据可查询,当然前提是 hdfs://192.168.22.147:9000/project/hadoop/access/wide/day=20190916
目录下有对应内容的文件
hive> select * from access_wide limit 10;
OK
john iOS 4.0 222.33.119.74 1871 2019-09-16 137 qyId 辽宁 抚顺 铁通 2019 9 16 20190916
zhang iOS 4.0 123.234.2.114 746 2019-09-16 370 wwwId 山东 青岛 联通 2019 9 16 20190916
li Android 2.0 210.44.41.131 2260 2019-09-16 354 comId 山东 济南 教育网 2019 9 16 20190916
wang Android 3.0 139.215.11.4 3493 2019-09-16 399 httpId 吉林 联通 2019 9 16 20190916
john iOS 1.0 222.94.195.61 6455 2019-09-16 117 httpId 江苏 南京 电信 2019 9 16 20190916
zhao Android 1.0 61.235.190.1 2718 2019-09-16 351 qyId 广西 南宁 铁通 2019 9 16 20190916
zhao iOS 1.0 36.56.35.166 6307 2019-09-16 194 orgId 安徽 亳州 电信 2019 9 16 20190916
zhao iOS 5.0 210.41.241.220 8187 2019-09-16 105 qyId 四川 成都 教育网 2019 9 16 20190916
zhao Android 1.0 182.91.241.65 2702 2019-09-16 163 webId 广西 桂林 联通 2019 9 16 20190916
zhao Android 5.0 210.39.250.184 6249 2019-09-16 129 qyId 广东 广州 教育网 2019 9 16 20190916
Time taken: 0.148 seconds, Fetched: 10 row(s)
脚本
ETL
做的操作还是比较零散且每次都要手工去操作肯定是不行的。在实际工作中,我们倾向于用脚本将步骤串联起来,来看看具体有哪些步骤要操作
MR
清洗数据MR
结果文件移动到hive
表目录下- 更新
hive
表
vi etl.sh
#! /bin/bash
if [ $# -eq 1 ];then
time=$1
else
time=`date -d "yesterday" +%Y%m%d`
fi
echo ${time}
# mr
hadoop jar ../lib/G7-41-dwt.jar com.danner.bigdata.hadoop.homework.ETLApp /project/hadoop/access/log/${time} /project/hadoop/access/out/${time} /tmp/data/ip.txt
echo "step 1: mr job finish"
# 移动 out 下文件到 hive 路径下
hadoop fs -rmr /project/hadoop/access/wide/day=${time}
echo "stpe 2: 删除之前 hive 路径下之前存在的分区数据"
hadoop fs -mkdir -p /project/hadoop/access/wide/day=${time}
hadoop fs -mv /project/hadoop/access/out/${time}/part* /project/hadoop/access/wide/day=${time}/
echo "stpe 3: 移动 ETL 后的数据到 hive 路径"
hadoop fs -rmr /project/hadoop/access/out/${time}
echo "stpe 3: 删除 ETL 后的数据"
# 刷新 hive
hive -e "ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=${time});"
echo "step 4: 刷新分区元数据: ${time}"
只要执行此 shell
直接可以将 flume
保存 HDFS
数据 ETL
后刷新到 hive
数据分析
在 ETL
后我们就要进行数据分析,本例中只要简单的统计每类终端平台的数量:
select platform,count(1) from access_wide where day=20190916 group by platform;
统计出来只是打印没卵用,还是要建表存储
create external table platform_stat(
platform string,
cnt int,
d string
) partitioned by(day string)
location 'hdfs://192.168.22.147:9000/project/hadoop/access/platform';
分析结果刷到新建的 platform_stat
表
insert overwrite table platform_stat partition(day='20190916')
select platform, count(1) cnt, day as d from access_wide where day='20190916' group by platform, day;
至此,数据分析结果已保存在 platform_stat
表。
注意:分析结果中有冗余字段 d
,下面会讲解为何会有这个字段
优化
在我们的案例中,数据分析只用简单的 SQL
就可以搞定了。但上面的 SQL
在生产中往往会发生数据倾斜问题,用一个小技巧来解决:key 随机
,将之前的一个 MR
改写成两个 MR
=> 第一个 MR
中给 key 加上随机数再统计,第二个 MR
去除随机数后再统计即可解决倾斜问题。
insert overwrite table platform_stat partition(day='20190916')
select del_prefix(m.platform_add) platform ,sum(m.cnt) cnt,m.d d from (
select add_prefix(platform) platform_add, count(1) cnt, day as d from access_wide where day='20190916' group by add_prefix(platform), day ) m
group by del_prefix(m.platform_add),m.d;
数据展示
项目进行到这里,数据处理相关部分都已结束,接下来是要进行数据的展示。到现在为止,所有的数据都是存在大数据平台上,但大数据平台是不向外提供任务数据和服务。本例是将数据分析结果导出到 MySQL
。
MySql
在 MySql
创建表存储 Hive
中 platform_stat
表导出的数据。
create table platform_stat(
platform varchar(32),
cnt int,
d varchar(8)
)engine=innodb default charset=utf8;
MySql
表需要日期 d
这个字段的。
Sqoop
Sqoop
是一个数据同步工具,用于关系型数据库和各种大数据存储比如 Hive
之间的数据相互同步;底层是 MR job
,命令执行时在当前目录自动生成 java
文件。
以下命令,将 Hive
中 platform_stat
分区表中的数据导入到 MySql
(可能会缺少 jar,自行拷贝)
sqoop export \
--connect jdbc:mysql://localhost:3306/rz_db \
--username root \
--password 123456 \
--mapreduce-job-name platform2MySql \
--table platform_stat \
--export-dir /project/hadoop/access/platform/day=20190916 \
--columns "platform,cnt,d" \
--input-fields-terminated-by '\001'
有几点需要说明:
--table
:MySQL
中的表--input-fields-terminated-by
:Hive
表中数据分割符,’\001’ 是Hive
默认分隔符--export-dir
:Hive
表在HDFS
上的地址-
冗余字段
d
:hive
表的字段为何要加冗余字段d
,我们看下--export-dir
内容就知道Android 400 20190916
iOS 382 20190916
以上是有字段 d
才有的,不然文件中只有两个字段,这样的话导入 MySQL
时也就只有 2
个字段,这显然不符合要求(Hive 分区字段
体现在目录而不是文件中)。
数据到 MySQL
中了,正常的工作流程已结束。但工作时我们要保证数据不重复,在 sqoop
之前,我们再增加一步 MySQL
的删除操作。
mysql -uroot -p123456 -e”use rz_db;delete from platform_stat where d=20190916;”