世界新冠疫情大数据案例

news/2024/7/24 13:04:35 标签: 大数据, hive, hbase, hadoop, spark

一、环境要求

Hadoop+Hive+Spark+HBase 开发环境。

二、数据描述

countrydata.csv 是世界新冠疫情数,数据中记录了从疫情开始至 7 月 2 日,以国家为单位的每日新冠疫情感染人数的数据统计。字段说明如下:

世界新冠疫情数据

countrydata.csv

中文名称

英文名称

序号(1)

id

累计确诊人数(2)

confirmedCount

当日新增人数(列 3

confirmedIncr

时间(4)

recordDate

国家名称(列 5

countryName

国名代码(6)

countryShortCode

大洲(7)

continent

三、功能要求

1.数据准备

请在 HDFS 中创建目录/countrydata,并将 countrydata.csv 传到该目录。

[root@kb135 examdata]# hdfs dfs -mkdir -p /countrydata
[root@kb135 examdata]# hdfs dfs -put countrydata.csv /countrydata
[root@kb135 examdata]# hdfs dfs -cat /countrydata/countrydata.csv | wc -l

2.Spark-Shell 中,加载 HDFS 文件系统 countrydata.csv 文件,并使用 RDD 完成以下统计计算。

scala> val fileRdd = sc.textFile("hdfs://kb135:9000/countrydata/countrydata.csv")
scala> val yqRdd = fileRdd.map(x=>x.split(","))

①统计每个国家在数据截止统计时的累计确诊人数。

scala> yqRdd
        .map(x=>(x(4),x(1).toInt))
        .reduceByKey((v1,v2)=>Math.max(v1,v2))
        .collect.foreach(println)

scala> yqRdd.map(x=>(x(4),x(2).toInt))
        .reduceByKey(_+_)
        .collect.foreach(println)

②统计全世界在数据截止统计时的总感染人数。

scala> yqRdd
        .map(x=>(x(4),x(2).toInt))
        .reduceByKey(_+_)
        .reduce((x,y)=>("allworld",x._2+y._2))

③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各大洲当日新增确诊人数最多的国家及确诊人数。

scala> yqRdd
        .map(x=>((x(6),x(3)),(x(1),x(2),x(4))))
        .reduceByKey((v1,v2)=>{if (v1._2>v2._2) v1 else v2})
        .filter(x=>x._1._2=="20200408")
        .map(x=>(x._1._1,x._1._2,x._2._3,x._2._1,x._2._2))
        .collect.foreach(println)

④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各大洲当日累计确诊人数最多的国家及确诊人数。

scala> yqRdd
        .map(x=>((x(6),x(3)),(x(1),x(2),x(4))))
        .reduceByKey((v1,v2)=>{if (v1._1>v2._1) v1 else v2})
        .filter(x=>x._1._2=="20200607")
        .map(x=>(x._1._1,x._1._2,x._2._3,x._2._1,x._2._2))
        .collect.foreach(println)

⑤统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。

scala> yqRdd
        .map(x=>((x(6),x(3).substring(0,6)),x(2).toInt))
        .reduceByKey((v1,v2)=>v1+v2)
        .filter(x=>x._1._2=="202006")
        .collect.foreach(println)

3.创建 HBase 数据表,在 HBase 中创建命名空间(namespaceexam1018,在该命名空间下创建 covid19_world 表,使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 recordrecord 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家

record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。

create_namespace 'exam1018'
create 'exam1018:covid19_world','record'

4.请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向 /app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 HBase 中的 exam:covid19_world 表的 record 列族, ex_exam_record 表结构如下:

字段名称

字段类型

字段含义

id

string

记录 ID

confirmedCount

int

累计确诊人数

confirmedIncr

int

新增确诊人数

recordDate

string

记录时间

countryName

string

国家名

countryShortCode

string

国家代码

continent

string

大洲

ex_exam_covid19_record 表结构如下:

字段名称

字段类型

字段含义

key

string

rowkey

maxIncreaseCountry

string

当日新增确诊人数最多的国家

maxIncreaseCount

int

新增确诊人数

 

create external table ex_exam_record
(
    id string,
    confirmedCount int,
    confirmedIncr int,
    recordDate string,
    countryName string,
    countryShortCode string,
    continent string
)
row format delimited fields terminated by ","
stored as textfile location "/countrydata";
create external table ex_exam_covid19_record(
    key string,
    maxIncreaseCountry string,
    maxIncreaseCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with
serdeproperties ("hbase.columns.mapping"=":key,record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties ("hbase.table.name"="exam1018:covid19_world");

5. 使用 ex_exam_record 表中的数据

①统计每个大洲中每日新增确诊人数最多的国家,将 continent recordDate 合并成

rowkey,并保存到 ex_exam_covid19_record 表中。

with
    tb as (
select continent,recordDate,countryName,confirmedIncr,
       rank() over (partition by continent,recordDate order by confirmedIncr desc) as increaseId
from ex_exam_record)
insert into table ex_exam_covid19_record 
	select concat(continent,recordDate) key
		,countryName maxIncreaseCountry
		,confirmedIncr maxIncreaseCount
	from tb where increaseId=1;

完成统计后,在 HBase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。

scan 'exam1018:covid19_world',{LIMIT=>20}


http://www.niftyadmin.cn/n/5099931.html

相关文章

selenium多窗口、多iframe切换、等待、

1、多标签/多窗口之间的切换 场景: 在页面操作过程中有时候点击某个链接会弹出新的窗口,这时就需要切换到新打开的窗口上进行操作。这种情况下,需要识别多标签或窗口的情况。 操作方法: switch_to.window()方法:切换…

微信小程序4

一自定义组件应用 1.介绍 微信小程序自定义组件是指开发者可以自定义组件,将一些常用的 UI 元素封装成一个自定义组件,然后在多个页面中复用该组件,实现代码复用和页面性能优化的效果。 2.自定义组件分为两种类型 组件模板类型:…

百度最强大模型发布,百度网盘和文库的实测体验

🍁 展望:若本篇讲解内容帮助到您,请帮忙点个赞吧, 您的支持是我继续写作的最大动力. 关注我, 带您了解更多 AI 资讯和 AI 小技巧. 引言 2023年百度世界大会在10月17日的春光中于北京的首钢园精彩召开。这次大会的核心主题——“生成未来 PRO…

如何实现线程安全?

简单描述一下线程安全问题:在程序并发执行的过程中,对于临界区的一些共享数据,可能同时会有多个线程对其进行修改,造成数据覆盖、脏读等一系列问题 如何实现线程安全? 首先想到的就是实现线程同步,让并发…

荣耀推送服务业务介绍

概述 荣耀推送服务(HONOR Push)是荣耀公司向开发者提供的消息推送服务,通过服务端与客户端建立一条稳定、可靠的长连接通道,向荣耀手机系统上的APP应用客户端实时推送消息的服务。无论应用进程是否存在,均可正常收到消…

Pika v3.5.1发布!

导读Pika 社区很高兴宣布,我们今天发布已经过我们生产环境验证 v3.5.1 版本,https://github.com/OpenAtomFoundation/pika/releases/tag/v3.5.1 。 该版本不仅做了很多优化工作,还引入了多项新功能。这些新功能包括 动态关闭 WAL、Replicati…

众和策略:多少成交量才算放大?

成交量是股市中非常重要的指标,常常被用于判别商场活跃度及股票价格涨跌起伏等。可是,要想了解一个股票何时扩展,又需求考虑哪些要素呢?在本文中,我们将从多个角度进行剖析,以便更好地了解多少成交量才算扩…

智慧公厕:探索未来城市环境卫生设施建设新标杆

智慧公厕是当代城市建设的一项重要举措,它集先进技术、人性化设计和智能管理于一体,为人们提供更为舒适、便捷和卫生的厕所环境。现代智慧公厕的功能异常丰富,从厕位监测到多媒体信息交互,从自动化清洁到环境调控,每一…