Spark生产集群各种使用
1.环境配置
1.1 版本说明
要求 | 版本 | 是否必须 | 其他事项 |
---|---|---|---|
Hadoop | 3.3.4 | 是 | hadoop3.3.0之后原生支持国内主要对象存储 |
Hive | 3.1.3 | 否 | 实测没有Hive也可以使用sparksql,使用hive更好的管理HDFS数据 |
spark | 3.3.1 | 是 | hive和spark整合后,语法为HSQL,自定义函数按照hive使用即可 |
1.2 相关介绍官网
Hadoop官网:https://hadoop.apache.org/docs/r3.3.4/hadoop-cos/cloud-storage/index.html
腾讯对象存储cos使用官网:https://cloud.tencent.com/document/product/436/6884
spark使用官网文档:https://spark.apache.org/docs/3.3.1/sql-ref.html
hive使用官网文档:https://cwiki.apache.org/confluence/display/Hive/Home#Home-UserDocumentation
2. 使用实例
2.1 spark集群操作doris集群
脚本实例
vim test.sh
#!/bin/bash
dori_test="
CREATE
TEMPORARY VIEW spark_doris1
USING doris
OPTIONS(
'table.identifier'='demo.t1',
'fenodes'='xxx:8030',
'user'='root',
'password'='xxx'
);
CREATE
TEMPORARY VIEW spark_doris2
USING doris
OPTIONS(
'table.identifier'='demo.t2',
'fenodes'='xxx:8030',
'user'='xxx',
'password'='xxx'
);
INSERT INTO spark_doris1
select * from spark_doris2;
"
spark-sql -e "$dori_test"
nohup ./test.sh > ./logs.text 2>&1 &
2.2 spark集群同时操作doris集群和hive集群
vim test.sh
#!/bin/bash
dori_test="
set spark.debug.maxToStringFields=200
set spark.sql.adaptive.coalescePartitions.enabled=true
set spark.sql.cbo.enabled=true
set spark.sql.sources.parallelPartitionDiscovery.parallelism=50
set spark.sql.shuffle.partitions=1000
CREATE
TEMPORARY VIEW spark_doris1
USING doris
OPTIONS(
'table.identifier'='demo.t1',
'fenodes'='xxx:8030',
'user'='root',
'password'='xxx'
);
CREATE
TEMPORARY VIEW spark_doris2
USING doris
OPTIONS(
'table.identifier'='demo.t2',
'fenodes'='xxx:8030',
'user'='xxx',
'password'='xxx'
);
INSERT INTO spark_doris1
select a1.id,name,age
(select id,name from spark_doris2) a1
left join
(select id,age from hive.table) a2
on a1.id=a2.id
"
spark-sql --conf "spark.dynamicAllocation.maxExecutors=200" --conf "spark.driver.maxResultSize=8g" -S -e "$dori_test"
nohup ./test.sh > ./logs.text 2>&1 &
2.3 spark集群数据写入HDFS中,元数据存储于hive中
#在hive中建表
CREATE TABLE `student1`(
id int, name string
)
PARTITIONED BY (
`part_day` string)
LOCATION
'/warehouse/test/student1'
#数据插入
# set spark.yarn.queue=cece_dw;
insert into table student1 values(1,'abc','20231217');
2.4 spark集群数据写入cos对象存储,元数据存储于hive中
#在hive中建表
CREATE TABLE `student2`(
id int, name string
)
PARTITIONED BY (
`part_day` string)
LOCATION
'cosn://xxxx/test/student2';
#数据插入
# set spark.yarn.queue=cece_dw;
insert into table student2 values(1,'abc','20231217');
3.参数配置说明
因spark为hive的计算引擎,因此如操作本地hive数据hive的参数也可以在配置使用
3.1 spark参数配置说明
#设置计算引擎
set hive.execution.engine=spark;
#设置spark提交模式
set spark.master=yarn-cluster;
#设置作业提交队列
set spark.yarn.queue=${queue_name};
#设置队列的名字
set mapreduce.job.queuename=root.users.hdfs;
#设置作业名称
set spark.app.name=${job_name};
#该参数用于设置Spark作业总共要用多少个Executor进程来执行
set spark.executor.instances=25;
#设置执行器计算核个数
set spark.executor.cores=4;
#设置执行器内存
set spark.executor.memory=8g
#设置任务并行度
set mapred.reduce.tasks=600;
#设置每个executor的jvm堆外内存
set spark.yarn.executor.memoryOverhead=2048;
#设置内存比例(spark2.0+)
set spark.memory.fraction=0.8;
#设置对象序列化方式
set spark.serializer=org.apache.serializer.KyroSerializer;
#设置动态分区
set hive.exec.dynamic.partition=true; --开启动态分区功能
set hive.exec.dynamic.partition.mode=nonstrict; --允许所有分区是动态的
set hive.exec.max.dynamic.partitions.pernode=1000; --每个mapper/reducer可以创建的最大动态分区数
--set hive.exec.dynamic.partitions=10000; 这个可不要
insert overwrite table test partition(country,state) select * from test2; --添加动态分区示例
3.2 hive参数配置说明
#设置hive的计算引擎tez/spark/mr三选一
set hive.execution.engine=tez/spark/mr;
#设置每个map的内存
set mapreduce.map.memory.mb=4096;
#设置每个reduce的内存
set mapreduce.reduce.memory.mb=4096;
#这个 Java 程序可以使用的最大堆内存数,一定要小于mapreduce.map.memory.mb
set mapreduce.map.java.opts=-Xmx13106M;
#可以使用的最大堆内存数,一定要小于mapreduce.reduce.memory.mb
mapreduce.reduce.java.opts
#辅助设置-打印表头
set hive.cli.print.header=true;
#辅助设置-显示当前数据库
set hive.cli.print.current.db=true;
#开启任务并行执行
set hive.exec.parallel=true;
#同一个sql允许并行任务的最大线程数
set hive.exec.parallel.thread.number=8;
#部分需要联合使用
#设置成true, 表示开启动态分区功能
set hive.exec.dynamic.partition=true;
#设置成nonstrict, 表示允许所有分区都是动态的
set hive.exec.dynamic.partition.mode=nonstrict;
#每个mapper或reducer可以创建的最大动态分区个数.如果某个mapper或reducer尝试创建大于这个值的分区的话则会知出一个致命错误
set hive.exec.max.dynamic.partitions.pernode=1000;
#一个动态分区创建语句可以创建的虽大动态分区个数,如果超过这个佳则会抛出—个致命错误
set hive.exec.max.dynamic.partitions=1000;
#全局可以创建的最大文件个数.有一个Hadoop计数器会跟踪记录创速了多少个文件
hive.exec.max.created.files
#设置reduce数量, mapper数量:reduce数量 = 10:1
set mapred.reduce.tasks=100;
#设置每个reduce处理数据量,默认1G
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
#部分需要联合使用
#每个Map最大输入大小,间接设置map个数,默认256M就比较好。
set mapred.max.split.size=256000000;
#每个Map最小输入大小
set mapred.min.split.size=100000000;
#一个节点上split的至少的大小
set mapred.min.split.size.per.node=100000000;
#一个交换机下split的至少的大小
set mapred.min.split.size.per.rack=100000000;
#执行Map前进行小文件合并,前面参数确定合并文件块的大小
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
#在Map-only的任务结束时合并小文件
set hive.merge.mapfiles=true;
# 合并Tez 作业产生的文件
set hive.merge.tezfiles=true;
# 合并spark 作业产生的文件
set hive.merge.sparkfiles=true;
#合并 MapReduce 作业产生的文件
set hive.merge.mapredfiles=true;
#Map输入合并小文件
#每个Map最大输入大小
set mapred.max.split.size=256000000;
#一个节点上split的至少的大小
set mapred.min.split.size.per.node=100000000;
#一个交换机下split的至少的大小
set mapred.min.split.size.per.rack=100000000;
#执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
#输出合并
#合并 MapOnly 的作业产生的文件
set hive.merge.mapfiles=true;
#合并后的目标文件大小
# 希望的合并后的目标文件大小,如果此值小于 hive.merge.smallfiles.avgsize, 则此值为 hive.merge.smallfiles.avgsize。
set hive.merge.size.per.task=128000000;
#小文件的判断
#如果一个 job 结束后,生成的文件的平均大小 小于 参数 hive.merge.smallfiles.avgsize 设定的值,则认为是小文件。如以下设置平均小文件的大小为 128M。
set hive.merge.smallfiles.avgsize=128000000;
set mapred.min.split.size.per.rack=100000000;
set mapred.min.split.size.per.node=100000000;
set hive.merge.size.per.task=256000000;
set mapred.max.split.size=256000000;
set hive.merge.smallfiles.avgsize=16000000;
#reduce任务从map完成80%后开始执行
群默认0.8,大部分比较小的job都是适合的,对于map比较重的大job,这个值可以适当调大,比如0.9
set mapreduce.job.reduce.slowstart.completedmaps=0.9
#每个reduce处理的数据量,间接设置reduce的个数
set hive.exec.reducers.bytes.per.reducer=500000000;
#设置最大reduce的个数
set hive.exec.reducers.max=999;
#直接设置reduce的个数,reduce个数并不是越多越好!Order by时只有一个reduce
set mapred.reduce.tasks=10;
#是否开启矢量化
#矢量化查询执行是 Hive 的一项功能,可大大降低典型查询操作(如扫描、过滤器、聚合和连接)的 CPU 使用率 ,但是只支持部分函数
#split函数不支持,使用split开启矢量化时反而批处理适得其反
set hive.vectorized.execution.enabled=false;
#开启Map输出阶段压缩
# 开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
# 开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
# 设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
#开启Reduce输出阶段压缩
# 1.开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
# 2.开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
# 3.设置mapreduce最终数据输出压缩方式
#org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
# 4.设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;