Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);

        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分

        Lexeme next = ikSegmenter.next();
        while (next!= null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 上午11:06
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");
    }
    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        //1. 读取主流dwd页面主题数据
        tableEnv.executeSql("create table page_info(\n" +
                "    `common` map<string,string>,\n" +
                "    `page` map<string,string>,\n" +
                "    `ts` bigint,\n" +
                "    `row_time` as to_timestamp_ltz(ts,3),\n" +
                "     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));

        //测试是否获取到数据
        //tableEnv.executeSql("select * from page_info").print();

        //2. 筛选出关键字keywords
        Table keywrodTable = tableEnv.sqlQuery("select\n" +
                "    page['item'] keywords,\n" +
                "    `row_time`,\n" +
                "    ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywrodTable);

        // 测试是否获取到数据
        //tableEnv.executeSql("select * from keywords_table").print();

        //3. 自定义分词函数并注册
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );

        //4. 调用分词函数对keywords进行拆分
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);

        //tableEnv.executeSql("select * from split_kw_table").print();

        //5. 对keyword进行分组开窗聚合
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                "    keyword,\n" +
                "    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +
                "    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +
                "    cast(current_date as string)  cur_date,\n" +
                "    count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");

        //tableEnv.createTemporaryView("result_table",table);
        //tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();

        //6. 写出到doris
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                "    keyword                STRING,\n" +
                "    wStart                 STRING,\n" +
                "    wEnd                   STRING,\n" +
                "    cur_date               STRING,\n" +
                "    keyword_count          BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        windowAggTable.insertInto("doris_sink").execute();
    }
}