一、环境要求
Hadoop+Hive+Spark+HBase 开发环境。
二、数据描述
countrydata.csv 是世界新冠疫情数,数据中记录了从疫情开始至 7 月 2 日,以国家为单位的每日新冠疫情感染人数的数据统计。字段说明如下:
世界新冠疫情数据 countrydata.csv | 中文名称 | 英文名称 |
序号(列 1) | id | |
累计确诊人数(列 2) | confirmedCount | |
当日新增人数(列 3) | confirmedIncr | |
时间(列 4) | recordDate | |
国家名称(列 5) | countryName | |
国名代码(列 6) | countryShortCode | |
大洲(列 7) | continent |
三、功能要求
1.数据准备
请在 HDFS 中创建目录/countrydata,并将 countrydata.csv 传到该目录。
[root@kb135 examdata]# hdfs dfs -mkdir -p /countrydata
[root@kb135 examdata]# hdfs dfs -put countrydata.csv /countrydata
[root@kb135 examdata]# hdfs dfs -cat /countrydata/countrydata.csv | wc -l
2.在 Spark-Shell 中,加载 HDFS 文件系统 countrydata.csv 文件,并使用 RDD 完成以下统计计算。
scala> val fileRdd = sc.textFile("hdfs://kb135:9000/countrydata/countrydata.csv")
scala> val yqRdd = fileRdd.map(x=>x.split(","))
①统计每个国家在数据截止统计时的累计确诊人数。
scala> yqRdd
.map(x=>(x(4),x(1).toInt))
.reduceByKey((v1,v2)=>Math.max(v1,v2))
.collect.foreach(println)
scala> yqRdd.map(x=>(x(4),x(2).toInt))
.reduceByKey(_+_)
.collect.foreach(println)
②统计全世界在数据截止统计时的总感染人数。
scala> yqRdd
.map(x=>(x(4),x(2).toInt))
.reduceByKey(_+_)
.reduce((x,y)=>("allworld",x._2+y._2))
③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各大洲当日新增确诊人数最多的国家及确诊人数。
scala> yqRdd
.map(x=>((x(6),x(3)),(x(1),x(2),x(4))))
.reduceByKey((v1,v2)=>{if (v1._2>v2._2) v1 else v2})
.filter(x=>x._1._2=="20200408")
.map(x=>(x._1._1,x._1._2,x._2._3,x._2._1,x._2._2))
.collect.foreach(println)
④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各大洲当日累计确诊人数最多的国家及确诊人数。
scala> yqRdd
.map(x=>((x(6),x(3)),(x(1),x(2),x(4))))
.reduceByKey((v1,v2)=>{if (v1._1>v2._1) v1 else v2})
.filter(x=>x._1._2=="20200607")
.map(x=>(x._1._1,x._1._2,x._2._3,x._2._1,x._2._2))
.collect.foreach(println)
⑤统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。
scala> yqRdd
.map(x=>((x(6),x(3).substring(0,6)),x(2).toInt))
.reduceByKey((v1,v2)=>v1+v2)
.filter(x=>x._1._2=="202006")
.collect.foreach(println)
3.创建 HBase 数据表,在 HBase 中创建命名空间(namespace)exam1018,在该命名空间下创建 covid19_world 表,使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 record。record 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家
record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。
create_namespace 'exam1018'
create 'exam1018:covid19_world','record'
4.请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向 /app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 HBase 中的 exam:covid19_world 表的 record 列族, ex_exam_record 表结构如下:
字段名称 | 字段类型 | 字段含义 |
id | string | 记录 ID |
confirmedCount | int | 累计确诊人数 |
confirmedIncr | int | 新增确诊人数 |
recordDate | string | 记录时间 |
countryName | string | 国家名 |
countryShortCode | string | 国家代码 |
continent | string | 大洲 |
ex_exam_covid19_record 表结构如下:
字段名称 | 字段类型 | 字段含义 |
key | string | rowkey |
maxIncreaseCountry | string | 当日新增确诊人数最多的国家 |
maxIncreaseCount | int | 新增确诊人数 |
create external table ex_exam_record
(
id string,
confirmedCount int,
confirmedIncr int,
recordDate string,
countryName string,
countryShortCode string,
continent string
)
row format delimited fields terminated by ","
stored as textfile location "/countrydata";
create external table ex_exam_covid19_record(
key string,
maxIncreaseCountry string,
maxIncreaseCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with
serdeproperties ("hbase.columns.mapping"=":key,record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties ("hbase.table.name"="exam1018:covid19_world");
5. 使用 ex_exam_record 表中的数据
①统计每个大洲中每日新增确诊人数最多的国家,将 continent 和 recordDate 合并成
rowkey,并保存到 ex_exam_covid19_record 表中。
with
tb as (
select continent,recordDate,countryName,confirmedIncr,
rank() over (partition by continent,recordDate order by confirmedIncr desc) as increaseId
from ex_exam_record)
insert into table ex_exam_covid19_record
select concat(continent,recordDate) key
,countryName maxIncreaseCountry
,confirmedIncr maxIncreaseCount
from tb where increaseId=1;
②完成统计后,在 HBase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。
scan 'exam1018:covid19_world',{LIMIT=>20}