本文记录日志收集与分析小项目的架构和流程
介绍
一条埋点数据经 nginx 网关分发到对应的 webServer,webServer 处理埋点数据后以文本的形式存储在本地磁盘;离线处理时,Flume 会收集本机的处理后的埋点日志并存储到 HDFS,此时 Hive/Spark 进行数据的处理和分析;实时计算,对接 Flume 的是 Kafka 组件,然后实时处理框架 Spark/Flink/storm 直接从 Kafka 中读取数据处理。至此,一个典型的大数据处理框架就出来了。

本案例处理流程如下
- 数据生成:日志产生器请求服务器接口
- 数据采集:
SpringBoot服务存储日志到本地,Flume采集新增的日志信息到HDFS - 数据存储:
HDFS - 数据处理(ETL)
- 数据清洗:删除不符合规范的脏数据
- 解析字段:
time解析成年、月、日;ip解析成具体省市 - 把后续统计分析要用到的所有字段都补齐
- 数据分析
- 基于
ETL结果生成的大宽表来分析
- 基于
- 数据展示
日志生成
nginx
nginx 安装步骤网上参考资料很多,这里只看下配置文件,默认是在 /usr/local/nginx/conf/nginx.conf
...
upstream danner {
server danner000:7777;
}
# other server listen
server {
listen 80;
server_name localhost;
location / {
#root html;
#index index.html index.htm;
# 此处不要有 "_",被坑了很久,不知道为什么
proxy_pass http://danner;
}
}
...
每个 server 都是单独配置,location 是本机接收的请求,proxy_pass 是转发到何处。本例中是将 http://localhost:80/ 转发到 http://danner000:7777/ (这里是同个机子,生产上肯定是不同机子)。
nginx 在本例中就一个配置,它规定两个东西:客户端请求的端口是 80,web Server 必须有 7777 端口来响应。
日志生成器
普通的 java 代码随机生成一些日志,通过 nginx 机子的 80 端口数据直达到 web server 的 7777 端口。
Spring Boot
web Server 是用 Spring Boot 服务来实现,它就实现一个 sendAction 接口,并将接口收到的内容通过 log4j 存到本地文件。很简单都是常规操作,就贴下配置
application.yml
# 定义端口
server:
port: 7777
servlet:
context-path: /log
log4j.properties
### set log levels ###
log4j.rootLogger = INFO,D
### log file ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.file =../log/access.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = INFO
log4j.appender.D.DatePattern='.'yyyy-MM-dd
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %m%n
接口
@RequestMapping(value = "sendAction", method = RequestMethod.POST)
@ResponseBody
public String sendAction(@RequestBody AccessLog accessLog){
// 接收 accesslog 对象
logger.info(JSON.toJSONString(accessLog));
return accessLog.toString();
}
配置成每日创建 access.log 文件,若不是当日的文件会被修改为 access.log.2019-09-16 后缀名是日期。
日志采集
在日志生成部分,通过 web server 已经将数据保存到日志文件中,接下来就是通过 Flume 将日志文件采集到 HDFS,目录 /project/hadoop/access/log/%Y%m%d/
taildir-hdfs-ng.sources = r1
taildir-hdfs-ng.sinks = k1
taildir-hdfs-ng.channels = c1
taildir-hdfs-ng.sources.r1.type = TAILDIR
taildir-hdfs-ng.sources.r1.positionFile = /home/hadoop/tmp/data/flume/p.json
taildir-hdfs-ng.sources.r1.filegroups = f1
taildir-hdfs-ng.sources.r1.filegroups.f1 = /home/hadoop/project/hadoop/log/access.log
taildir-hdfs-ng.channels.c1.type = memory
taildir-hdfs-ng.sinks.k1.type = hdfs
taildir-hdfs-ng.sinks.k1.hdfs.fileType = DataStream
taildir-hdfs-ng.sinks.k1.hdfs.writeFormat = Text
taildir-hdfs-ng.sinks.k1.hdfs.useLocalTimeStamp = true
taildir-hdfs-ng.sinks.k1.hdfs.path = /project/hadoop/access/log/%Y%m%d/
# 1000 行为一个文件
taildir-hdfs-ng.sinks.k1.hdfs.rollCount = 1000
taildir-hdfs-ng.sinks.k1.hdfs.rollSize = 0
taildir-hdfs-ng.sinks.k1.hdfs.rollInterval = 0
# 文件压缩 bzip2
taildir-hdfs-ng.sinks.k1.hdfs.codeC = bzip2
taildir-hdfs-ng.sinks.k1.hdfs.fileType = CompressedStream
taildir-hdfs-ng.sources.r1.channels = c1
taildir-hdfs-ng.sinks.k1.channel = c1
执行命令
flume-ng agent -c $FLUME_HOME/conf -f taildir-hdfs.conf -n taildir-hdfs-ng -Dflume.root.logger=INFO,console
至此,日志文件按天归档为一个目录存储在 HDFS
ETL
做两件事情:删除没用的脏数据、扩展字段;简单的 MR 程序(只涉及 Mapper 无 Reducer),省略。
Hive
将 MR 的输出数据加载到 Hive 表供后续分析。首先创建外部表
create external table danner_db.access_wide(
> user string,
> platform string,
> version string,
> ip string,
> traffic bigint,
> time string,
> duration bigint,
> appid string,
> province string,
> city string,
> isp string,
> y string,
> m string,
> d string
> ) partitioned by(day string)
> row format delimited fields terminated by '\t'
> location "hdfs://192.168.22.147:9000/project/hadoop/access/wide";
将 MR 输出数据载入表
hive -e “ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=20190916);”
执行成功则 access_wide 表有数据可查询,当然前提是 hdfs://192.168.22.147:9000/project/hadoop/access/wide/day=20190916 目录下有对应内容的文件
hive> select * from access_wide limit 10;
OK
john iOS 4.0 222.33.119.74 1871 2019-09-16 137 qyId 辽宁 抚顺 铁通 2019 9 16 20190916
zhang iOS 4.0 123.234.2.114 746 2019-09-16 370 wwwId 山东 青岛 联通 2019 9 16 20190916
li Android 2.0 210.44.41.131 2260 2019-09-16 354 comId 山东 济南 教育网 2019 9 16 20190916
wang Android 3.0 139.215.11.4 3493 2019-09-16 399 httpId 吉林 联通 2019 9 16 20190916
john iOS 1.0 222.94.195.61 6455 2019-09-16 117 httpId 江苏 南京 电信 2019 9 16 20190916
zhao Android 1.0 61.235.190.1 2718 2019-09-16 351 qyId 广西 南宁 铁通 2019 9 16 20190916
zhao iOS 1.0 36.56.35.166 6307 2019-09-16 194 orgId 安徽 亳州 电信 2019 9 16 20190916
zhao iOS 5.0 210.41.241.220 8187 2019-09-16 105 qyId 四川 成都 教育网 2019 9 16 20190916
zhao Android 1.0 182.91.241.65 2702 2019-09-16 163 webId 广西 桂林 联通 2019 9 16 20190916
zhao Android 5.0 210.39.250.184 6249 2019-09-16 129 qyId 广东 广州 教育网 2019 9 16 20190916
Time taken: 0.148 seconds, Fetched: 10 row(s)
脚本
ETL 做的操作还是比较零散且每次都要手工去操作肯定是不行的。在实际工作中,我们倾向于用脚本将步骤串联起来,来看看具体有哪些步骤要操作
MR清洗数据MR结果文件移动到hive表目录下- 更新
hive表
vi etl.sh
#! /bin/bash
if [ $# -eq 1 ];then
time=$1
else
time=`date -d "yesterday" +%Y%m%d`
fi
echo ${time}
# mr
hadoop jar ../lib/G7-41-dwt.jar com.danner.bigdata.hadoop.homework.ETLApp /project/hadoop/access/log/${time} /project/hadoop/access/out/${time} /tmp/data/ip.txt
echo "step 1: mr job finish"
# 移动 out 下文件到 hive 路径下
hadoop fs -rmr /project/hadoop/access/wide/day=${time}
echo "stpe 2: 删除之前 hive 路径下之前存在的分区数据"
hadoop fs -mkdir -p /project/hadoop/access/wide/day=${time}
hadoop fs -mv /project/hadoop/access/out/${time}/part* /project/hadoop/access/wide/day=${time}/
echo "stpe 3: 移动 ETL 后的数据到 hive 路径"
hadoop fs -rmr /project/hadoop/access/out/${time}
echo "stpe 3: 删除 ETL 后的数据"
# 刷新 hive
hive -e "ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=${time});"
echo "step 4: 刷新分区元数据: ${time}"
只要执行此 shell 直接可以将 flume 保存 HDFS 数据 ETL 后刷新到 hive
数据分析
在 ETL 后我们就要进行数据分析,本例中只要简单的统计每类终端平台的数量:
select platform,count(1) from access_wide where day=20190916 group by platform;
统计出来只是打印没卵用,还是要建表存储
create external table platform_stat(
platform string,
cnt int,
d string
) partitioned by(day string)
location 'hdfs://192.168.22.147:9000/project/hadoop/access/platform';
分析结果刷到新建的 platform_stat 表
insert overwrite table platform_stat partition(day='20190916')
select platform, count(1) cnt, day as d from access_wide where day='20190916' group by platform, day;
至此,数据分析结果已保存在 platform_stat 表。
注意:分析结果中有冗余字段 d,下面会讲解为何会有这个字段
优化
在我们的案例中,数据分析只用简单的 SQL 就可以搞定了。但上面的 SQL 在生产中往往会发生数据倾斜问题,用一个小技巧来解决:key 随机,将之前的一个 MR 改写成两个 MR => 第一个 MR 中给 key 加上随机数再统计,第二个 MR 去除随机数后再统计即可解决倾斜问题。
insert overwrite table platform_stat partition(day='20190916')
select del_prefix(m.platform_add) platform ,sum(m.cnt) cnt,m.d d from (
select add_prefix(platform) platform_add, count(1) cnt, day as d from access_wide where day='20190916' group by add_prefix(platform), day ) m
group by del_prefix(m.platform_add),m.d;
数据展示
项目进行到这里,数据处理相关部分都已结束,接下来是要进行数据的展示。到现在为止,所有的数据都是存在大数据平台上,但大数据平台是不向外提供任务数据和服务。本例是将数据分析结果导出到 MySQL。
MySql
在 MySql 创建表存储 Hive 中 platform_stat 表导出的数据。
create table platform_stat(
platform varchar(32),
cnt int,
d varchar(8)
)engine=innodb default charset=utf8;
MySql 表需要日期 d 这个字段的。
Sqoop
Sqoop 是一个数据同步工具,用于关系型数据库和各种大数据存储比如 Hive 之间的数据相互同步;底层是 MR job,命令执行时在当前目录自动生成 java 文件。
以下命令,将 Hive 中 platform_stat 分区表中的数据导入到 MySql(可能会缺少 jar,自行拷贝)
sqoop export \
--connect jdbc:mysql://localhost:3306/rz_db \
--username root \
--password 123456 \
--mapreduce-job-name platform2MySql \
--table platform_stat \
--export-dir /project/hadoop/access/platform/day=20190916 \
--columns "platform,cnt,d" \
--input-fields-terminated-by '\001'
有几点需要说明:
--table:MySQL中的表--input-fields-terminated-by:Hive表中数据分割符,’\001’ 是Hive默认分隔符--export-dir:Hive表在HDFS上的地址-
冗余字段
d:hive表的字段为何要加冗余字段d,我们看下--export-dir内容就知道Android 400 20190916
iOS 382 20190916
以上是有字段 d 才有的,不然文件中只有两个字段,这样的话导入 MySQL 时也就只有 2 个字段,这显然不符合要求(Hive 分区字段 体现在目录而不是文件中)。
数据到 MySQL 中了,正常的工作流程已结束。但工作时我们要保证数据不重复,在 sqoop 之前,我们再增加一步 MySQL 的删除操作。
mysql -uroot -p123456 -e”use rz_db;delete from platform_stat where d=20190916;”