2019-01-29

spring farmework 是如何实现数据库的事务控制

内容包含一些对spring源码的解读,会需要对spring框架有一定的掌握能力

提示

本内容为原创内容,转发及引用请注明出处。

解决疑惑

  • 使用try catch 对事务的影响
  • 事务中能否获取新增的id
  • sql错误和程序错误是否都会影响事务

引用

关于事务(mysql)

菜鸟教程上有关于mysql事务的说明。 我就只记录一下我还不太明白的了。

事务隔离级别不可重复读表头幻读
读未提交(read-uncommitted)
不可重复读(read-committed)
可重复读(repeatable-read)
串行化(serializable)

sql测试事务 此时数据库没有记录

BEGIN;
INSERT INTO USER (`id`, `name`, `age`) VALUES(1, 'zhangsan', 22);
INSERT INTO USER (`id`, `name`, `age`) VALUES(1, 'zhangsan', 22);
COMMIT;
id	name	age
1	zhangsan	22

插入成功

BEGIN;
INSERT INTO USER (`id`, `name`, `age`) VALUES(2, 'zhangsan', 22);
INSERT INTO USER (`id`, `name`, `age`) VALUES(2, 'zhangsan', 22);
ROLLBACK;

插入失败,没有新增,但是自增id 2 下次会跳过

代码测试

使用 TRUNCATE TABLE USER 清空数据库
UserDao.java

 @Transactional
    public  void  insert(){
        String sql ="INSERT INTO USER ( NAME, age) VALUES ( ?, ?)";
        jdbcTemplate.update(sql,"zhangdan",20);
    }

执行测试,insert成功,查看数据库如下

id	name	age
1	zhangdan	20

修改UserDao.java

 @Transactional
    public  void  insert(){
        String sql ="INSERT INTO USER ( NAME, age) VALUES ( ?, ?)";
        jdbcTemplate.update(sql,"zhangdan",20);
        int i= 1/0;
    }

查看数据库,没有新增 修改UserDao.java,增加 try catch

 @Transactional
    public  void  insert(){
        String sql ="INSERT INTO USER ( NAME, age) VALUES ( ?, ?)";
        jdbcTemplate.update(sql,"zhangdan",20);
        try {
            int i= 1/0;
        }catch (Exception e){

        }
    }

查看数据库,新增成功,但id不是连续的,这样可以知道是可以获取事务中新增的id的。

id	name	age
1	zhangdan	20
3	zhangdan	20

查看源码

org.springframework.transaction.PlatformTransactionManager

public interface PlatformTransactionManager {
    //返回当前激活的事务或根据定义参数创建一个新的事务
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    //根据给定事务的状态提交事务,如果这个事务已经以编程的方式标记为回滚,则执行回滚
    void commit(TransactionStatus status) throws TransactionException;
    //执行给定事务的回滚
    void rollback(TransactionStatus status) throws TransactionException;
}

org.springframework.jdbc.datasource.DataSourceTransactionManager

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
    //ioc容器创建时会注入DataSourceTransactionManager
    public DataSourceTransactionManager(DataSource dataSource) {
		this();
		setDataSource(dataSource);
		afterPropertiesSet();
	}
    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        //是否允许嵌套事务
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        //去线程获取ConnectionHolder(保存在 ThreadLocal<Map<Object, Object>> resources),此时为null
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

}

这个ThreadLocal中存放的ConnectionHolder在 org.springframework.jdbc.datasource.doBegin

protected void doBegin(Object transaction, TransactionDefinition definition){
    ...
    if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        //创建一个新的连接
        Connection newCon = this.dataSource.getConnection();
        if (logger.isDebugEnabled()) {
            logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
        }
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    }
    ...
    if (txObject.isNewConnectionHolder()) {
        //存放到ThreadLocal中
        TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
    }
    ...
}

getTransaction调用堆栈

看调用的 UserDao 是cglib生成的代理对象 org.springframework.aop.framework.CglibAopProxy

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
			Object oldProxy = null;
			boolean setProxyContext = false;
			Class<?> targetClass = null;
			Object target = null;
			try {
                //在该线程内暴露proxy代理对象 
				if (this.advised.exposeProxy) {
					// Make invocation available if necessary.
					oldProxy = AopContext.setCurrentProxy(proxy);
					setProxyContext = true;
				}
				// May be null. Get as late as possible to minimize the time we
				// "own" the target, in case it comes from a pool...
                //获取到需要代理的目标对象
				target = getTarget();
				if (target != null) {
					targetClass = target.getClass();
				}
                //根据目标类和方法找到对应的拦截器链 
                //这些拦截器链是通过xml或注解配置的
				List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
				Object retVal;
				// Check whether we only have one InvokerInterceptor: that is,
				// no real advice, but just reflective invocation of the target.
				if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
					// We can skip creating a MethodInvocation: just invoke the target directly.
					// Note that the final invoker must be an InvokerInterceptor, so we know
					// it does nothing but a reflective operation on the target, and no hot
					// swapping or fancy proxying.
					Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
					retVal = methodProxy.invoke(target, argsToUse);
				}
				else {
					//创建一个MethodInterceptor,并执行,见下一段代码
					retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
				}
				retVal = processReturnType(proxy, target, method, retVal);
				return retVal;
			}
			finally {
				if (target != null) {
					releaseTarget(target);
				}
				if (setProxyContext) {
					// Restore old proxy.
					AopContext.setCurrentProxy(oldProxy);
				}
			}
		}

