mysql插入大量数据的几种方法executeBatch,load data local infile

mysql插入数据集合可以循环插入,可以使用jdbc的批处理executeBatch语句,可以使用load data local infile语句插入。

目录

一、循环遍历插入

二、批处理executeBatch语句

三、load data local infile语句

1.生成文件插入

2.采用setLocalInfileInputStream方法,这个方法可以不用生成文件,减少生成文件,往文件写入数据的IO操作

报错问题

反射获取表的所有数据库字段名称


一、循环遍历插入

List<BimXmlLevel> levels=new ArrayList<>();
for (BimXmlLevel level : levels) {
    service.save(level);
}

每个对象每次都会生成一条插入语句 insert into tableName values ()
这种方法只要数据量一多速度会很慢。

二、批处理executeBatch语句

使用prepareStatement预编译语句
使用addBatch,executeBatch批量插入语句

    private static JdbcTemplate jdbcTemplate = SpringContextHolder.getBean(JdbcTemplate.class);

    /**
     * 使用prepareStatement预编译语句
     * 使用addBatch,executeBatch批量插入语句
     * 需要在数据库url连接添加参数rewriteBatchedStatements=true
     * 实现原理就是把多条插入语句变成一条 INSERT INTO tableName(xx) values(xx),(xx),(xx)
     * 提示:sql后面不要加; 不然底层会拼接sql报错
     *
     * @param list
     * @throws Exception
     */
    public static void saveList(List list) throws Exception {
        long start = System.currentTimeMillis();
        Connection connection = jdbcTemplate.getDataSource().getConnection();
        connection.setAutoCommit(false);//不自动commit,一定要设置,不然速度也会很慢
        //通过反射获取表的所有数据库字段名称
        List<String> names = getNames(list.get(0));
        //多少个字段就有多少个?号
        List<String> vs = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            vs.add("?");
        }
        //表名
        String tableName = table(list.get(0));
        //语句
        String sql = "INSERT INTO " + tableName + "(" + StringUtil.join(names, ",") + ") VALUES (" + StringUtil.join(vs, ",") + ")";
        PreparedStatement stmt = connection.prepareStatement(sql);
        for (int j = 0; j < list.size(); j++) {
            Object obj = list.get(j);
            for (int i = 0; i < names.size(); i++) {
                String name = names.get(i);
                Object value = ReflectUtil.invokeGetter(obj, name);
                stmt.setObject(i + 1, value);//设置值
            }
            stmt.addBatch();
        }
        //批量执行
        stmt.executeBatch();
        stmt.clearBatch();
        stmt.close();
        connection.commit();//commit
        connection.close();
        long end = System.currentTimeMillis();
        logger.info("{}数据Batch入库成功:{}条,总耗时:{} ms", tableName, list.size(), end - start);
    }

需要在数据库url连接添加参数rewriteBatchedStatements=true
实现原理就是把多条插入语句变成一条 INSERT INTO tableName(xx) values(xx),(xx),(xx)
提示:sql后面不要加; 不然底层会拼接sql报错

三、load data local infile语句

