Spark程序完成一些简单指标的计算
需求指标:
五个相关的统计指标
1.统计每个国家在2015的死亡人数并按照死亡人数降序排序
2.统计某个国家在该数据集中每年的死亡人数并按照死亡人数降序排序
3.统计每个国家上传的总数据量筛选前十条进行查看
4.通过虚拟机监听端口模拟发送国家死亡人数的json数据并进行实时处理统计得到关于年份的数据条数
5.
目录
I :统计每个国家在2015的死亡人数并按照死亡人数降序排序
II :统计某个国家在该数据集中每年的死亡人数并按照死亡人数降序排序
IIII :通过虚拟机监听端口模拟发送国家死亡人数的json数据并进行实时处理统计得到关于年份的总数据条数
IIIII :考虑时间单位的不同处理先数据集中挑选时间单位为月的数据,不考虑人民基数写一个自定义函数判断该国家的当前月份人民健康状况(自定)
考虑时间单位的不同处理先数据集中挑选时间单位为月的数据,不考虑人民基数写一个自定义函数判断该国家的当前月份人民健康状况(自定)
数据来源:
爱数科
数据科学科研和教学一体化平台 (idatascience.cn)
数据处理:
从网站中下载的数据为csv格式需要进行转换我们使用CSV转JSON - 在线转换文档文件转成JSON格式的数据集文件方便使用spark处理数据
统计指标
I :统计每个国家在2015的死亡人数并按照死亡人数降序排序
package com.lzzy;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class I {
public static void main(String[] args) throws AnalysisException {
SparkSession ss = SparkSession.builder().appName("I").master("local").getOrCreate();
Dataset<Row> data = ss.read().json("D:\\spark\\Global mortality analysis\\1.json");
// data.show();
data.createTempView("Death");
//统计每个国家在2015的死亡人数并按照死亡人数降序排序
ss.sql("select sum(deaths),country_name,year from Death group by year,country_name order by sum(deaths) desc ")
.where("year = 2015")
.show();
}
}
简单的读取一下数据,创建视图用sql语句进行最后的指标统计:
可以很直观的看到每个国家在2015的死亡人数排名简单排了个序(当然可以把年份更改为查询其他年份每个国家死亡人数)
II :统计某个国家在该数据集中每年的死亡人数并按照死亡人数降序排序
package com.lzzy
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
object ll {
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder.appName("II").master("local").getOrCreate
val df = ss.read.json("D:\\spark\\Global mortality analysis\\1.json")
df.createTempView("Death")
//统计某个国家在该数据集中每年的死亡人数并按照死亡人数降序排序
val df1 = ss.sql("select sum(deaths),country_name,year from Death group by year,country_name order by sum(deaths) desc ")
df1.where("country_name = 'Norway'").show()//可直接替换国家名字来完成查询其他国家这里用挪威演示
}
}
这里使用了新学习的Scala语言来处理统计指标
相同的可以可直接替换国家名字来完成查询想看的其他国家的数据这里用Norway(挪威)演示
III :统计每个国家上传的总数据量筛选前十条进行查看
package com.lzzy;
public class Deathdata {
public String getCountry_name() {
return country_name;
}
public void setCountry_name(String country_name) {
this.country_name = country_name;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public Integer getTime() {
return time;
}
public void setTime(Integer time) {
this.time = time;
}
public String getTime_unit() {
return time_unit;
}
public void setTime_unit(String time_unit) {
this.time_unit = time_unit;
}
public double getDeaths() {
return deaths;
}
public void setDeaths(Integer deaths) {
this.deaths = deaths;
}
public Deathdata(String country_name, String year, Integer time, String time_unit, double deaths) {
this.country_name = country_name;
this.year = year;
this.time = time;
this.time_unit = time_unit;
this.deaths = deaths;
}
public Deathdata() {
}
//国家
private String country_name;
//年份
private String year;
//时间
private Integer time;
@Override
public String toString() {
return "Deathdata{" +
"country_name='" + country_name + '\'' +
", year=" + year +
", time=" + time +
", time_unit='" + time_unit + '\'' +
", deaths=" + deaths +
'}';
}
//时间单位
private String time_unit;
//死亡人数
private double deaths;
}
package com.lzzy;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.List;
import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.Media.print;
public class III {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("III").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> srcRdd = sc.textFile("D:\\spark\\Global mortality analysis\\1.json");
JavaRDD<Deathdata >DeathdataRdd = srcRdd.map(new Function<String, Deathdata>() {
@Override
public Deathdata call(String s) throws Exception {
Gson gson = new Gson();
return gson.fromJson(s, Deathdata.class);
}
});
JavaPairRDD<String, Integer> kvRdd = DeathdataRdd.mapToPair(new PairFunction<Deathdata, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Deathdata deathdata) throws Exception {
String country_name = deathdata.getCountry_name();
String key = country_name;
return new Tuple2<>(key, 1);
}
});
JavaPairRDD<String, Iterable<Integer>> gRdd = kvRdd.groupByKey();
JavaRDD<Tuple2<String,Integer>> resRdd = gRdd.map(new Function<Tuple2<String, Iterable<Integer>>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Iterable<Integer>> t) throws Exception {
String key = t._1;
Iterable<Integer> val = t._2;
int sum = 0;
for (Integer a :val ){
sum += a;
}
return new Tuple2<>(key, sum);
}
});
//统计每个国家上数传的总据量筛选前十条进行查看
List<Tuple2<String, Integer>> take = resRdd.take(10);
for(Tuple2<String,Integer> t : take){
System.out.println(t);
}
sc.stop();
}
}
读取数据,将元素为String类型的RDD转换为Deathdata类型的RDD再将Deathdata类型的RDD转成键值对,key为国家名称,value为1再分组将相同国家的数据合并到一起使用map算子将分组后的键值对转成一个Tuple2对象,key为国家名称,value为这个国家在这个数据集的总数据条数,最后获取十个国家的总数据条数并输出到控制台
IIII :通过虚拟机监听端口模拟发送国家死亡人数的json数据并进行实时处理统计得到关于年份的总数据条数
package com.lzzy;
import com.google.gson.Gson;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class IIII {
public static void main(String[] args) throws InterruptedException {
//通过虚拟机监听端口模拟发送国家死亡人数的json数据并进行实时处理统计得到关于年份的数据条数
SparkConf s = new SparkConf().setAppName("IIII").setMaster("local[*]");
JavaStreamingContext jsc = new JavaStreamingContext(s, new Duration(1000*10));
//接收数据
JavaReceiverInputDStream<String> dStream = jsc.socketTextStream("192.168.211.3",9001);
//处理数据
dStream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
if(null != s && s.length() > 0){
return true;
}
return false;
}
}).map(new Function<String, Deathdata>() {
@Override
public Deathdata call(String s) throws Exception{
return new Gson().fromJson(s, Deathdata.class);
}
}).mapToPair(new PairFunction<Deathdata, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Deathdata deathdata) throws Exception {
return new Tuple2<>(deathdata.getYear(),1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a+b;
}
}).print();
jsc.start();
jsc.awaitTermination();
jsc.stop();
}
}
模拟数据为:
{"country_name":"Albania","year":2015,"time":4,"time_unit":"monthly","deaths":1906}
{"country_name":"Croatia","year":2016,"time":5,"time_unit":"monthly","deaths":1709}
{"country_name":"Croatia","year":2016,"time":6,"time_unit":"monthly","deaths":1561}
{"country_name":"Ecuador","year":2017,"time":7,"time_unit":"monthly","deaths":2008}
{"country_name":"Ecuador","year":2017,"time":8,"time_unit":"monthly","deaths":1687}
{"country_name":"Ecuador","year":2017,"time":9,"time_unit":"monthly","deaths":1569}
首先是接受数据基于sparkContext对象,设置StreamingContext对象。第二个参数指定每隔多少秒,收集一个批次的数据然后用筛出接收到符合数据需求的数据然后转换为Deathdata类型的RDD
再查出年份key为年份,value为1再分组将相同国家的数据合并到一起最后获取我们需要的年份和关于年份的总数据条数并输出到控制台
IIIII :考虑时间单位的不同处理先数据集中挑选时间单位为月的数据,不考虑人民基数写一个自定义函数判断该国家的当前月份人民健康状况(自定)
package com.lzzy;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class IIIII {
public static void main(String[] args) throws AnalysisException {
SparkSession ss = SparkSession.builder().appName("IIIII").master("local").getOrCreate();
ss.udf().register("dt", new UDF1<Double,String>(){
@Override
public String call(Double deaths) throws Exception{
return deaths >= 2000 ? "差" : "好";
}
}, DataTypes.StringType);
Dataset<Row> data = ss.read().json("D:\\spark\\Global mortality analysis\\1.json");
//考虑时间单位的不同处理先数据集中挑选时间单位为月的数据
Dataset<Row> df = data.where("time_unit = 'monthly'");
df.createTempView("Deathdata");
//(不考虑人民基数)
ss.sql("select *, dt(deaths) as `该国家的当前月份人民健康状况` from Deathdata").show();
ss.stop();
}
}
自定义函数然后用sql语言调用函数插入字段即可
总结:
从Spark的上手到最后的项目,整个过程我一路磕磕绊绊的时常遇到一些奇怪的问题,还好很多问题都能在百度中解决(遇事不决,可问度娘!)
Spark最突出的表现在于它能将作业与作业之间产生的大规模数据集存储在内存中,spark的应用、算子、Scala语言的学习等等很多很多,我也仅仅刚入门,只能说继续努力吧。
最后的最后,感谢老师的耐心教导。