日志收集与分析 Demo

Posted by danner on May 15, 2018

本文记录日志收集与分析小项目的架构和流程

介绍

一条埋点数据经 nginx 网关分发到对应的 webServerwebServer 处理埋点数据后以文本的形式存储在本地磁盘;离线处理时,Flume 会收集本机的处理后的埋点日志并存储到 HDFS,此时 Hive/Spark 进行数据的处理和分析;实时计算,对接 Flume 的是 Kafka 组件,然后实时处理框架 Spark/Flink/storm 直接从 Kafka 中读取数据处理。至此,一个典型的大数据处理框架就出来了。

本案例处理流程如下

  • 数据生成:日志产生器请求服务器接口
  • 数据采集: SpringBoot 服务存储日志到本地, Flume 采集新增的日志信息到 HDFS
  • 数据存储: HDFS
  • 数据处理(ETL)
    • 数据清洗:删除不符合规范的脏数据
    • 解析字段:time 解析成年、月、日;ip 解析成具体省市
    • 把后续统计分析要用到的所有字段都补齐
  • 数据分析
    • 基于 ETL 结果生成的大宽表来分析
  • 数据展示

日志生成

nginx

nginx 安装步骤网上参考资料很多,这里只看下配置文件,默认是在 /usr/local/nginx/conf/nginx.conf

...
upstream danner {
         server danner000:7777;
    }

# other server listen
server {
    listen       80;
    server_name  localhost;

   location / {
        #root   html;
        #index  index.html index.htm;
	# 此处不要有 "_",被坑了很久,不知道为什么
        proxy_pass http://danner;
    }
}
... 

每个 server 都是单独配置,location 是本机接收的请求,proxy_pass 是转发到何处。本例中是将 http://localhost:80/ 转发到 http://danner000:7777/ (这里是同个机子,生产上肯定是不同机子)。

nginx 在本例中就一个配置,它规定两个东西:客户端请求的端口是 80web Server 必须有 7777 端口来响应。

日志生成器

普通的 java 代码随机生成一些日志,通过 nginx 机子的 80 端口数据直达到 web server7777 端口。

Spring Boot

web Server 是用 Spring Boot 服务来实现,它就实现一个 sendAction 接口,并将接口收到的内容通过 log4j 存到本地文件。很简单都是常规操作,就贴下配置

application.yml

# 定义端口
server:
  port: 7777
  servlet:
    context-path: /log

log4j.properties

### set log levels ###
log4j.rootLogger = INFO,D
### log file ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.file =../log/access.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = INFO
log4j.appender.D.DatePattern='.'yyyy-MM-dd
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %m%n

接口

 @RequestMapping(value = "sendAction", method = RequestMethod.POST)
@ResponseBody
public String sendAction(@RequestBody AccessLog accessLog){
    // 接收 accesslog 对象
    logger.info(JSON.toJSONString(accessLog));
    return accessLog.toString();
}

配置成每日创建 access.log 文件,若不是当日的文件会被修改为 access.log.2019-09-16 后缀名是日期。

日志采集

日志生成部分,通过 web server 已经将数据保存到日志文件中,接下来就是通过 Flume 将日志文件采集到 HDFS,目录 /project/hadoop/access/log/%Y%m%d/

taildir-hdfs-ng.sources = r1
taildir-hdfs-ng.sinks = k1
taildir-hdfs-ng.channels = c1

taildir-hdfs-ng.sources.r1.type = TAILDIR
taildir-hdfs-ng.sources.r1.positionFile = /home/hadoop/tmp/data/flume/p.json
taildir-hdfs-ng.sources.r1.filegroups = f1
taildir-hdfs-ng.sources.r1.filegroups.f1 = /home/hadoop/project/hadoop/log/access.log

taildir-hdfs-ng.channels.c1.type = memory

taildir-hdfs-ng.sinks.k1.type = hdfs
taildir-hdfs-ng.sinks.k1.hdfs.fileType = DataStream 
taildir-hdfs-ng.sinks.k1.hdfs.writeFormat = Text
taildir-hdfs-ng.sinks.k1.hdfs.useLocalTimeStamp = true
taildir-hdfs-ng.sinks.k1.hdfs.path = /project/hadoop/access/log/%Y%m%d/
# 1000 行为一个文件
taildir-hdfs-ng.sinks.k1.hdfs.rollCount = 1000
taildir-hdfs-ng.sinks.k1.hdfs.rollSize = 0
taildir-hdfs-ng.sinks.k1.hdfs.rollInterval = 0
# 文件压缩 bzip2
taildir-hdfs-ng.sinks.k1.hdfs.codeC = bzip2
taildir-hdfs-ng.sinks.k1.hdfs.fileType = CompressedStream

