spark操作hive表,用java调用提交任务脚本
Spark最主要资源管理方式按排名为Hadoop Yarn, Apache Standalone 和Mesos。
此次使用的yarn模式部署。spark操作hive创建任务,需要将此任务提交。在经过多日查找资料和测试发现多数是提交本地,少有提交到远程yarn集群。发现提交到远程集群的是使用 Standalone模式部署的,这种模式可以指定ip及端口。因此在此次开发中是没有使用java程序提交spark任务。还发现一种在java代码中写提交任务,但需要通过java命令触发,感觉本质上与用spark命令提交没有区别,故此方案不作考虑。所以spark任务提交会做成shell脚本来提交。
import org.apache.spark.sql.SparkSession;
public class SparkTest {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkTest").master("yarn-client").enableHiveSupport()
.getOrCreate();
// 删除hive表
spark.sql("DROP TABLE IF EXISTS CH_TEST.SPARK_TEST");
// 创建hive表
String createSql = "CREATE TABLE IF NOT EXISTS CH_TEST.SPARK_TEST (MEMBER_ID string,MEMBER_NAME string,QTT bigint)";
spark.sql(createSql);
// 插入数据
String insertSql = "INSERT INTO TABLE CH_TEST.HIVE_SPARK select * from CH_TEST.SPARK_TEST a limit 10";
spark.sql(insertSql);
spark.stop();
}
}
只需在main方法中写入spark逻辑,再使用spark命令调用执行即可。SparkSession是最新版本spark所使用的,之前使用的SQLContext、HiveContext 均已过时,会出现找不到表的情况。
# --class指定需要提交的类
# --master指定提交方式
# /root/bigdata/hadoop-0.0.1.jar指定类所在jar包
# 还有其他参数,但此次提交只需要这些就可以
spark2-submit --class com.test.SparkTest
--master yarn /root/bigdata/hadoop-0.0.1.ja
使用java提交spark任务
import java.io.IOException;
import java.io.InputStreamReader;
public class JavaShell {
public static void commitSparkShell(String command,String type,long start){
InputStreamReader stdISR = null;
InputStreamReader errISR = null;
Process process = null;
try {
process = Runtime.getRuntime().exec(command);
CommandStreamGobbler errorGobbler = new CommandStreamGobbler(process.getErrorStream(), command, "ERR");
CommandStreamGobbler outputGobbler = new CommandStreamGobbler(process.getInputStream(), command, "STD");
errorGobbler.start();
// 必须先等待错误输出ready再建立标准输出
while (!errorGobbler.isReady()) {
Thread.sleep(10);
}
outputGobbler.start();
while (!outputGobbler.isReady()) {
Thread.sleep(10);
}
process.waitFor();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (stdISR != null) {
stdISR.close();
}
if (errISR != null) {
errISR.close();
}
if (process != null) {
process.destroy();
}
long end = System.currentTimeMillis();
System.out.println(type + "计算总共耗时:"+ String.valueOf(end - start) + "ms");
} catch (IOException e) {
System.out.println("正式执行命令:" + command + "有IO异常");
}
}
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
public class CommandStreamGobbler extends Thread{
private InputStream is;
private String command;
private String prefix = "";
private boolean readFinish = false;
private boolean ready = false;
private List<String> infoList = new LinkedList<String>();
public CommandStreamGobbler(InputStream is, String command, String prefix) {
this.is = is;
this.command = command;
this.prefix = prefix;
}
public void run() {
InputStreamReader isr = null;
try {
isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line = null;
ready = true;
while ((line = br.readLine()) != null) {
infoList.add(line);
System.out.println(prefix + " line: " + line);
}
} catch (IOException ioe) {
System.out.println("正式执行命令:" + command + "有IO异常");
} finally {
try {
if (isr != null) {
isr.close();
}
} catch (IOException ioe) {
System.out.println("正式执行命令:" + command + "有IO异常");
}
readFinish = true;
}
}
public InputStream getIs() {
return is;
}
public String getCommand() {
return command;
}
public boolean isReadFinish() {
return readFinish;
}
public boolean isReady() {
return ready;
}
public List<String> getInfoList() {
return infoList;
}
}
为什么使用CommandStreamGobbler ,因为在测试提交任务的时候,并不能完全执行脚本。因为当标准输出流或标准错误流非常庞大的时候,会出现调用waitFor方法卡死的bug。
原因分析:假设linux进程不断向标准输出流和标准错误流写数据,而JVM却不读取,数据会暂存在linux缓存区,当缓存区存满之后导致该进程无法继续写数据,会僵死,导致java进程会卡死在waitFor()处,永远无法结束。
解决方式:由于标准输出和错误输出都会向Linux缓存区写数据,而脚本如何输出这两种流是Java端不能确定的。为了不让shell脚本的子进程卡死,这两种输出需要分别读取,而且不能互相影响。所以必须新开两个线程来进行读取。