Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

前言

        昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

一、数据准备

csv 文件内容部分数据展示:

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

字段说明

• PassengerId : 乘客编号。
• Survived : 是否存活,0表示未能存活,1表示存活。
• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。
• Name : 乘客姓名。
• Sex : 乘客性别。
• Age : 乘客年龄。
• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。
• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。
• Ticket : 乘客登船所使用的船票编号。
• Fare : 乘客上船的花费。
• Cabin : 乘客所住的船舱。
• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

二、Spark数据预处理

1、通过读取本地文件生成 DataFrame 对象。

// 创建 SparkSession 对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    // 导入隐式转换相关依赖
    import spark.implicits._

    // 读取csv文件生成 DataFrame 对象
    val df = spark.read.format("csv")
      .option("header","true")
      .option("mode","DROPMALFORMED")
      .load("data/practice1/titanic.csv")

2、修改字段类型

DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。

        'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。

        cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。

withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

// 修改字段数据类型
    val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
      .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
      .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
      .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
      .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
      .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

3、删除不必要的字段

// 删除不必要的字段
    val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

4、缺失值处理

用到的函数:

DSL 语句中的 select、where函数,以及 count 、zip 函数。

涉及到的操作:

RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。 

统计缺失值
// 缺失值处理
    val columns: Array[String] = df1.columns  //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
    // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
    val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
    // 通过zip方法将两个集合数组合并成一个元组
    val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
    // 把生成的元组读取为RDD对象再转为DataFrame对象
    val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
    result_df.show()  // 统计缺失值

 统计结果展示:

+-----------+-----------+
|missing_cnt|column_name|
+-----------+-----------+
|          0|   Survived|
|          0|     Pclass|
|          0|        Sex|
|        177|        Age|
|          0|      SibSp|
|          0|      Parch|
|          0|       Fare|
|          2|   Embarked|
+-----------+-----------+
缺失值处理
// 处理缺失值函数
    def meanAge(dataFrame: DataFrame): Double = {
      dataFrame.select("Age")
        .na.drop() //删除 Age 为空的行
        //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
        .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
        .first()  //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
        .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
    }

处理: 

val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
    df2.show()

处理结果展示:

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       S|
|       1|     2|female|55.0|    0|    0|   16.0|       S|
|       0|     3|  male| 2.0|    4|    1| 29.125|       Q|
|       1|     2|  male|30.0|    0|    0|   13.0|       S|
|       0|     3|female|31.0|    1|    0|   18.0|       S|
|       1|     3|female|30.0|    0|    0|  7.225|       C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 20 rows

三、Spark 数据分析

1、891人当中,共多少人生还?

// 1.891人当中,共多少人生还?
    val survived_count: DataFrame = df2.groupBy("Survived").count()
    survived_count.show()
//保存结果到本地 
survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

运行结果: 

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

2.不同上船港口生还情况

// 2.不同上船港口生还情况
    val survived_embark = df2.groupBy("Embarked", "Survived").count()
    survived_embark.show()
    survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

运行结果:

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  219|
|       C|       1|   93|
|       Q|       0|   47|
|       C|       0|   75|
+--------+--------+-----+

3.存活/未存活的男女数量及比例

 // 3.存活/未存活的男女数量及比例
    val survived_sex_count=df2.groupBy("Sex","Survived").count()
    val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    survived_sex_percent.show()
    survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

运行结果:

+------+--------+-----+--------+
|   Sex|Survived|count| percent|
+------+--------+-----+--------+
|  male|       0|  468|52.52525|
|female|       1|  233|26.15039|
|female|       0|   81| 9.09091|
|  male|       1|  109|12.23345|
+------+--------+-----+--------+

4. 不同级别乘客生还人数和占总生还人数的比例

// 4. 不同级别乘客生还人数和占总生还人数的比例
    val survived_df = df2.filter(col("Survived")===1)
    val pclass_survived_count=survived_df.groupBy("Pclass").count()
    val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
    pclass_survived_percent.show()
    pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

运行结果:

+------+-----+--------+
|Pclass|count| percent|
+------+-----+--------+
|     1|  136|39.76608|
|     3|  119|34.79532|
|     2|   87|25.43860|
+------+-----+--------+

5. 有无同行父母/孩子的生还情况

 // 5.有无同行父母/孩子的生还情况
    val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
    val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
    parch_survived_count.show()
    parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

运行结果:

+-----------+--------+-----+
|Parch_label|Survived|count|
+-----------+--------+-----+
|          1|       0|  104|
|          1|       1|  109|
|          0|       0|  445|
|          0|       1|  233|
+-----------+--------+-----+

6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

    // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
    val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
    val age_survived=df3.groupBy("Age_label","Survived").count()
    age_survived.show()
    age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

运行结果:

+---------+--------+-----+
|Age_label|Survived|count|
+---------+--------+-----+
|    young|       1|  189|
|    older|       1|   12|
|    minor|       1|   70|
|   middle|       1|   71|
+---------+--------+-----+

7. 提取乘客等级和上船费用信息

    // 7.提取乘客等级和上船费用信息
    val sef = Seq("Pclass", "Fare")
    val df5 = df2.select(sef.head, sef.tail: _*)
    df5.show(5)
    df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

运行结果:

+------+-------+
|Pclass|   Fare|
+------+-------+
|     3|   7.25|
|     1|71.2833|
|     3|  7.925|
|     1|   53.1|
|     3|   8.05|
+------+-------+
only showing top 5 rows

四、数据可视化

数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。