本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
MybatisPlus批量保存原理及失效原因排查全过程
itnanba · 210浏览 · 发布于2023-01-11 +关注

这篇文章主要介绍了MybatisPlus批量保存原理及失效原因排查全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

问题描述

一般情况下,在MybatisPlus中使用saveBatch方法进行批量保存只需要:在数据库连接串中添加&rewriteBatchedStatements=true,并将MySQL驱动保证在5.0.18以上即可。

但是在这里实际使用中批量保存并没有生效,列表数据被分组成几批数据保存,而不是一批数据保存,通过调试、查看数据库日志等方式可以验证。

所以现在是配置正确,驱动正确,批量保存的数据正确,但是批量保存没有生效。

批量保存原理

框架是不会出问题的,这里来看下MybatisPlus实现批量保存的实现方式,调用saveBatch方法后发生了什么。

ServiceImpl.saveBatch()

@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
  // ${className}.insert
    String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
  // 函数式编程 BiConsumer<T, U>
    return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert
    (sqlStatement, entity));
}

采用函数式编程实现BiConsumer接口,其用法相当于

@Override
public boolean saveBatch(Collection entityList, int batchSize) {
     BiConsumer<SqlSession, Object> consumer = new EntityConsumer();
     return executeBatch(entityList, batchSize, consumer);
}
 class EntityConsumer implements BiConsumer<SqlSession, Object>{
     @Override
    public void accept(SqlSession sqlSession, Object object) {
        sqlSession.insert("${className}.insert", object);
    }
}

SqlHelper.executeBatch()

定义批量保存的模板方法

public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list,
 int batchSize, BiConsumer<SqlSession, E> consumer) {
    Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
    return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
        int size = list.size();
        int i = 1;
        for (E element : list) {
      // 调用sqlSession.insert("${className}.insert", object);
      // 数据最终保存到StatementImpl.batchedArgs中,用于后面做批量保存
            consumer.accept(sqlSession, element);
            if ((i % batchSize == 0) || i == size) {
        // 批量保存StatementImpl.batchedArgs中的数据
                sqlSession.flushStatements();
            }
            i++;
        }
    });
}

MybatisBatchExecutor.doUpdate()

将待执行对象添加到对应的Statement中,可以理解为将批量数据分组,分组的依据包含两个:

if (sql.equals(currentSql) && ms.equals(currentStatement))

● 数据的SQL语句必须完全一致,包括表名和列

● 使用的MappedStatement一致,即Mapper一致

@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
  // **
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
        int last = statementList.size() - 1;
        stmt = statementList.get(last);
        applyTransactionTimeout(stmt);
        handler.parameterize(stmt);//fix Issues 322
        BatchResult batchResult = batchResultList.get(last);
        batchResult.addParameterObject(parameterObject);
    } else {
        Connection connection = getConnection(ms.getStatementLog());
        stmt = handler.prepare(connection, transaction.getTimeout());
        if (stmt == null) {
            return 0;
        }
        handler.parameterize(stmt);    //fix Issues 322
        currentSql = sql;
        currentStatement = ms;
        statementList.add(stmt);
        batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
}

PreparedStatement.addBatch()

将数据添加到StatementImpl.batchedArgs中,至此第一阶段完成

public void addBatch() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        if (this.batchedArgs == null) {
            this.batchedArgs = new ArrayList<Object>();
        }
         for (int i = 0; i < this.parameterValues.length; i++) {
            checkAllParametersSet(this.parameterValues[i], this.parameterStreams[i], i);
        }
         this.batchedArgs.add(new BatchParams(this.parameterValues, this.parameterStreams, 
         this.isStream, this.streamLengths, this.isNull));
    }
}

MybatisBatchExecutor.doFlushStatements()