1.生成文件插入

 private static JdbcTemplate jdbcTemplate = SpringContextHolder.getBean(JdbcTemplate.class);
    /**
     * 行分隔符
     */
    public static final String LINE_TERMINATED = ";;";
    /**
     * 字段分隔符
     */
    public static final String FIELDS_TERMINATED = "||";
    
    public static void saveFile(List list) throws Exception {
        long start = System.currentTimeMillis();
        Connection connection = jdbcTemplate.getDataSource().getConnection();
        connection.setAutoCommit(false);//不自动commit
        //通过反射获取表的所有数据库字段名称
        List<String> names = getNames(list.get(0));
        //表名
        String tableName = table(list.get(0));

        //获取插入数据
        StringBuilder data = new StringBuilder();
        for (Object obj : list) {
            for (int i = 0; i < names.size(); i++) {
                String name = names.get(i);
                Object value = ReflectUtil.invokeGetter(obj, name);
                if (value == null) value = "";
                if ("java.util.Date".equals(value.getClass().getName()))
                    value = DateUtil.formatYMD(ConvertUtil.toDate(value));
                if (i == names.size() - 1) {
                    data.append(value).append(LINE_TERMINATED);//最后一个参数,一条数据结束
                } else {
                    data.append(value).append(FIELDS_TERMINATED);
                }
            }
        }

        //生成数据文件
        String path = ConfigUtil.getTempDir() + File.separator + "BIM" + File.separator + "data" + File.separator + tableName + "-" + IdWorker.uuid() + ".txt";
        File file = new File(path);
        //写入数据
        FileUtils.write(file, data, "utf-8");
        //文件路径使用 / 符号
        path = path.replace("\\", "/");
        //load data local infile语句
        String sql = "load data local infile '"+path+"' into table " + tableName +
                " fields terminated by '" + FIELDS_TERMINATED + "' lines terminated by '" + LINE_TERMINATED + "' " +
                " (" + StringUtil.join(names, ",") + ");";
        Statement statement = connection.createStatement();
        statement.executeUpdate(sql);
        statement.close();
        connection.commit();//commit
        connection.close();
        long end = System.currentTimeMillis();
        logger.info("{}数据file入库成功:{}条,总耗时:{} ms", tableName, list.size(), end - start);

2.采用setLocalInfileInputStream方法,这个方法可以不用生成文件,减少生成文件,往文件写入数据的IO操作

private static JdbcTemplate jdbcTemplate = SpringContextHolder.getBean(JdbcTemplate.class);
    /**
     * 行分隔符
     */
    public static final String LINE_TERMINATED = ";;";
    /**
     * 字段分隔符
     */
    public static final String FIELDS_TERMINATED = "||";

    public static void saveFile(List list) throws Exception {
        long start = System.currentTimeMillis();
        Connection connection = jdbcTemplate.getDataSource().getConnection();
        connection.setAutoCommit(false);//不自动commit
        //通过反射获取表的所有数据库字段名称
        List<String> names = getNames(list.get(0));
        //表名
        String tableName = table(list.get(0));

        //获取插入数据
        StringBuilder data = new StringBuilder();
        for (Object obj : list) {
            for (int i = 0; i < names.size(); i++) {
                String name = names.get(i);
                Object value = ReflectUtil.invokeGetter(obj, name);
                if (value == null) value = "";
                if ("java.util.Date".equals(value.getClass().getName()))
                    value = DateUtil.formatYMD(ConvertUtil.toDate(value));
                if (i == names.size() - 1) {
                    data.append(value).append(LINE_TERMINATED);//最后一个参数,一条数据结束
                } else {
                    data.append(value).append(FIELDS_TERMINATED);
                }
            }
        }
        byte[] bytes = data.toString().getBytes();
        InputStream dataStream = new ByteArrayInputStream(bytes);

        //文件路径可以不写,采用文件流
        String sql = "load data local infile 'filepath' into table " + tableName +
                " fields terminated by '" + FIELDS_TERMINATED + "' lines terminated by '" + LINE_TERMINATED + "' " +
                " (" + StringUtil.join(names, ",") + ");";
        PreparedStatement stmt = connection.prepareStatement(sql);
        ClientPreparedStatement statement = stmt.unwrap(ClientPreparedStatement.class);
        statement.setLocalInfileInputStream(dataStream);//设置文件输入流
        int size = statement.executeUpdate();
        if (list.size() != size)
            throw new BusinessException(tableName + "导入条数:" + size + ",总数据条数:" + list.size() + ",检查数据是否有非法字符" + FIELDS_TERMINATED + "," + LINE_TERMINATED);
        statement.close();
        connection.commit();//commit
        connection.close();
        long end = System.currentTimeMillis();
        logger.info("{}数据file入库成功:{}条,总耗时:{} ms", tableName, list.size(), end - start);
    }

报错问题

 Loading local data is disabled; this must be enabled on both the client and server sides
1.在mysql配置文件my.ini修改:
[mysqld]下添加
local_infile = 1
[mysql]下添加
local_infile = 1
2.在数据库连接添加参数allowLoadLocalInfile=true
url=jdbc\:mysql\xxxxx?allowLoadLocalInfile=true

注意:文本文件中字段中的空值用\N表示

反射获取表的所有数据库字段名称

    /**
     * 获取所有数据库字段
     */
    private static List<String> getNames(Object obj) {
        List<String> ignore = Arrays.asList("parentIds", "createBy", "createTime", "updateTime", "companyId");//忽略插入字段
        List<String> names = new ArrayList<>();
        List<Field> fields = new ArrayList<>();
        for (Class<?> superClass = obj.getClass(); superClass != Object.class; superClass = superClass.getSuperclass()) {
            Field[] declaredFields = superClass.getDeclaredFields();
            fields.addAll(Arrays.asList(declaredFields));
        }
        for (Field field : fields) {
            Boolean status = true;
            Transient fieldTran = field.getAnnotation(Transient.class);
            if (fieldTran != null) status = false;
            String methodName = "get" + StringUtils.capitalize(field.getName());
            Method methodByName = ReflectUtil.getAccessibleMethodByName(obj, methodName);
            if (methodByName != null) {
                Transient methodTran = methodByName.getAnnotation(Transient.class);
                if (methodTran != null) status = false;
            } else {
                status = false;
            }
            if (status && !ignore.contains(field.getName())) {
                names.add(field.getName());
            }
        }
        return names;
    }

    /**
     * 获取数据库表名
     */
    private static String table(Object obj) {
        Table table = obj.getClass().getAnnotation(Table.class);
        if (table != null && StringUtil.isNotBlank(table.name())) {
            return table.name();
        } else {
            return obj.getClass().getSimpleName();
        }
    }