spark案例解析(全国农产品市场与省份)
需求
(一) 数据描述
1、数据参数
该数据每日进行采集汇总。数据范围涵盖全国主要省份(港澳台西藏海南暂无数
据)的 180+的大型农产品批发市场,380+的农产品品类(由于季节性和地域性
等特点,每日的数据中不一定会涵盖全部的农产品品类)。
2、数据类型

(二)功能需求
1、农产品市场个数统计
- 统计每个省份的农产品市场总数
- 统计没有农产品市场的省份有哪些
2、农产品种类统计
- 根据农产品类型数量,统计排名前 3 名的省份
- 根据农产品类型数量,统计每个省份排名前 3 名的农产品市场
3、价格区间统计,
- 计算山西省的每种农产品的价格波动趋势,即计算每天价格均值。
- 某种农产品的价格均值计算公式:
PAVG = (PM1+PM2+…+PMn-max§-min§)/(N-2)
其中,P 表示价格,Mn 表示 market,即农产品市场。PM1 表示 M1 农产品
市场的该产品价格,max§表示价格最大值,min§价格最小值。
数据展示
allprovince.txt
河北
山西
辽宁
吉林
黑龙江
江苏
浙江
安徽
福建
江西
山东
河南
...
product.txt
生菜 2.00 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳
芹菜 2.40 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳
菜花 3.80 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
生姜 10.00 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
山药 8.00 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
芋头 5.50 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
小葱 1.50 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
...
用RDD算子解法:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit
/**
* Description :
* CreateTime : 2019/10/916:41
*
* @author TuYouXian
* @since JDK1.8
*/
class Test {
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
val source: RDD[String] = sc.textFile("data/product.txt")
/**
* 统计每个省份农产品批次市场个数
* 已有数据:省份 市场名称
* 目标数据:省份 市场数量
*/
@junit.Test
def product1(): Unit ={
//1、读取数据
//2、去重、过滤、列裁剪
val result: collection.Map[String, Long] = source
.filter(_.split("\t").size >= 5)
.map(line =>{
val arr: Array[String] = line.split("\t")
val province = arr(4)
val product = arr(3)
province-> product
})
.distinct()
//3、数据处理
//.map(item => item._1 -> 1).reduceByKey(_+_).collect()
//.groupByKey().map(item => item._1->item._2.size)
.countByKey()
//4、结果展示
println(result)
}
/**
* 统计没有农产品的省份有哪些
* 已知数据:有农产品省份
* 目标数据:无农产品省份
*/
@junit.Test
def product2(): Unit ={
val allprovinces = sc.textFile("data/allprovince.txt")
val provinces = source
.filter(_.split("\t").size >= 5)
.map(line => {
val arr = line.split("\t")
val province = arr(4)
province
})
.distinct()
val noprovinces: Array[String] = allprovinces.subtract(provinces).collect()
println(noprovinces.toBuffer)
}
/**
* 根据农产品的类型数量,统计排名前三的省份
* 已有数据:农产品类型,省份
* 中间数据:农产品类型数量,省份
* 目标数据:排名前三省份
*/
@junit.Test
def product3(): Unit ={
val result: Array[(String, Int)] = source
.filter(_.split("\t").size >= 5)
.map(line => {
val arr: Array[String] = line.split("\t")
val province = arr(4)
val productType = arr(0)
province -> productType
})
.distinct()
.groupByKey()
.map(item => item._1 -> item._2.size)
.sortBy(_._2, false)
.take(3)
println(result.toBuffer)
}
/**
* 根据农产品类型数量,统计每个省份排名前三的批发市场
* 已有数据:农产品类型,省份,批发市场
* 中间数据:(省份,((省份,批发市场),农产品类型数量))
* 目标数据:(省份,批发市场,农产品类型数量)前三数据
* 也就是要根据省份分区,求每个分区的前三
*/
@junit.Test
def product4(): Unit ={
source
.filter(_.split("\t").size >= 5)
.map(line =>{
val arr = line.split("\t")
val province = arr(4)
val product = arr(3)
val productType = arr(0)
((province,product),productType)
})
.distinct()
.map(item => item._1 -> 1)
.reduceByKey(_+_)
.groupBy(_._1._1)//(省份,[((省份,批发市场),农产品类型数量),((省份,批发市场),农产品类型数量)])
.flatMap(item =>{//需要压扁操作
val province = item._1
//val arr = item._2.toArray
val arr = item._2.toBuffer
arr.sortBy(_._2).reverse.take(3)
.map(item => (province,item._1._2,item._2))
})
.foreach(println(_))
}
/**
* 计算山西省的每种农产品的价格波动趋势,即计算每天价格均值
* PAVG = (PM1+PM2+...+PMn-max(P)-min(P))/(N-2)
* 其中,P 表示价格,Mn 表示 market,即农产品市场。PM1 表示 M1 农产品
* 市场的该产品价格,max(P)表示价格最大值,min(P)价格最小值。
* 已知数据:农产品类型,批发价格
* 目标数据:农产品类型,价格均值
* 注意有部分农产品的数量并不能满足上面公式可能会出NAN,无穷数等结果
*/
@junit.Test
def product5(): Unit ={
//1、过滤 去重 列裁剪
source.filter(item=> item.split("\t").size>=5 && item.split("\t")(4)=="山西")
.map(item=>{
val arr = item.split("\t")
val name = arr(0)
val price = arr(1).toDouble//避免后续出现1/3 = 0的情况
(name,price)
})
//2、分组
//(面粉,CompactBuffer(3.44, 2.9, 3.44))
//(大葱,CompactBuffer(2.8, 2.6, 2.4, 3.0, 3.1, 2.8, 3.0, 2.5))
.groupByKey()
//.foreach(println(_))
.map(item=>{
val data: Iterable[Double] = item._2
val maxprice = data.max
val minprice = data.min
val sumPrice = data.sum
val size = data.size
val avgPrice = (sumPrice-maxprice-minprice)/(size-2)
(item._1,avgPrice)
})
//3、计算平均价格
.foreach(println(_))
//4、结果
}
}