Hive

Posted by danner on May 2, 2017

简介

Hive 是一种用 类SQL 语句来协助读写、管理那些存储在分布式存储系统上大数据集,构建在 Hadoop 之上的的数据仓库工具。

特点

  •  Hive 最大的特点是通过 类SQL分析大数据,而避免了写 MapReduce 程序来分析数据,这样使得分析数据更容易
  •  数据是存储在 HDFS 上的,Hive 本身并不提供数据的存储功能
  •  Hive 将数据映射成数据库和一张张的表,库和表的元数据信息一般存在关系型数据库上(比如 MySQL), 统一的元数据管理
  • 数据存储方面:它能够存储很大的数据集,并且对数据完整性、格式要求并不严格
  • 数据处理方面:因为Hive 语句最终会生成 MapReduce 任务去计算,所以不适用于实时计算的场景,它适用于离线分析
  • 在数据查询时进行模式验证,而不是加载的时候验证

架构

核心

Hive 的核心是驱动引擎,驱动引擎由四部分组成:

  • 解释器:解释器的作用是将 HiveSQL 语句转换为语法树(AST)。
  • 编译器:编译器是将语法树编译为逻辑执行计划
  • 优化器:优化器是对逻辑执行计划进行优化
  • 执行器:执行器是调用底层的运行框架执行逻辑执行计划

Hive客户端

Hive 有很多种客户端:

  • cli 命令行客户端:采用交互窗口,用 hive 命令行和 Hive 进行通信
  • HiveServer2 客户端:用 Thrift 协议进行通信,Thrift 是不同语言之间的转换器,是连接不同语言程序间的协议,通过 JDBC 或者ODBC 去访问 Hive
  • HWI 客户端:hive 自带的一个客户端,但是比较粗糙,一般不用
  • HUE 客户端:通过Web 页面来和 Hive 进行交互,使用的比较多

数据模型

语法

创建数据库

创建一个数据库会在 HDFS 上创建一个目录Hive 里数据库的概念类似于程序中的命名空间,用数据库来组织表,在大量表的情况下,用数据库来分开可以避免表名冲突。Hive 默认的数据库是 default

create database if not exists user_db;

上述命令会在 HDFS 上创建 user_db 目录

删除数据库

删除数据库时,如果库中存在数据表,是不能删除的,要删除所有表,删除数据库。添加上 cascade 后,就可以先自动删除所有表后,再删除数据库。(慎用!)删除数据库后,HDFS 上数据库对应的目录就被删除掉了。

drop database if exists testdb cascade;

内部表

创建

create table rel.employee(
user_id string,
salary int,
worked_citys array<string>,
social_security map<string,float>,
welfare struct<meal_allowance:float,if_regular:boolean,commercial_insurance:float>
)
row format delimited fields terminated by '\t' 
collection items terminated by ','
map keys terminated by ':'
lines terminated by '\n'
stored as textfile;

create table 直接创建内部表,本例中创建成功后会在 HDFS/user/hive/warehouse/rel.db 下创建 employee 目录。

载入数据

load data local inpath ‘/home/hadoop/work/hive/xiaoxdata/data/employee_data.txt’ into table employee;

employee_data.txt 数据插入到 employee 表,同时 employee_data.txt 被上传到 HDFSemployee 目录下。 当删除 employee 表时,HDFS 上的 employee 目录被自动删除

外部表

创建

create external table rel.student_school_info(
student_id string,
name string,
institute_id string,
major_id string,
school_year string
)
row format delimited 
fields terminated by '\t' 
lines terminated by '\n'
stored as textfile
location '/user/hive/warehouse/data/student_school_info';

create external table 创建外部表,创建成功会在 HDFS 上生成 student_school_info 目录(注意,该路径是指定的而不是 hive 默认路径)。

载入数据

hadoop fs -put /home/hadoop/work/hive/xiaoxdata/data/student_school_info_external_data.txt /user/hive/warehouse/data/student_school_info;

不同于内部表load 操作,对于外部表的插入数据只需将数据文件上传到对应的 HDFS 目录,就会自动插入表。 当删除表时, HDFS 上对应的数据不会被删除(外部表的优势)。

创建分区

