读取服务器文件,并进行移动-ChannelSftp
要求:
读取服务器上某个目录的文件下的txt的内容,读取里面的数据,进行处理,保存入库。保存后,把文件移到该目录下的bak目录。
处理:
要操作服务器文件,需要使用 ChannelSftp,文档
常用方法:
put(): 文件上传
get(): 文件下载
cd(): 进入指定目录
ls(): 得到指定目录下的文件列表
rm(): 删除指定文件
rename() 移动文件
java代码:
引用:
gradle:
// 添加sftp的工具包
implementation group: 'com.jcraft', name: 'jsch', version: '0.1.55'
maven:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
业务
CustomerService
@Slf4j
@Service
public class CustomerService {
public void readFile() throws SftpException {
SftpConfig sftpConfig = SftpUtils.getSftpConfigVal();
ChannelSftp sftp = SftpUtils.connect(sftpConfig);
String path = sftpConfig.getPath();
String newpath = path + "/bak";
// 判断文件路径是否存在
SftpUtils.isExist(sftp, newpath);
sftp.cd(path);
// 读取txt文件
Vector<ChannelSftp.LsEntry> list = sftp.ls("*.txt");
List<String> data;
String pathFileName, newpathFileName ="";
for (ChannelSftp.LsEntry entry : list) {
String fileName = entry.getFilename();
log.info(fileName);
pathFileName = path +"/"+ fileName;
data = SftpUtils.getList(sftp, pathFileName);
if(CollectionUtils.isNotEmpty(data)){
Boolean moveFile = dealFileData(data);
if(moveFile){
try {
newpathFileName = newpath + "/"+ fileName;
log.info("path:{}, newPath: {} ", pathFileName, newpathFileName);
sftp.rename(pathFileName, newpathFileName);
}catch (Exception e){
log.error("路径{},文件{}移动文件失败!", pathFileName, newpathFileName);
e.printStackTrace();
}
}
}
}
SftpUtils.disConnect(sftpConfig)
}
private Boolean dealFileData(List<String> data){
Boolean moveFile = false;
List<String> filterData = ListUtils.emptyIfNull(data).stream().filter(e -> e.contains("_")).collect(Collectors.toList());
if(filterData.size() > 0){// 有符合要求的数据
moveFile = true;
// 对数据进行处理
}
return moveFile;
}
}
实体:
SftpConfig
/**
* sftp 配置
*/
@Data
public class SftpConfig {
/** 密钥地址 */
private String privateKeyPath;
/** 口令 */
private String passphrase;
private String ip;
private Integer port;
private String username;
private String pwd;
private String path;
private String baseDir;
}
SftpChannel
@Slf4j
public class SftpChannel {
Session session = null;
ChannelSftp sftp = null;
//端口默认为22
public static final int SFTP_DEFAULT_PORT = 22;
/** 利用JSch包实现SFTP下载、上传文件(秘钥方式登陆)*/
public ChannelSftp connectByIdentity(SftpConfig sftpConfig) throws JSchException {
JSch jsch = new JSch();
int port = SFTP_DEFAULT_PORT;
//设置密钥和密码
//支持密钥的方式登陆,只需在jsch.getSession之前设置一下密钥的相关信息就可以了
if (StringUtils.isNotBlank(sftpConfig.getPrivateKeyPath())) {
if (StringUtils.isNotBlank(sftpConfig.getPassphrase())) {
//设置带口令的密钥
jsch.addIdentity(sftpConfig.getPrivateKeyPath(), sftpConfig.getPassphrase());
} else {
//设置不带口令的密钥
jsch.addIdentity(sftpConfig.getPrivateKeyPath());
}
}
if (sftpConfig.getPort() != null) {
port = sftpConfig.getPort();
}
if (port > 0) {
//采用指定的端口连接服务器
session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), port);
} else {
//连接服务器,采用默认端口
session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp());
}
if (session == null) {
throw new JSchException("session is null,connect fail");
}
log.info("Session created ... UserName={};ip={};port={}", sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
Properties sshConfig = new Properties();
sshConfig.put("StrictHostKeyChecking", "no");
session.setConfig(sshConfig);
session.setTimeout(30000);
session.connect();
//创建sftp通信通道
Channel channel = session.openChannel("sftp");
channel.connect();
sftp = (ChannelSftp) channel;
log.info("login success...");
return sftp;
}
/** 利用JSch包实现SFTP下载、上传文件(用户名密码方式登陆) */
public ChannelSftp connectByPwd(SftpConfig sftpConfig) throws JSchException {
JSch jsch = new JSch();
int port = SFTP_DEFAULT_PORT;
if (sftpConfig.getPort() != null) {
port = sftpConfig.getPort();
}
if (port > 0) {
//采用指定的端口连接服务器
session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), port);
} else {
//连接服务器,采用默认端口
session = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp());
}
if (session == null) {
throw new JSchException("session is null,connect fail");
}
log.info("Session created ... UserName={};ip={};port={}", sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
//设置登陆主机的密码
session.setPassword(sftpConfig.getPwd());//设置密码
Properties sshConfig = new Properties();
sshConfig.put("StrictHostKeyChecking", "no");
session.setConfig(sshConfig);
session.setTimeout(30000);
session.connect();
//创建sftp通信通道
Channel channel = session.openChannel("sftp");
channel.connect();
sftp = (ChannelSftp) channel;
log.info("login success...");
return sftp;
}
public void closeChannel() {
log.info("sftp object closing...");
if (sftp != null) {
if (sftp.isConnected()) {
sftp.disconnect();
}
}
if (session != null) {
if (session.isConnected()) {
session.disconnect();
}
}
}
}
工具类
SftpUtils
/**
* sftp 上传下载工具类
*/
public class SftpUtils {
private static Logger logger = LoggerFactory.getLogger(SftpUtils.class);
private static long count = 3;
private static long count1 = 0;
private static long sleepTime;
/**
* 连接sftp服务器
*
* @return
*/
public static ChannelSftp connect(SftpConfig sftpConfig) {
ChannelSftp sftp = null;
try {
JSch jsch = new JSch();
jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(), sftpConfig.getPort());
Session sshSession = jsch.getSession(sftpConfig.getUsername(), sftpConfig.getIp(),
sftpConfig.getPort());
logger.info("Session created ... UserName=" + sftpConfig.getUsername() + ";host=" + sftpConfig.getIp()
+ ";port=" + sftpConfig.getIp());
sshSession.setPassword(sftpConfig.getPwd());
Properties sshConfig = new Properties();
sshConfig.put("StrictHostKeyChecking", "no");
sshSession.setConfig(sshConfig);
sshSession.connect();
logger.info("Session connected ...");
logger.info("Opening Channel ...");
Channel channel = sshSession.openChannel("sftp");
channel.connect();
sftp = (ChannelSftp) channel;
logger.info("登录成功");
} catch (Exception e) {
try {
count1 += 1;
if (count == count1) {
throw new RuntimeException(e);
}
if (count1 != count) {
Thread.sleep(sleepTime);
logger.info("connect again....");
connect(sftpConfig);
}
} catch (InterruptedException e1) {
e1.printStackTrace();
throw new RuntimeException(e1);
}
}
return sftp;
}
/**
* 获取Sftp对象
* @param param json sftp对象
* @return SftpConfig
*/
public static SftpConfig getSftpObj(String param) {
SftpConfig sftpConfig = new SftpConfig();
if (StringUtils.isNotBlank(param)) {
sftpConfig = JSONObject.parseObject(param, SftpConfig.class);
}
return sftpConfig;
}
/** sftp 上传*/
public static boolean upload(SftpConfig config, String baseDir, String fileName, String filePath) {
logger.info("路径:baseDir=" + baseDir);
SftpChannel sftpChannel = new SftpChannel();
ChannelSftp sftp;
try {
if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
sftp = sftpChannel.connectByIdentity(config);
} else {
sftp = sftpChannel.connectByPwd(config);
}
if (sftp.isConnected()) {
logger.info("connect server success");
} else {
logger.error("connect server fail");
return false;
}
//检查路径
if (!isExist(sftp, baseDir)) {
logger.error("创建sftp服务器路径失败:" + baseDir);
return false;
}
String dst = baseDir + "/" + fileName;
String src = filePath + "/" + fileName;
logger.info("开始上传,本地服务器路径:[" + src + "]目标服务器路径:[" + dst + "]");
sftp.put(src, dst);
sftp.put(src, dst);
logger.info("上传成功");
return true;
} catch (Exception e) {
logger.error("上传失败", e);
return false;
} finally {
sftpChannel.closeChannel();
}
}
/** sftp 上传*/
public boolean uploadNew(SftpConfig config, String baseDir, String fileName, ByteArrayOutputStream out) {
logger.info("sftpconfig==>{}", config);
ChannelSftp sftp = connect(config);
boolean flag = true;
try {
sftp.cd(baseDir);
logger.info("first step=========>{}");
} catch (SftpException e) {
try {
sftp.mkdir(baseDir);
sftp.cd(baseDir);
} catch (SftpException e1) {
flag = false;
throw new RuntimeException("ftp create filepath fail" + baseDir);
}
}
logger.info("the third step =========>{},uploadFile==>{}", flag, fileName);
try (InputStream in = new ByteArrayInputStream(out.toByteArray())) {
sftp.put(in, fileName+".ing");
logger.info("the fourth step =========>{}", "transfer success");
sftp.rename(baseDir + "/" + fileName+".ing", baseDir + "/" + fileName);
logger.info("the fifth step =========>{}", "rename success");
} catch (Exception e) {
flag = false;
throw new RuntimeException("sftp excetion" + e);
} finally {
disConnect(sftp);
}
return flag;
}
/**
* 复制某路径下的文件到另外一个路径
* 直接移动文件,用 ChannelSftp sftp.rename(String oldpath, String newpath)
* @param config SftpConfig
* @param sourceBaseDir 源文件路径
* @param sourceFileName 源文件名称
* @param destFilePath 目标文件路径
* @param destFileName 目标文件名称
* @return boolean
*/
public static boolean copyFile(SftpConfig config, String sourceBaseDir, String sourceFileName, String destFilePath, String destFileName) {
SftpChannel sftpChannel = new SftpChannel();
ChannelSftp sftp;
try {
if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
sftp = sftpChannel.connectByIdentity(config);
} else {
sftp = sftpChannel.connectByPwd(config);
}
if (sftp.isConnected()) {
logger.info("connect server success");
} else {
logger.error("connect server fail");
return false;
}
String dest;
if (StringUtils.isBlank(destFileName)) {
dest = destFilePath + sourceFileName;
} else {
dest = destFilePath + destFileName;
}
String src = sourceBaseDir + "/" + sourceFileName;
logger.info("start downing,sftp server path:[" + src + "] dest server path:[" + dest + "]");
//获取文件的大小
SftpATTRS attr = sftp.stat(src);
long fileSize = attr.getSize();
logger.info("downing file size :" + fileSize);
sftp.get(src, dest);
logger.info("download success");
return true;
} catch (Exception e) {
logger.error("download fail", e);
return false;
} finally {
sftpChannel.closeChannel();
}
}
/**
*
* @param config SftpConfig
* @param baseDir sftp 路径
* @param sourceFileName sftp 文件名称
* @param savePath 本地路径
* @param destFileName 本地文件名称
* @return 返回文件路径
* @throws SftpException
* @throws JSchException
* @throws IOException
*/
public static String downToLocal(SftpConfig config, String baseDir, String sourceFileName, String savePath, String destFileName) throws SftpException, JSchException, IOException {
SftpChannel sftpChannel = new SftpChannel();
ChannelSftp sftp = null;
String localPath = "";
try {
if (StringUtils.isNotBlank(config.getPrivateKeyPath())) {
sftp = sftpChannel.connectByIdentity(config);
} else {
sftp = sftpChannel.connectByPwd(config);
}
if (sftp.isConnected()) {
logger.info("connect server success");
} else {
logger.error("connect server fail");
}
String src = baseDir + "/" + sourceFileName;
logger.info("start downing,sftp server path:[" + src + "]");
//获取文件的大小
SftpATTRS attr = sftp.stat(src);
long fileSize = attr.getSize();
logger.info("downing file size :" + fileSize);
InputStream is = sftp.get(src);
ByteArrayOutputStream out = new ByteArrayOutputStream();
int len;
byte[] bytes = new byte[1024];
while ((len = is.read(bytes)) != -1) {
out.write(bytes, 0, len);
}
File saveDir = new File(savePath);
if(!saveDir.exists()){ // 没有就创建该文件
saveDir.mkdir();
}
File file = new File(saveDir+File.separator+(System.currentTimeMillis()/10000)+destFileName);
FileOutputStream fos = new FileOutputStream(file);
fos.write(bytes);
fos.close();
is.close();
return file.getPath();
} finally {
sftpChannel.closeChannel();
}
}
/**
* 判断文件夹是否存在
* true 目录创建成功,false 目录创建失败
* @param sftp ChannelSftp
* @param filePath 文件夹路径
* @return boolean
*/
public static boolean isExist(ChannelSftp sftp, String filePath) {
String paths[] = filePath.split("/");
String dir = paths[0];
for (int i = 0; i < paths.length - 1; i++) {
dir = dir + "/" + paths[i + 1];
try {
sftp.cd(dir);
} catch (SftpException sException) {
if (ChannelSftp.SSH_FX_NO_SUCH_FILE == sException.id) {
try {
sftp.mkdir(dir);
} catch (SftpException e) {
e.printStackTrace();
return false;
}
}
}
}
return true;
}
/**
* 获取文件数据
* @param sftp ChannelSftp
* @param ftpRootDir 文件路径及名称, 比如/home/ddzxsftp/ivr/33.txt
* @return List<String>
*/
public static List<String> getList(ChannelSftp sftp, String ftpRootDir) {
List<String> list = new ArrayList<>();
try {
InputStream is = sftp.get(ftpRootDir);
String str;
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
while ((str = br.readLine()) != null) {
list.add(str);
}
br.close();
} catch (Exception e) {
logger.error("get file data excetion {}", e);
}
return list;
}
/**
* 断掉连接
*/
public void disConnect(ChannelSftp sftp) {
try {
sftp.disconnect();
sftp.getSession().disconnect();
} catch (Exception e) {
logger.error("disConnect errorMessage",e);
}
}
/**
* 获取配置表里面配置sftp连接的内容
* 把连接信息配置在数据库
*/
public static SftpConfig getSftpConfigVal() {
String configVal = "{\"ftpHost\":\"132.65.205.86\",\"ftpPort\":\"22\",\"ftpUserName\":\"yan\",\"ftpPassword\":\"KG@055\",\"ftpBaseDir\":\"/home/sftp\",\"path\":\"/home/sftp/ivr\",\"privateKeyPath\":\"\",\"passphrase\":\"\"}";
JSONObject configObj = JSON.parseObject(configVal);
String ftpHost = MapUtils.getString(configObj, "ftpHost");
int ftpPort = MapUtils.getInteger(configObj, "ftpPort");
String ftpUserName = MapUtils.getString(configObj, "ftpUserName");
String ftpPassword = MapUtils.getString(configObj, "ftpPassword");
String ftpBaseDir = MapUtils.getString(configObj, "ftpBaseDir");
String path = MapUtils.getString(configObj, "path");
String privateKeyPath = MapUtils.getString(configObj, "privateKeyPath");
String passphrase = MapUtils.getString(configObj, "passphrase");
SftpConfig config = new SftpConfig();
config.setIp(ftpHost);
config.setPort(ftpPort);
config.setUsername(ftpUserName);
config.setPwd(ftpPassword);
config.setBaseDir(ftpBaseDir);
config.setPath(path);
config.setPrivateKeyPath(privateKeyPath);
config.setPassphrase(passphrase);
return config;
}
}
总结:
用cd(): 进入指定目录,然后用ls(): 得到指定目录下的文件列表,可以指定具体的文件类型。读取文件后,再用 rename(): 移动文件。了解ChannelSftp命令,要操作服务器上的文件就方便很多