taildir-hdfs-ng.sources.r1.channels = c1
taildir-hdfs-ng.sinks.k1.channel = c1

执行命令

flume-ng agent -c $FLUME_HOME/conf -f taildir-hdfs.conf -n taildir-hdfs-ng -Dflume.root.logger=INFO,console

至此,日志文件按天归档为一个目录存储在 HDFS

ETL

做两件事情:删除没用的脏数据扩展字段;简单的 MR 程序(只涉及 MapperReducer),省略。

Hive

MR 的输出数据加载到 Hive 表供后续分析。首先创建外部表

create external table danner_db.access_wide(
> user string,
> platform string,
> version string,
> ip string,
> traffic bigint,
> time string,
> duration bigint,
> appid string,
> province string,
> city string,
> isp string,
> y string,
> m string,
> d string
> ) partitioned by(day string)
> row format delimited fields terminated by '\t'
> location "hdfs://192.168.22.147:9000/project/hadoop/access/wide";

MR 输出数据载入表

hive -e “ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=20190916);”

执行成功则 access_wide 表有数据可查询,当然前提是 hdfs://192.168.22.147:9000/project/hadoop/access/wide/day=20190916 目录下有对应内容的文件

hive> select * from access_wide limit 10;
OK
john    iOS     4.0     222.33.119.74   1871    2019-09-16      137     qyId    辽宁    抚顺    铁通    2019    9       16      20190916
zhang   iOS     4.0     123.234.2.114   746     2019-09-16      370     wwwId   山东    青岛    联通    2019    9       16      20190916
li      Android 2.0     210.44.41.131   2260    2019-09-16      354     comId   山东    济南    教育网  2019    9       16      20190916
wang    Android 3.0     139.215.11.4    3493    2019-09-16      399     httpId  吉林            联通    2019    9       16      20190916
john    iOS     1.0     222.94.195.61   6455    2019-09-16      117     httpId  江苏    南京    电信    2019    9       16      20190916
zhao    Android 1.0     61.235.190.1    2718    2019-09-16      351     qyId    广西    南宁    铁通    2019    9       16      20190916
zhao    iOS     1.0     36.56.35.166    6307    2019-09-16      194     orgId   安徽    亳州    电信    2019    9       16      20190916
zhao    iOS     5.0     210.41.241.220  8187    2019-09-16      105     qyId    四川    成都    教育网  2019    9       16      20190916
zhao    Android 1.0     182.91.241.65   2702    2019-09-16      163     webId   广西    桂林    联通    2019    9       16      20190916
zhao    Android 5.0     210.39.250.184  6249    2019-09-16      129     qyId    广东    广州    教育网  2019    9       16      20190916
Time taken: 0.148 seconds, Fetched: 10 row(s)

脚本

ETL 做的操作还是比较零散且每次都要手工去操作肯定是不行的。在实际工作中,我们倾向于用脚本将步骤串联起来,来看看具体有哪些步骤要操作

  • MR 清洗数据
  • MR 结果文件移动到 hive 表目录下
  • 更新 hive


vi etl.sh

#! /bin/bash