Hive 查询一般是扫描整个目录,但是有时候我们关心的数据只是集中在某一部分数据,比如我们一个 Hive 查询,往往是只是查询某一天的数据,这样的情况下,可以使用分区表来优化,一天是一个分区,查询时候,Hive 只扫描指定分区的数据。 普通表和分区表的区别在于:一个 Hive 表在 HDFS 上是有一个对应的目录来存储数据,普通表的数据直接存储在这个目录下,而分区表数据存储时,是再划分子目录来存储的,一个分区一个子目录。主要作用是来优化查询性能

create table student_school_info_partition(
student_id string,
name string,
institute_id string,
major_id string
)
partitioned by(school_year string) 
row format delimited
fields terminated by '\t' 
lines terminated by '\n'
stored as textfile;	

增加 partitioned by 字段就是创建分区表,外部分区表类同。在不使用动态分区下

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE emp_dynamic_partition PARTITION (deptno) select * from emp;

注意此时,无序指定分区值,会自动赋值

必须使用如下命令增加分区,才能刷新对应的分区下数据

insert into table student_school_info_partition partition(school_year=’2017’) select * from table1

alter table student_school_partition ADD IF NOT EXISTS PARTITION partition(school_year=’2017’);

不准使用 MSCK 来刷分区数据,这个会全量刷新

分桶表

桶表也是一种用于优化查询而设计的表类型。创建桶表时,指定桶的个数、分桶的依据字段hive 就可以自动将数据分桶存储。查询时只需要遍历一个桶里的数据,或者遍历部分桶,这样就提高了查询效率。

创建

create table rel.student_info_bucket(
student_id string,
name string,
age int,
origin string
)
clustered by (student_id) sorted by (student_id asc) into 4 buckets
row format delimited 
fields terminated by '\t' 
lines terminated by '\n' 
stored as textfile;

载入数据

分桶表一般不使用 load 向分桶表中导入数据,因为 load 导入数据只是将数据复制到表的数据存储目录下,hive 并不会在 load 的时候对数据进行分析然后按照分桶字段分桶,load 只会将一个文件全部导入到分桶表中,并没有分桶。一般采用 insert 从其他表向分桶表插入数据。分桶表在创建表的时候只是定义表的模型,插入的时候需要做如下操作:

  • 在每次执行分桶插入的时候在当前执行的 session 会话中要设置 hive.enforce.bucketing = true; 声明本次执行的是一次分桶操作。
  • 需要指定reduce 个数与分桶的数量相同 set mapreduce.job.reduces=4,这样才能保证有多少桶就生成多少个文件。 如果定义了按照分桶字段排序,需要在从其他表查询数据过程中将数据按照分区字段排序之后插入各个桶中,分桶表并不会将各分桶中的数据排序。排序和分桶的字段相同的时候使用 Cluster by (字段),cluster by 默认按照分桶字段在桶内升序排列,如果需要在桶内降序排列使用 distribute by (col) sort by (col desc) 组合实现。

set hive.enforce.bucketing = true;
set mapreduce.job.reduces=4;
insert overwrite table student_info_bucket
select student_id,name,age,origin
from student_info
cluster by(student_id);

分区和分桶最大的区别就是分桶随机分割数据库,分区非随机分割数据库。

  • 分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜
  • 分桶是对应不同的文件(细粒度),分区是对应不同的文件夹(粗粒度)。

JOIN

inner join

join 等同于 Inner Join

select * from a inner join b on a.id=b.id;

两个表通过id关联,只把id值相等的数据查询出来。

full join

两个表通过id关联,把两个表的数据全部查询出来;缺失的值直接用 NULL 填充

select * from a full join b on a.id=b.id;

left join

左连接时,左表中出现的join字段都保留,右表没有连接上的都为NULL

select * from a left join b on a.id=b.id;

right join

右连接时,右表中出现的join字段都保留,左表没有连接上的都是NULL

select * from a right join b on a.id=b.id;

left semi join

左半连接实现了类似 IN/EXISTS 的查询语义,输出符合条件的左表内容。hive 不支持 in …exists 这种关系型数据库中的子查询结构,hive 暂时不支持右半连接。例如:

select a.id, a.name from a where a.idin (select b.id from b);

使用 Hive 对应于如下语句:

select a.id,a.name from a left semi join b on a.id = b.id;

