基于Spark的农产品分析系统

本篇文章为大学毕业实习项目-基于Spark的农产品分析系统的文档,从搭建环境到代码编写以及成果展示。

Hadoop环境搭建

集群规划

服务器IP192.168.18.101192.168.18.102192.168.18.103
主机名hadoop01hadoop02hadoop03
NameNode
SecondaryNameNode
dataNode
ResourceManager
NodeManager
1.上传hadoop安装包并解压
tar -zxvf hadoop-2.7.5.tar.gz -C /opt/module/

最后使用集群分发脚本xsync分发到其他服务器

xsync /opt/application/hadoop-2.7.3/

现在,每个服务器都有hadoop安装包,目录为/opt/module,另外hadoop运行需要java环境。

2.配置环境变量
[root@hadoop101 hadoop-2.7.5]# pwd
/opt/application/hadoop-2.7.3
[root@hadoop101 hadoop-2.7.5]# vim /etc/profile
export HADOOP_HOME=/opt/application/hadoop-2.7.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
[root@hadoop101 hadoop-2.7.5]# source /etc/profile

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4zlRlZJU-1649489437027)(/Users/guopuwen/Desktop/C:\Users\VSUS\Desktop\笔记\大数据\img\4.png)]

到这,第一台机器的hadoop已经安装完毕,然后使用集群分发脚本将/etc/profile文件分发至各台机器,不要忘记重新加载配置文件

3.修改配置文件

配置文件都在etc/hadoop中

3.1 core-site.xml
<property>
    <name>fs.default.name</name>
    <value>hdfs://hadoop01:8020</value>
</property>

<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/application/hadoop-2.7.3/hadoopDatas/tempDatas</value>
</property>

<!--  缓冲区大小,实际工作中根据服务器性能动态调整 -->

<property>
    <name>io.file.buffer.size</name>
    <value>4096</value>
</property>

<!--  开启hdfs的垃圾桶机制,删除掉的数据可以从垃圾桶中回收,单位分钟 -->
<property>
    <name>fs.trash.interval</name>
    <value>10080</value>
</property>

3.2 hdfs-site.xml
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop101:50090</value>
</property>

<property>
    <name>dfs.namenode.http-address</name>
    <value>hadoop101:50070</value>
</property>
<property>
    <name>dfs.namenode.name.dir</name>
</property>

<property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas,file:///opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas2</value>
</property>

<property>
    <name>dfs.namenode.edits.dir</name>
    <value>file:///opt/application/hadoop-2.7.3/hadoopDatas/nn/edits</value>
</property>

<property>
    <name>dfs.namenode.checkpoint.dir</name>
    <value>file:///opt/application/hadoop-2.7.3/hadoopDatas/snn/name</value>
</property>

<property>
    <name>dfs.namenode.checkpoint.edits.dir</name>
    <value>file:///opt/application/hadoop-2.7.3/hadoopDatas/dfs/snn/edits</value>
</property>

<property>
    <name>dfs.replication</name>
    <value>3</value>
</property>

3.3 hadoop-env.sh
export JAVA_HOME=/opt/application/jdk1.8.0_131
3.4 mapred-site.xml
<property>
    <name>mapreduce.job.ubertask.enable</name>
    <value>true</value>
</property>

<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop101:10020</value>
</property>

<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop101:19888</value>
</property>

3.5 yarn-site.xml
<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>hadoop101</value>
	</property>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
	
	<property>
		<name>yarn.log-aggregation-enable</name>
		<value>true</value>
	</property>
	<property>
		<name>yarn.log-aggregation.retain-seconds</name>
		<value>604800</value>
	</property>
	<property>    
		<name>yarn.nodemanager.resource.memory-mb</name>    
		<value>20480</value>
	</property>
	<property>  
        	 <name>yarn.scheduler.minimum-allocation-mb</name>
         	<value>2048</value>
	</property>
	<property>
		<name>yarn.nodemanager.vmem-pmem-ratio</name>
		<value>2.1</value>
	</property>