if [ $# -eq 1 ];then
    time=$1
else
    time=`date -d "yesterday" +%Y%m%d`
fi

echo ${time}    

# mr
hadoop jar ../lib/G7-41-dwt.jar com.danner.bigdata.hadoop.homework.ETLApp /project/hadoop/access/log/${time} /project/hadoop/access/out/${time} /tmp/data/ip.txt 
echo "step 1: mr job finish"

# 移动 out 下文件到 hive 路径下
hadoop fs -rmr /project/hadoop/access/wide/day=${time}
echo "stpe 2: 删除之前 hive 路径下之前存在的分区数据"

hadoop fs -mkdir -p /project/hadoop/access/wide/day=${time}
hadoop fs -mv /project/hadoop/access/out/${time}/part* /project/hadoop/access/wide/day=${time}/
echo "stpe 3: 移动 ETL 后的数据到 hive 路径"

hadoop fs -rmr  /project/hadoop/access/out/${time}
echo "stpe 3: 删除 ETL 后的数据"

# 刷新 hive 
hive -e "ALTER TABLE danner_db.access_wide ADD IF NOT EXISTS PARTITION(day=${time});"
echo "step 4: 刷新分区元数据: ${time}"

只要执行此 shell 直接可以将 flume 保存 HDFS 数据 ETL 后刷新到 hive

数据分析

ETL 后我们就要进行数据分析,本例中只要简单的统计每类终端平台的数量

select platform,count(1) from access_wide where day=20190916 group by platform;

统计出来只是打印没卵用,还是要建表存储

create external table platform_stat(
platform string,
cnt int,
d string
) partitioned by(day string)
location 'hdfs://192.168.22.147:9000/project/hadoop/access/platform';

分析结果刷到新建的 platform_stat

insert overwrite table platform_stat partition(day='20190916') 
select platform, count(1) cnt, day as d from access_wide where day='20190916' group by platform, day;

至此,数据分析结果已保存在 platform_stat 表。

注意:分析结果中有冗余字段 d,下面会讲解为何会有这个字段

优化

在我们的案例中,数据分析只用简单的 SQL 就可以搞定了。但上面的 SQL 在生产中往往会发生数据倾斜问题,用一个小技巧来解决:key 随机,将之前的一个 MR 改写成两个 MR => 第一个 MR 中给 key 加上随机数再统计,第二个 MR 去除随机数后再统计即可解决倾斜问题。

insert overwrite table platform_stat partition(day='20190916') 
select del_prefix(m.platform_add) platform ,sum(m.cnt) cnt,m.d d from (
select add_prefix(platform) platform_add, count(1) cnt, day as d from access_wide where day='20190916' group by add_prefix(platform), day ) m
group by del_prefix(m.platform_add),m.d;

数据展示

项目进行到这里,数据处理相关部分都已结束,接下来是要进行数据的展示。到现在为止,所有的数据都是存在大数据平台上,但大数据平台是不向外提供任务数据和服务。本例是将数据分析结果导出到 MySQL

MySql

MySql 创建表存储 Hiveplatform_stat 表导出的数据。

create table platform_stat(
platform varchar(32),
cnt int,
d varchar(8)
)engine=innodb default charset=utf8;

MySql 表需要日期 d 这个字段的。

Sqoop

Sqoop 是一个数据同步工具,用于关系型数据库和各种大数据存储比如 Hive 之间的数据相互同步;底层是 MR job,命令执行时在当前目录自动生成 java 文件。

以下命令,将 Hiveplatform_stat 分区表中的数据导入到 MySql(可能会缺少 jar,自行拷贝)

sqoop export \
--connect jdbc:mysql://localhost:3306/rz_db \
--username root \
--password 123456 \
--mapreduce-job-name platform2MySql \
--table platform_stat \
--export-dir /project/hadoop/access/platform/day=20190916 \
--columns "platform,cnt,d" \
--input-fields-terminated-by '\001'

有几点需要说明:

  • --tableMySQL 中的表
  • --input-fields-terminated-byHive 表中数据分割符,’\001’ 是 Hive 默认分隔符
  • --export-dirHive 表在 HDFS 上的地址
  • 冗余字段 dhive 表的字段为何要加冗余字段 d,我们看下 --export-dir 内容就知道

    Android 400 20190916
    iOS 382 20190916

以上是有字段 d 才有的,不然文件中只有两个字段,这样的话导入 MySQL 时也就只有 2 个字段,这显然不符合要求(Hive 分区字段 体现在目录而不是文件中)。

数据到 MySQL 中了,正常的工作流程已结束。但工作时我们要保证数据不重复,在 sqoop 之前,我们再增加一步 MySQL 的删除操作。

mysql -uroot -p123456 -e”use rz_db;delete from platform_stat where d=20190916;”

UI

调度平台

Azkaban

待续

讨论

Flume 能不能直接部署在nginx 集群的机器上

Flume 到 HDFS 会有什么问题

参考资料

Nginx系列(一)–nginx是什么
nginx常用代理配置