map side join

使用分布式缓存将小表数据加载都各个 map 任务中,在 map 端完成 joinmap 任务输出后,不需要将数据拷贝到 reducer 阶段再进行 join降低的数据在网络节点之间传输的开销。多表关联数据倾斜优化的一种手段。多表连接,如果只有一个表比较大,其他表都很小,则 join 操作会转换成一个只包含 mapJob 。运行日志中会出现 Number of reduce tasks is set to 0 since there's no reduce operator 没有 reduce task 的提示。

select /*+ mapjoin(b) */ a.id, a.name from a join b on a.id = b.id

编辑表

like

根据已存在的表结构,使用 like 关键字,复制一个表结构一模一样的新表

create table student_info2 like student_info;

as

根据已经存在的表,使用 as 关键字,创建一个与查询结果字段一致的表,同时将查询结果数据插入到新表

create table student_info4 as select student_id,name from student_info;

alter

rename

表名重命名

alter table student_info4 rename to student_id_name;

add

添加性别列,新添加的字段会在所有列最后,分区列之前

alter table student_info3 add columns (gender string comment ‘性别’);

replace

重新定义表的列

  • 替换列 将继续存在的列再定义一遍,需要替换的列重新定义

    alter table student_info3 replace columns(student_id string,name string,age int,origin string,gender2 int);

  • 删除列 将继续存在的列再定义一遍,需要删除的列不再定义

    alter table student_info3 replace columns(student_id string,name string,age int,origin string);

导出数据

导出数据到本地

hive -e “select * from rel.student_info” > ./student_info_data.txt

默认结果分隔符:'\t'

常用文件格式

TEXTFILE

  • 默认文件格式,建表时用户需要显示指定分隔符   
  • 存储方式:存储   
  • 优点:最简单的数据格式,便于和其他工具(Pig, grep, sed, awk)共享数据,便于查看和编辑;加载较快。   
  • 缺点:耗费存储空间,I/O性能较低;Hive不进行数据切分合并,不能进行并行操作,查询效率低。   
  • 适用场景:适用于小型查询,查看具体数据内容的测试操作。

SequenceFile

  • 二进制键值对序列化文件格式   
  • 存储方式:存储   
  • 优点:可压缩、可分割,优化磁盘利用率和I/O;可并行操作数据,查询效率高。   
  • 缺点:存储空间消耗最大;对于Hadoop生态系统之外的工具不适用,需要通过 text 文件转化加载。   
  • 适用场景:适用于数据量较、大部分列的查询。

RCFILE

  • RCFileHive 推出的一种专门面向列的数据格式,它遵循“先按列划分,再垂直划分”的设计理念。   
  • 存储方式:行列式存储   
  • 优点:可压缩,高效的列存取;查询效率较高。   
  • 缺点:加载时性能消耗较大,需要通过 text 文件转化加载;读取全量数据性能低。

ORCFILE

  • 优化后的 rcfile  
  • 存储方式:行列式存储。   
  • 优缺点:优缺点与 rcfile 类似,查询效率最高。   
  • 适用场景:适用于 Hive 中大型的存储、查询。

数据类型

基本数据类型

整数类型

  • TINYINT、SMALLINT、INT、BIGINT   
  • 空间占用分别是1字节、2字节、4字节、8字节

浮点类型

  • FLOAT、DOUBLE   
  • 空间占用分别是32位和64位浮点数

布尔类型

  • BOOLEAN   
  • 用于存储true和false

字符串文本类型

  • STRING   
  • 存储变长字符串,对类型长度没有限制

时间戳类型

  • TIMESTAMP   
  • 存储精度为纳秒的时间戳
类型 长度 描述
TINYINT 1字节 有符号整型
SMALLINT 2字节 有符号整型
INT 4字节 有符号整型
BIGINT 8字节 有符号整型
FLOAT 4字节 有符号单精度浮点数
DOUBLE 8字节 有符号双精度浮点数
BOOLEAN   布尔类型 TRUE/FALSE
STRING   字符串
TIMESTAMP   时间戳,内容格式:yyyy-mm-dd hh:mm:ss[.f…]

复杂数据类型

ARRAY

  • 存储相同类型的数据,可以通过下标获取数据   
  • 定义:ARRAY   
  • 查询:array[index]