org.springframework.aop.framework.ReflectiveMethodInvocation

@Override
	public Object proceed() throws Throwable {
		//	We start with an index of -1 and increment early.
		if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
			return invokeJoinpoint();
		}

		Object interceptorOrInterceptionAdvice =
				this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        //如果是动态匹配方法拦截器
		if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
			// Evaluate dynamic method matcher here: static part will already have
			// been evaluated and found to match.
			InterceptorAndDynamicMethodMatcher dm =
					(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
			if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
				return dm.interceptor.invoke(this);
			}
			else {
				// Dynamic matching failed.
				// Skip this interceptor and invoke the next in the chain.
				return proceed();//不是则继续递归
			}
		}
		else {
			// It's an interceptor, so we just invoke it: The pointcut will have
			// been evaluated statically before this object was constructed.
			return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
		}
	}

org.springframework.transaction.interceptor.TransactionAspectSupport

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
			throws Throwable {

    // 读取事务的配置
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // 事务处理器
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 代理连接点 UserDao.insert方法的全限定名称
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // 会调用事务处理器,生成一个事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // 这是一个环绕通知,调用拦截器链中的下一个拦截器
            // 这里才会开始调用 UserDao.insert 方法,当执行到 1/0 时报错,会进入catch
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {//这也是为什么insert方法中如果使用try catch不会回滚的原因,此处无法捕获到异常
            // 调用下面列出的方法 completeTransactionAfterThrowing
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    ....
}

org.springframework.jdbc.core.JdbcTemplate

@Override
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
        throws DataAccessException {

    Assert.notNull(psc, "PreparedStatementCreator must not be null");
    Assert.notNull(action, "Callback object must not be null");
    if (logger.isDebugEnabled()) {
        String sql = getSql(psc);
        logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
    }
    //获取数据库连接
    Connection con = DataSourceUtils.getConnection(getDataSource());
    PreparedStatement ps = null;
    try {
        Connection conToUse = con;
        if (this.nativeJdbcExtractor != null &&
                this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
            conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
        }
        ps = psc.createPreparedStatement(conToUse);
        applyStatementSettings(ps);
        PreparedStatement psToUse = ps;
        if (this.nativeJdbcExtractor != null) {
            psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
        }
        T result = action.doInPreparedStatement(psToUse);
        handleWarnings(ps);
        return result;
    }
    catch (SQLException ex) {
        // Release Connection early, to avoid potential connection pool deadlock
        // in the case when the exception translator hasn't been initialized yet.
        if (psc instanceof ParameterDisposer) {
            ((ParameterDisposer) psc).cleanupParameters();
        }
        String sql = getSql(psc);
        psc = null;
        JdbcUtils.closeStatement(ps);
        ps = null;
        DataSourceUtils.releaseConnection(con, getDataSource());
        con = null;
        throw getExceptionTranslator().translate("PreparedStatementCallback", sql, ex);
    }
    finally {
        if (psc instanceof ParameterDisposer) {
            ((ParameterDisposer) psc).cleanupParameters();
        }
        JdbcUtils.closeStatement(ps);
        DataSourceUtils.releaseConnection(con, getDataSource());
    }
}

org.springframework.transaction.interceptor

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
		if (txInfo != null && txInfo.hasTransaction()) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
						"] after exception: " + ex);
			}
			if (txInfo.transactionAttribute.rollbackOn(ex)) {
				try {
                    // 执行事务管理器的rollback方法
					txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
				}
				catch (TransactionSystemException ex2) {
					logger.error("Application exception overridden by rollback exception", ex);
					ex2.initApplicationException(ex);
					throw ex2;
				}
				catch (RuntimeException ex2) {
					logger.error("Application exception overridden by rollback exception", ex);
					throw ex2;
				}
				catch (Error err) {
					logger.error("Application exception overridden by rollback error", ex);
					throw err;
				}
			}
		....
		}
	}

org.springframework.jdbc.datasource 最后执行roolback

protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    // 获取事务中保存的ConnectionHolder,然后打开连接 ThreadLocal
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        // 执行rollback
        con.rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}