3.6 mapred-env.sh
export JAVA_HOME=/opt/application/jdk1.8.0_131
3.7 修改slaves
hadoop101
hadoop102
hadoop103      
3.8 创建上面配置文件出现的一些文件
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/tempDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/namenodeDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/namenodeDatas2
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/datanodeDatas2
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/snn/name
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/nn/edits
mkdir -p /opt/application/hadoop-2.7.3/hadoopDatas/dfs/snn/edits
3.9 分发配置文件到其他机器
4. 启动集群
bin/hdfs namenode -format
sbin/start-dfs.sh
sbin/start-yarn.sh

Scala-spark环境搭建

在idea下配置好scala环境,这里只给出相应的pom坐标

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.12</scala.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.9.2</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compilerplugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>METAINF/*.SF</exclude>
                                        <exclude>METAINF/*.DSA</exclude>
                                        <exclude>METAINF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

实例代码:

package cn.noteblogs

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo1 {
  def main(args: Array[String]): Unit = {
    //1、构建sparkConf对象 设置application名称和master地址
    val conf: SparkConf = new
        SparkConf().setAppName("WordCount").setMaster("local[2]")
    //2、构建sparkContext对象,该对象非常重要,它是所有spark程序的执行入口
    // 它内部会构建 DAGScheduler和 TaskScheduler 对象
    val sc = new SparkContext(conf)
    //设置日志输出级别
    sc.setLogLevel("WARN")
    //3、读取数据文件
    val data: RDD[String] = sc.textFile("/Users/guopuwen/IdeaProjects/SparkProject/src/main/java/cn/noteblogs/word.txt")
    //4、 切分每一行,获取所有单词
    val words: RDD[String] = data.flatMap(x => x.split(" "))
    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
    //6、相同单词出现的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
    //按照单词出现的次数降序排列 第二个参数默认是true表示升序,设置为false表示降序
    val sortedRDD: RDD[(String, Int)] = result.sortBy(x => x._2, false)
    //7、收集数据打印
    val finalResult: Array[(String, Int)] = sortedRDD.collect()
    finalResult.foreach(println)
    //8、关闭sc
    sc.stop()
  }
}

RDD算子基本介绍

功能需求统计

本实训项目将对农产品市场和农产品进行以下的功能统计分析。

(1)农产品市场分析

统计每个省份的农产品市场总数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 统计每个省份的农产品总数
 */
object MarketAnalysis1 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    //1.按照一个或者多个tab键切割(\s匹配任何空白字符,包括空格、制表符、换页符等等)
    val provinceRDD: RDD[(String, String)] = productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val parts: Array[String] = line.split("\\s+")
        (parts(4), parts(3))
      })
    //2.去除重复数据,按照key分组计算
    val result: collection.Map[String, Long] = provinceRDD.distinct()
      .countByKey()

    for ((k, v) <- result) {
      println(s"省份:$k ,数量:$v")
    }
  }
}

统计数据中缺失农产品市场的省份有哪些

/**
 * 统计数据中缺失农产品市场的省份有哪些
 */
object MarketAnalysis2 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  private val allProvinceRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/allprovince.txt")
  def main(args: Array[String]): Unit = {
    //1.按照一个或者多个tab键切割(\s匹配任何空白字符,包括空格、制表符、换页符等等)
    val provinceRDD: RDD[String] = productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val parts: Array[String] = line.split("\\s+")
        (parts(4))
      })

    val allProvinceRDDs: RDD[String] = allProvinceRDD.map(line => line.split("\\s+")(0))

    //取差集
    val resRDD: RDD[String] = allProvinceRDDs.subtract(provinceRDD)
    resRDD.foreach(print)
  }
}

根据农产品种类数量,统计每个省份排名前3名的农产品市场