MAP

  • 存储键值对数据,键或者值的类型必须相同,通过键获取值   
  • 定义:MAP<STRING,INT>   
  • 查询:map[‘key’]

STRUCT

  • 可以存储多种不同的数据类型,一旦声明好结构,各字段的位置不能改变   
  • 定义:STRUCT<city:STRING,address :STRING,door_num:STRING>   
  • 查询:struct.fieldname
类型 长度 描述
ARRAY   包含同类型元素的数组,索引从0开始 ARRAY
MAP   字典 MAP<primitive_type,data_type>
STRUCT 4字节 结构体 STRUCT<col_name:data_type[COMMENT col_comment], …>

函数

内置函数

  • 条件函数 case when 语法1:CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END 说明:如果a等于b,那么返回c;如果a等于d,那么返回e;否则返回f

    select case 1 when 2 then ‘two’ when 1 then ‘one’ else ‘zero’ end;

开发过程中应该尽量使用 Hive 内置函数,毕竟 Hive 内置函数经过了大量的测试,性能普遍较好,任何一点性能上的问题在大数据量上跑时候都会被放大。

自定义函数

编写一个 Hive 的自定义函数,需要新建一个 Java 类来继承 UDF 类并实现 evaluate() 函数,evaluate() 函数中编写自定义函数的实现逻辑,返回值给 Hive 使用,需要注意的是,evaluate() 函数的输入输出都必须是 Hadoop 的数据类型,以便可以 MapReduce 程序来进行序列化反序列化。编写完成后将 Java 程序打成 Jar 包,在 Hive 会话中载入 Jar 包来使用自定义函数。在执行 Hive 语句时,遇到一个自定义函数就会实例化一个类,并执行对应的 evaluate() 函数,每行输入都会调用一次 evaluate()函数,所以在编写自定义函数时,一定要注意大数据量时的资源占用问题。

使用步骤:

  • 先开发一个 java 类,继承 UDF,并重载 evaluate 方法

  • 打成 jar 包上传到服务器

  • 临时函数(只在当前窗口有效)

    • 在使用的时候将 jar 包添加到 hiveclasspath

      add jar /home/hadoop/apps/hive_test_data/HiveUdfPro-1.0-SNAPSHOT.jar;

    • 创建临时函数与开发好的 java class 关联

      create temporary function age_partition as ‘cn.chinahadoop.udf.AgePartitionFunction’;

    • 可在 hql 中使用自定义的函数 temporary function

  • 永久函数(需添加 jar)

    • jar上传到 HDFS

    • 创建函数与jar 关联

      CREATE FUNCTION add_prefix_new AS “com.ruozedata.bigdata.hive.udf.AddPrefixUDF” USING JAR “hdfs://ruozedata001:8020/lib/ruozedata-hadoop-1.0.jar”;

    • 可使用自定义函数 add_prefix_new

  • 永久函数(无需添加 jar,代码直接打入源码,Hive第一次启动即可使用)

    • org.apache.hadoop.hive.ql.exec.FunctionRegistry 类中,添加 system.registerUDF("add_prefix_new ", AddPrefixUDF.class, false);

UDF

一进一出,是最普通最常见的自定义函数,输入为一个值,输出也为一个值。

UDAF

自定义聚合函数,输入是多个值,输出是一个值;需要 hive sql 语句和 group by 联合使用的。聚合函数常常需要对大量数组进行操作,所以在编写程序时,一定要注意内存溢出问题。 UDAF 分为两种:简单 UDAF通用 UDAF。简单 UDAF 写起来比较简单,但是因为使用了 JAVA反射机制导致性能有所损失,另外有些特性不能使用,如可变参数列表,通用 UDAF 可以使用所有功能,但是写起来比较复杂

优化

Hadoop 启动开销大,如果每次只做小数量的输入输出,利用率将会很低。所以用好 Hadoop 的首要任务是增大每次任务所搭载的数据量。Hadoop 的核心能力是 paritionsort,因而这也是优化的根本。 Hive 优化时,把 hive Sql 当做 mapreduce 程序来读,而不是当做SQL来读。

参考资料

Hive函数大全
Hive 简单UDAF开发(extends UDAF)
Hive 快速入门