广电用户画像分析之根据客户消费内容添加标签

本篇博客将介绍如何添加用户消费标签和用户消费等级标签.
建议阅读之前先阅读数据预处理的博客再读本篇博客.
相关前提:
https://blog.csdn.net/kilig_CSM/article/details/131299347?spm=1001.2014.3001.5501

根据客户消费内容添加标签

该类的目的是创建一个消费者标签(Consumer Label)。它通过读取名为mmconsume_billevents的表中的数据,根据字段"phone_no"和"fee_code"进行去重,然后使用自定义的函数consumerLabel为每个消费者分配一个标签,最后将结果保存到名为consumerLabel的表中。

思路:

首先,创建一个SparkSession对象,设置应用程序名称、Master地址,并启用Hive支持。

根据设定的日志级别,将SparkContext的日志级别设置为"WARN",以减少输出的日志信息。

从名为processData.mmconsume_billevents的表中选择"phone_no"和"fee_code"两列,并对其进行去重操作,得到一个新的DataFrame对象billevents。

定义了一个自定义函数consumerLabel,用于根据fee_code的不同值分配消费者标签。

使用consumerLabel函数,将billevents DataFrame中的"fee_code"列作为输入,通过withColumn方法添加名为"consumerLabel"的新列,该列的值通过应用consumerLabel函数来计算。

最后,将结果以"overwrite"模式保存到名为userPrint.consumerLabel的表中。

总体而言,该代码通过对mmconsume_billevents表的数据进行处理和转换,生成了一个包含消费者标签的新表consumerLabel,用于进一步分析和使用消费者数据。

完整代码

package code.userprint

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object ConsumerLabel {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val spark = SparkSession.builder().appName("Process")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    //
    spark.sparkContext.setLogLevel("WARN")

    // 根据字段  "phone_no","fee_code" 去重
    val billevents = spark.table("processData.mmconsume_billevents").select("phone_no", "fee_code").distinct()

    val consumerLabelUdf = udf((x: String) => consumerLabel(x))

    billevents
      .withColumn("consumerLabel", consumerLabelUdf(col("fee_code")))
      .write
      .mode("overwrite")
      .saveAsTable("userPrint.consumerLabel")

  }
  /**
    * 直播 0J 0B 0Y
    * 应用 0X
    * 付费频道 0T
    * 宽带 0W 0L 0Z 0K
    * 点播 0D
    * 回看 0H
    * 有线电视收视费 0U
    *
    */
  def consumerLabel(fee_code: String):String = {
    if (fee_code == "0J" || fee_code == "0B" || fee_code == "0Y") {
      "直播"
    } else if (fee_code == "0X") {
      "应用"
    }
    else if (fee_code == "0T") {
      "付费频道"
    }
    else if (fee_code == "0D") {
      "点播"
    }
    else if (fee_code == "0H") {
      "回看"
    } else {
      "宽带"
    }
  }

}

核心代码:

val billevents = spark.table("processData.mmconsume_billevents").select("phone_no", "fee_code").distinct()

val consumerLabelUdf = udf((x: String) => consumerLabel(x))

billevents

  .withColumn("consumerLabel", consumerLabelUdf(col("fee_code")))

  .write

  .mode("overwrite")

  .saveAsTable("userPrint.consumerLabel")

根据客户消费等级添加标签

该类的核心目的是根据用户在过去三个月内的消费情况,计算其电视消费级别和宽带消费级别,并将结果存储在新的DataFrame中供后续分析和使用.

思路:

从表processData.mmconsume_billevents中筛选出sm_name不等于’珠江宽频’的记录,得到一个新的DataFrame对象billevents。

计算最大月份max_month,然后通过add_months函数将最大月份往前推3个月,得到起始时间beforeTime。

从billevents中筛选出近三个月的数据,并按phone_no进行分组。使用agg函数计算should_pay - favour_fee的平均值,将结果存储在新列avgPay中。

定义了两个UDF函数TVLevel和BDCLevel,用于根据平均消费金额给用户标记电视消费级别和宽带消费级别。

将TVLevel函数应用于monthThreeData DataFrame中的avgPay列,创建名为TVLevel的新列。