遍历Statemen,并执行executeBatch()方法

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
        List<BatchResult> results = new ArrayList<>();
        if (isRollback) {
            return Collections.emptyList();
        }
        for (int i = 0, n = statementList.size(); i < n; i++) {
            Statement stmt = statementList.get(i);
            applyTransactionTimeout(stmt);
            BatchResult batchResult = batchResultList.get(i);
            try {
                batchResult.setUpdateCounts(stmt.executeBatch());
                MappedStatement ms = batchResult.getMappedStatement();
                List<Object> parameterObjects = batchResult.getParameterObjects();
                KeyGenerator keyGenerator = ms.getKeyGenerator();
                if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
                    Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
                    jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
                } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
                    for (Object parameter : parameterObjects) {
                        keyGenerator.processAfter(this, ms, stmt, parameter);
                    }
                }
                // Close statement to close cursor #1109
                closeStatement(stmt);
            } catch (BatchUpdateException e) {
                StringBuilder message = new StringBuilder();
                message.append(batchResult.getMappedStatement().getId())
                    .append(" (batch index #")
                    .append(i + 1)
                    .append(")")
                    .append(" failed.");
                if (i > 0) {
                    message.append(" ")
                        .append(i)
                        .append(" prior sub executor(s) completed successfully, 
                        but will be rolled back.");
                }
                throw new BatchExecutorException(message.toString(), e, results, batchResult);
            }
            results.add(batchResult);
        }
        return results;
    } finally {
        for (Statement stmt : statementList) {
            closeStatement(stmt);
        }
        currentSql = null;
        statementList.clear();
        batchResultList.clear();
    }
}

PreparedStatement.executeBatchInternal()

最终执行批量操作的逻辑,这里会判断rewriteBatchedStatements参数

@Override
protected long[] executeBatchInternal() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
         if (this.connection.isReadOnly()) {
            throw new SQLException(Messages.getString("PreparedStatement.25")
             + Messages.getString("PreparedStatement.26"),
                SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
        }
         if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
            return new long[0];
        }
         // we timeout the entire batch, not individual statements
        int batchTimeout = this.timeoutInMillis;
        this.timeoutInMillis = 0;
         resetCancelledState();
         try {
            statementBegins();
             clearWarnings();
       // 判断rewriteBatchedStatements参数
            if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements())
             {
                 if (canRewriteAsMultiValueInsertAtSqlLevel()) {
                    return executeBatchedInserts(batchTimeout);
                }
                 if (this.connection.versionMeetsMinimum(4, 1, 0) 
                 && !this.batchHasPlainStatements && this.batchedArgs != null
                    && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
                    return executePreparedBatchAsMultiStatement(batchTimeout);
                }
            }
             return executeBatchSerially(batchTimeout);
        } finally {
            this.statementExecuting.set(false);
             clearBatch();
        }
    }
}

问题排查

在流程中几个关键节点为:

● MybatisBatchExecutor:1、执行数据分组逻辑。2、遍历Statement执行批量保存逻辑。

● StatementImpl:保存batchedArgs

● PreparedStatement:执行最终的批量保存逻辑

可以看出JDK层只执行最终的保存操作,如果这里的数据batchedArgs没有拿到批量的,那一定是MybatisPlus的分组逻辑出现问题。通过调试发现问题出现在

if (sql.equals(currentSql) && ms.equals(currentStatement))

由于有些数据有些列没有默认值导致SQL的列不同,数据被添加到不同的Statement执行,导致最终的批量操作失效。

总结

整个批量过程可以分为两个阶段:

  • 1、将批量数据添加到statementImpl.batchedArgs中保存。

  • 2、调用statement.executeBatch方法完成批量。

来看下这两步最基本的操作之上,MybatisPlus做了哪些事情:

  • 1、定义批量操作的模板。

  • 2、验证集合中的数据,将完全一致的SQL添加到同一个Statement中。

  • 3、Jdbc3KeyGenerator?

值得注意的是,批量模板中的单条新增调用的是sqlSession.insert(),这个方法是没有执行execute的,只是将数据放到statementImpl.batchedArgs中。而常规的单条新增调用的是baseMapper.insert()方法,其是基于动态代理的方法来实现。

可以看出以上流程经历了已知的四层结构:MybatisPlus–>Mybatis–>MySQL connector–>JDK。其最底层还是通过preparedStatement来实现批量操作,和我们通过原始JDBC来实现批量操作的原理相同,上层都是框架实现的封装。


相关推荐

PHP实现部分字符隐藏

沙雕mars · 1325浏览 · 2019-04-28 09:47:56
Java中ArrayList和LinkedList区别

kenrry1992 · 908浏览 · 2019-05-08 21:14:54
Tomcat 下载及安装配置

manongba · 970浏览 · 2019-05-13 21:03:56
JAVA变量介绍

manongba · 962浏览 · 2019-05-13 21:05:52
什么是SpringBoot

iamitnan · 1086浏览 · 2019-05-14 22:20:36
加载中

0评论

评论
分类专栏
小鸟云服务器
扫码进入手机网页