/**
 * 根据农产品种类数量,统计每个省份排名前3名的农产品市场
 * 总体思路:
 * 1、将数据清洗为下列格式:  冬瓜	1.50	2021/1/1	新疆石河子西部绿珠果蔬菜批发市场	北京	朝阳
 *   {
 *   ((山西汾阳市晋阳农副产品批发市场, 山西),香菜),
 *   ((山西汾阳市晋阳农副产品批发市场, 山西), 大葱),
 *   ((北京朝阳区大洋路综合市场, 北京), 黄豆芽)) ,
 *   ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 冬瓜)
 *   }
 * 2、去重,因为是按照种类数量
 * 3、替换
 *  {
 *  ( (山西汾阳市晋阳农副产品批发市场, 山西),1),
 *  ((山西汾阳市晋阳农副产品批发市场, 山西), 1),
 *  ((北京朝阳区大洋路综合市场, 北京), 1)),
 *  ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 1)
 *  }
 * 4、根据省份 和 (批发市场, 省份)进行group分组 最终得到
 *    {
 *      (山西, (山西汾阳市晋阳农副产品批发市场, 山西), 2 ),
 *      (北京, ((北京朝阳区大洋路综合市场, 北京), 1 ), ((新疆石河子西部绿珠果蔬菜批发市场, 北京), 1))
 *    }
 *
 * 5、排序 按照每个省份数据进行排序 得到最终数据
 */
object MarketAnalysis3 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(3), product(4)) -> product(0)
      })
      .distinct()       //  ((北京顺义石门蔬菜批发市场,北京),生姜)
      .map(item => item._1 -> 1)    //((北京顺义石门蔬菜批发市场,北京),1)
      .reduceByKey(_+_)            //
      .groupBy(_._1._2)           // (青海,CompactBuffer(((西宁仁杰粮油批发市场有限公司,青海),3)))
      .flatMap(item => {
        val province = item._1
        val arr = item._2.toBuffer
        arr.sortBy(_._2).reverse.take(3)
          .map(item => (
            province, item._1._1, item._2       // (辽宁,辽宁阜新市瑞轩蔬菜农副产品综合批发市场,53)
          ))
      })
      .foreach(println(_))
  }
}

统计山东和山西两省售卖土豆的农产品市场总数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 统计 山东 和 山西 两省售卖 土豆 的农产品市场总数
 * 总体思路:
 * 1、筛选出 山东 山西 土豆 的行数
 * 2、根据省份分组
 * 3、得出结果
 *
 */
object MarketAnalysis4 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(3), product(0))        //
      })
      .distinct()
      .filter(item => {
        (item._1 == "山东" || item._1 == "山西") && item._3 == "土豆"
      })
      .groupBy(_._1)      // (山东,CompactBuffer((山东,山东章丘批发市场,土豆), (山东,山东中国寿光农产品物流园,土豆), (山东,山东青岛城阳蔬菜批发市场,土豆), (山东,山东青岛李沧区沧口蔬菜副食品,土豆), (山东,山东威海市农副产品批发市场,土豆), (山东,山东德州农业蔬菜局,土豆), (山东,山东青岛平度市南村蔬菜批发市场,土豆)))
      .map(item => {
        (item._1, item._2.size)
      })

      .foreach(println(_))
  }
}

(2)农产品分析

统计每个省份的农产品种类总数

/**
 * 统计 统计每个省份的农产品种类总数
 * 总体思路:
 * 1、获取 (省份, 农产品类型) 二元组
 * 2、去重
 * 3、按照省份分组
 * 4、得出结果
 *
 */
object ProductsAnalysis1 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(0))        //
      })
      .distinct()
      .groupBy(_._1)      //  (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
      .map(item => {
        (item._1, item._2.toBuffer.size)
      })
      .foreach(println(_))
  }
}

统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市

/**
 * 统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市
 * 总体思路:比较简单 略
 *
 */
object ProductsAnalysis2 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(5), product(3), product(0))        //
      })
      .filter(_._4 == "樱桃")
      .distinct()
      .foreach(println(_))
  }
}

统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例

/**
 * 统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例
 * 总体思路: 比较简单 略
 *
 */