将BDCLevel函数应用于monthThreeData DataFrame中的avgPay列,创建名为BCDLevel的新列。

打印结果,显示包含电视消费级别的monthThreeDataWithTVLevel DataFrame和包含宽带消费级别的monthThreeDataWithBCDLevel DataFrame。

完整代码

package code.userprint

import org.apache.spark.sql.SparkSession

object ConsumerLevel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Process")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    //
    spark.sparkContext.setLogLevel("WARN")

    // 筛选 sm_name != '珠江宽频'
    val billevents = spark.table("processData.mmconsume_billevents")
      .filter("sm_name == '珠江宽频'")

    /** 求三个月内的数据
      * 1.求出最大月份 2018-07-01 00:00:00
      * 2.最大月份-3 = 最大月份前三个月的初始时间 使用add_months()
      * 3.对于全局数据进行筛选  year_month > beforeTime
      */
    import org.apache.spark.sql.functions._
     billevents.selectExpr("max(year_month) as max_month")
    .selectExpr("add_months(max_month,-3) as beforeTime").show() //2018-04-01
//      .withColumn("开始时间点",add_months(col("year_month"),-3)) // 效果等同于上一句

    // 筛选出近三个月的数据,根据 phone_no 分组,使用 agg 计算 should_pay - favour_fee 的平均值
    val monthThreeData = billevents.filter(" year_month > '2018-04-01'")
    .groupBy("phone_no").agg(mean(col("should_pay") - col("favour_fee")))
    .withColumnRenamed("avg((should_pay - favour_fee))","avgPay")
    monthThreeData.show()
    // 电视消费标签构建 构建时 sm_name != '珠江宽频'
    val TVLevelUdf = udf((x:Double)=>TVLevel(x))
    monthThreeData.withColumn("TVLevel",TVLevelUdf(col("avgPay"))).show()
//    .write.mode("overwrite").saveAsTable("userPrint.TVLevel")
    // 宽带消费标签构建  构建时 sm_name == '珠江宽频'
    val BCDlevelUdf = udf((x:Double)=>BDCLevel(x))
    monthThreeData.withColumn("BCDlevel",BCDlevelUdf(col("avgPay"))).show()
//      .write.mode("overwrite").saveAsTable("userPrint.BCDLevel")


  }
  def TVLevel(pay:Double): String ={
    if (-26.5<pay && pay<26.5){
      "电视超低消费"
    }else if (pay >= 26.5 && pay < 46.5){
      "电视低消费"
    }else if (pay >= 26.5 && pay < 46.5){
      "电视中等消费"
    }else{
      "电视高消费"
    }
  }
  def BDCLevel(pay:Double): String ={
    if (pay<=20){
      "宽带低消费"
    }else if(pay>20 && pay<=50){
      "宽带中消费"
    }else{
      "宽带高消费"
    }
  }
}

核心代码:

// 筛选 sm_name != '珠江宽频'

val billevents = spark.table("processData.mmconsume_billevents")

  .filter("sm_name == '珠江宽频'")

// 求三个月内的数据

val maxMonth = billevents.selectExpr("max(year_month) as max_month")

val beforeTime = maxMonth.selectExpr("add_months(max_month, -3) as beforeTime").show()

// 筛选出近三个月的数据,根据 phone_no 分组,使用 agg 计算 should_pay - favour_fee 的平均值

val monthThreeData = billevents.filter("year_month > '2018-04-01'")

  .groupBy("phone_no")

  .agg(mean(col("should_pay") - col("favour_fee")))

  .withColumnRenamed("avg((should_pay - favour_fee))", "avgPay")

// 电视消费标签构建

val TVLevelUdf = udf((x: Double) => TVLevel(x))

Val monthThreeDataWithTVLevel = monthThreeData.withColumn("TVLevel", TVLevelUdf(col("avgPay"))).show()

// 宽带消费标签构建

val BCDLevelUdf = udf((x: Double) => BDCLevel(x))

val monthThreeDataWithBCDLevel = monthThreeData.withColumn("BCDLevel", BCDLevelUdf(col("avgPay"))).show()