基于Spark的农产品分析系统
本篇文章为大学毕业实习项目-基于Spark的农产品分析系统的文档,从搭建环境到代码编写以及成果展示。
Hadoop环境搭建
集群规划
服务器IP | 192.168.18.101 | 192.168.18.102 | 192.168.18.103 |
---|---|---|---|
主机名 | hadoop01 | hadoop02 | hadoop03 |
NameNode | 是 | 否 | 否 |
SecondaryNameNode | 是 | 否 | 否 |
dataNode | 是 | 是 | 是 |
ResourceManager | 是 | 否 | 否 |
NodeManager | 是 | 是 |
1.上传hadoop安装包并解压
tar -zxvf hadoop-2.7.5.tar.gz -C /opt/module/
最后使用集群分发脚本xsync分发到其他服务器
xsync /opt/application/hadoop-2.7.3/
现在,每个服务器都有hadoop安装包,目录为/opt/module,另外hadoop运行需要java环境。
2.配置环境变量
[root@hadoop101 hadoop-2.7.5]# pwd
/opt/application/hadoop-2.7.3
[root@hadoop101 hadoop-2.7.5]# vim /etc/profile
export HADOOP_HOME=/opt/application/hadoop-2.7.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
[root@hadoop101 hadoop-2.7.5]# source /etc/profile
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4zlRlZJU-1649489437027)(/Users/guopuwen/Desktop/C:\Users\VSUS\Desktop\笔记\大数据\img\4.png)]
到这,第一台机器的hadoop已经安装完毕,然后使用集群分发脚本将/etc/profile文件分发至各台机器,不要忘记重新加载配置文件
3.修改配置文件
配置文件都在etc/hadoop中
3.1 core-site.xml
<property>
<name>fs.default.name</name>
<value>hdfs://hadoop01:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/application/hadoop-2.7.3/hadoopDatas/tempDatas</value>
</property>
<!-- 缓冲区大小,实际工作中根据服务器性能动态调整 -->
<property>
<name>io.file.buffer.size</name>
<value>4096</value>
</property>
<!-- 开启hdfs的垃圾桶机制,删除掉的数据可以从垃圾桶中回收,单位分钟 -->
<property>
<name>fs.trash.interval</name>
<value>10080</value>
</property>
3.2 hdfs-site.xml
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop101:50090</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop101:50070</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas,file:///opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas2</value>
</property>
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///opt/application/hadoop-2.7.3/hadoopDatas/nn/edits</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file:///opt/application/hadoop-2.7.3/hadoopDatas/snn/name</value>
</property>
<property>
<name>dfs.namenode.checkpoint.edits.dir</name>
<value>file:///opt/application/hadoop-2.7.3/hadoopDatas/dfs/snn/edits</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
3.3 hadoop-env.sh
export JAVA_HOME=/opt/application/jdk1.8.0_131
3.4 mapred-site.xml
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>true</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop101:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop101:19888</value>
</property>
3.5 yarn-site.xml
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop101</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>
3.6 mapred-env.sh
export JAVA_HOME=/opt/application/jdk1.8.0_131
3.7 修改slaves
hadoop101
hadoop102
hadoop103
3.8 创建上面配置文件出现的一些文件
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/tempDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/namenodeDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/namenodeDatas2
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas2
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/snn/name
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/nn/edits
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/dfs/snn/edits
3.9 分发配置文件到其他机器
4. 启动集群
bin/hdfs namenode -format
sbin/start-dfs.sh
sbin/start-yarn.sh
Scala-spark环境搭建
在idea下配置好scala环境,这里只给出相应的pom坐标
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.12</scala.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.9.2</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compilerplugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>METAINF/*.SF</exclude>
<exclude>METAINF/*.DSA</exclude>
<exclude>METAINF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
实例代码:
package cn.noteblogs
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo1 {
def main(args: Array[String]): Unit = {
//1、构建sparkConf对象 设置application名称和master地址
val conf: SparkConf = new
SparkConf().setAppName("WordCount").setMaster("local[2]")
//2、构建sparkContext对象,该对象非常重要,它是所有spark程序的执行入口
// 它内部会构建 DAGScheduler和 TaskScheduler 对象
val sc = new SparkContext(conf)
//设置日志输出级别
sc.setLogLevel("WARN")
//3、读取数据文件
val data: RDD[String] = sc.textFile("/Users/guopuwen/IdeaProjects/SparkProject/src/main/java/cn/noteblogs/word.txt")
//4、 切分每一行,获取所有单词
val words: RDD[String] = data.flatMap(x => x.split(" "))
//5、每个单词计为1
val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
//6、相同单词出现的1累加
val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
//按照单词出现的次数降序排列 第二个参数默认是true表示升序,设置为false表示降序
val sortedRDD: RDD[(String, Int)] = result.sortBy(x => x._2, false)
//7、收集数据打印
val finalResult: Array[(String, Int)] = sortedRDD.collect()
finalResult.foreach(println)
//8、关闭sc
sc.stop()
}
}
RDD算子基本介绍
功能需求统计
本实训项目将对农产品市场和农产品进行以下的功能统计分析。
(1)农产品市场分析
统计每个省份的农产品市场总数
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 统计每个省份的农产品总数
*/
object MarketAnalysis1 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
//1.按照一个或者多个tab键切割(\s匹配任何空白字符,包括空格、制表符、换页符等等)
val provinceRDD: RDD[(String, String)] = productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val parts: Array[String] = line.split("\\s+")
(parts(4), parts(3))
})
//2.去除重复数据,按照key分组计算
val result: collection.Map[String, Long] = provinceRDD.distinct()
.countByKey()
for ((k, v) <- result) {
println(s"省份:$k ,数量:$v")
}
}
}
统计数据中缺失农产品市场的省份有哪些
/**
* 统计数据中缺失农产品市场的省份有哪些
*/
object MarketAnalysis2 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
private val allProvinceRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/allprovince.txt")
def main(args: Array[String]): Unit = {
//1.按照一个或者多个tab键切割(\s匹配任何空白字符,包括空格、制表符、换页符等等)
val provinceRDD: RDD[String] = productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val parts: Array[String] = line.split("\\s+")
(parts(4))
})
val allProvinceRDDs: RDD[String] = allProvinceRDD.map(line => line.split("\\s+")(0))
//取差集
val resRDD: RDD[String] = allProvinceRDDs.subtract(provinceRDD)
resRDD.foreach(print)
}
}
根据农产品种类数量,统计每个省份排名前3名的农产品市场
/**
* 根据农产品种类数量,统计每个省份排名前3名的农产品市场
* 总体思路:
* 1、将数据清洗为下列格式: 冬瓜 1.50 2021/1/1 新疆石河子西部绿珠果蔬菜批发市场 北京 朝阳
* {
* ((山西汾阳市晋阳农副产品批发市场, 山西),香菜),
* ((山西汾阳市晋阳农副产品批发市场, 山西), 大葱),
* ((北京朝阳区大洋路综合市场, 北京), 黄豆芽)) ,
* ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 冬瓜)
* }
* 2、去重,因为是按照种类数量
* 3、替换
* {
* ( (山西汾阳市晋阳农副产品批发市场, 山西),1),
* ((山西汾阳市晋阳农副产品批发市场, 山西), 1),
* ((北京朝阳区大洋路综合市场, 北京), 1)),
* ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 1)
* }
* 4、根据省份 和 (批发市场, 省份)进行group分组 最终得到
* {
* (山西, (山西汾阳市晋阳农副产品批发市场, 山西), 2 ),
* (北京, ((北京朝阳区大洋路综合市场, 北京), 1 ), ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 1))
* }
*
* 5、排序 按照每个省份数据进行排序 得到最终数据
*/
object MarketAnalysis3 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(3), product(4)) -> product(0)
})
.distinct() // ((北京顺义石门蔬菜批发市场,北京),生姜)
.map(item => item._1 -> 1) //((北京顺义石门蔬菜批发市场,北京),1)
.reduceByKey(_+_) //
.groupBy(_._1._2) // (青海,CompactBuffer(((西宁仁杰粮油批发市场有限公司,青海),3)))
.flatMap(item => {
val province = item._1
val arr = item._2.toBuffer
arr.sortBy(_._2).reverse.take(3)
.map(item => (
province, item._1._1, item._2 // (辽宁,辽宁阜新市瑞轩蔬菜农副产品综合批发市场,53)
))
})
.foreach(println(_))
}
}
统计山东和山西两省售卖土豆的农产品市场总数
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 统计 山东 和 山西 两省售卖 土豆 的农产品市场总数
* 总体思路:
* 1、筛选出 山东 山西 土豆 的行数
* 2、根据省份分组
* 3、得出结果
*
*/
object MarketAnalysis4 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(3), product(0)) //
})
.distinct()
.filter(item => {
(item._1 == "山东" || item._1 == "山西") && item._3 == "土豆"
})
.groupBy(_._1) // (山东,CompactBuffer((山东,山东章丘批发市场,土豆), (山东,山东中国寿光农产品物流园,土豆), (山东,山东青岛城阳蔬菜批发市场,土豆), (山东,山东青岛李沧区沧口蔬菜副食品,土豆), (山东,山东威海市农副产品批发市场,土豆), (山东,山东德州农业蔬菜局,土豆), (山东,山东青岛平度市南村蔬菜批发市场,土豆)))
.map(item => {
(item._1, item._2.size)
})
.foreach(println(_))
}
}
(2)农产品分析
统计每个省份的农产品种类总数
/**
* 统计 统计每个省份的农产品种类总数
* 总体思路:
* 1、获取 (省份, 农产品类型) 二元组
* 2、去重
* 3、按照省份分组
* 4、得出结果
*
*/
object ProductsAnalysis1 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(0)) //
})
.distinct()
.groupBy(_._1) // (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
.map(item => {
(item._1, item._2.toBuffer.size)
})
.foreach(println(_))
}
}
统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市
/**
* 统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市
* 总体思路:比较简单 略
*
*/
object ProductsAnalysis2 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(5), product(3), product(0)) //
})
.filter(_._4 == "樱桃")
.distinct()
.foreach(println(_))
}
}
统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例
/**
* 统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例
* 总体思路: 比较简单 略
*
*/
object ProductsAnalysis3 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
val RDD1: RDD[(String, String, String)] = productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(3), product(0)) //
})
.distinct()
.filter(_._1 == "山东") //(山东,山东威海市农副产品批发市场,橙)
val allCount = RDD1.count() //总数
val geliCount = RDD1.filter(_._3 == "蛤蜊").count()
println(allCount)
println(geliCount)
println("比例为" + (geliCount.toFloat / allCount) * 100 + "%")
// RDD1.foreach(println)
}
}
计算山西省的每种农产品的价格波动趋势,即计算价格均值。
/**
* 计算山西省的每种农产品的价格波动趋势,即计算价格均值。
* 总体思路:
*
*/
object ProductsAnalysis4 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
val RDD1: RDD[(String, Double)] = productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(0), product(1).toDouble) // (山西 菜品 价格)
}).filter(_._1.equals("山西"))
.map(item => item._2 -> item._3) // ((香菜,2.80))
RDD1
.groupByKey()
.map(t => {
if (t._2.size>2)
(t._1, ((t._2.sum - t._2.max - t._2.min)/t._2.size).formatted("%.2f"))
else
(t._1, (t._2.sum/t._2.size).formatted("%.2f"))
}) // 农产品分类
.foreach(println)
}
}
统计排名前3的省份共同拥有的农产品类型
/**
* 统计排名前3的省份共同拥有的农产品类型
* 理解:根据农产品类型数量排序,得到前三,然后统计共同拥有的农产品类型
* 1、得到农产品类型排名前三的省份
* 2、得出该省份的所有去重后的农产品
* 3、仿照WordCount案例 对每个农产品类型数据进行统计
* 4、筛选出等于3的农产品,那么这就是共同拥有的 ??? 这是一个不太聪明的办法!按道理应该有方法可以直接得出 但是没有找到
*/
object ProductsAnalysis5 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(0)) // (山西 农产品类型)
})
.distinct()
.groupBy(_._1) // (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
.map(item => {
(item._1, item._2.size, item._2) // (青海, 3, CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
})
.sortBy(_._2, false).take(3)
.flatMap(item => {
(item._3).map(item => (item._2))
})
.map(item => item -> 1)
.groupBy(_._1)
.filter(item => item._2.size == 3)
.foreach(item => {
println(item._1)
})
}
}
根据农产品类型数量,统计排名前5名的省份
/**
* 根据农产品类型数量,统计排名前5名的省份
* 参照第5题即可
*/
object ProductsAnalysis6 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(0)) // (山西 农产品类型)
})
.distinct()
.groupBy(_._1) // (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
.map(item => {
(item._1, item._2.size, item._2) // (青海, 3, CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
})
.sortBy(_._2, false).take(5)
.foreach(item => {
println(item._1)
})
}
}
东北三省农产品最高价格降序排列,统计前十名的农产品有哪些
/**
* 东北三省农产品最高价格降序排列,统计前十名的农产品有哪些 (黑龙江 吉林 辽宁)
*/
object ProductsAnalysis7 {
private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
def main(args: Array[String]): Unit = {
productRDD.filter(_.split("\\s+").length == 6)
.map(line => {
val product = line.split("\\s+")
(product(4), product(0), product(1).toDouble) // (山西 农产品类型)
})
.distinct()
.filter(item => {
item._1.equals("黑龙江") || item._1.equals("吉林") || item._1.equals("辽宁")
})
.sortBy(_._3, false).take(10)
.foreach(println)
}
}