object ProductsAnalysis3 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    val RDD1: RDD[(String, String, String)] = productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(3), product(0))        //
      })
      .distinct()
      .filter(_._1 == "山东")     //(山东,山东威海市农副产品批发市场,橙)

    val allCount = RDD1.count()   //总数
    val geliCount = RDD1.filter(_._3 == "蛤蜊").count()
    println(allCount)
    println(geliCount)
    println("比例为" + (geliCount.toFloat / allCount) * 100 + "%")
//    RDD1.foreach(println)
  }
}

计算山西省的每种农产品的价格波动趋势,即计算价格均值。

/**
 * 计算山西省的每种农产品的价格波动趋势,即计算价格均值。
 * 总体思路:
 *
 */
object ProductsAnalysis4 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    val RDD1: RDD[(String, Double)] = productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(0), product(1).toDouble)        // (山西 菜品 价格)
      }).filter(_._1.equals("山西"))
      .map(item => item._2 -> item._3)  // ((香菜,2.80))

    RDD1
      .groupByKey()
      .map(t => {
        if (t._2.size>2)
          (t._1, ((t._2.sum - t._2.max - t._2.min)/t._2.size).formatted("%.2f"))
        else
          (t._1, (t._2.sum/t._2.size).formatted("%.2f"))
      })       // 农产品分类
      .foreach(println)

  }
}

统计排名前3的省份共同拥有的农产品类型

/**
 * 统计排名前3的省份共同拥有的农产品类型
 * 理解:根据农产品类型数量排序,得到前三,然后统计共同拥有的农产品类型
 * 1、得到农产品类型排名前三的省份
 * 2、得出该省份的所有去重后的农产品
 * 3、仿照WordCount案例 对每个农产品类型数据进行统计
 * 4、筛选出等于3的农产品,那么这就是共同拥有的    ??? 这是一个不太聪明的办法!按道理应该有方法可以直接得出 但是没有找到
 */
object ProductsAnalysis5 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(0))        // (山西 农产品类型)
      })
      .distinct()
      .groupBy(_._1)    // (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
      .map(item => {
        (item._1, item._2.size, item._2)  // (青海, 3, CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
      })
      .sortBy(_._2, false).take(3)
      .flatMap(item => {
        (item._3).map(item => (item._2))
      })
      .map(item => item -> 1)
      .groupBy(_._1)
      .filter(item => item._2.size == 3)
      .foreach(item => {
        println(item._1)
      })
  }
}

根据农产品类型数量,统计排名前5名的省份

/**
 * 根据农产品类型数量,统计排名前5名的省份
 * 参照第5题即可
 */
object ProductsAnalysis6 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(0))        // (山西 农产品类型)
      })
      .distinct()
      .groupBy(_._1)    // (青海,CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
      .map(item => {
        (item._1, item._2.size, item._2)  // (青海, 3, CompactBuffer((青海,粳米), (青海,面粉), (青海,玉米)))
      })
      .sortBy(_._2, false).take(5)
      .foreach(item => {
        println(item._1)
      })

  }
}

东北三省农产品最高价格降序排列,统计前十名的农产品有哪些

/**
 * 东北三省农产品最高价格降序排列,统计前十名的农产品有哪些 (黑龙江 吉林 辽宁)
 */
object ProductsAnalysis7 {
  private val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dataframe"))
  private val productRDD: RDD[String] = sc.textFile("/Users/guopuwen/Documents/项目说明/data/product.txt")
  def main(args: Array[String]): Unit = {
    productRDD.filter(_.split("\\s+").length == 6)
      .map(line => {
        val product = line.split("\\s+")
        (product(4), product(0), product(1).toDouble)        // (山西 农产品类型)
      })
      .distinct()
      .filter(item => {
        item._1.equals("黑龙江") || item._1.equals("吉林") || item._1.equals("辽宁")
      })
      .sortBy(_._3, false).take(10)
      .foreach(println)

  }
}