Spring事务监听TransactionalEventListener

2/24/2023 Spring

# @TransactionalEventListener

  • @TransactionalEventListener可以说是@EventListener的增强版,可以更好地配合数据库事务。

在项目开发过程中,我们不乏会有这样的诉求:需要在执行完数据库操作后,发送消息(比如短信、邮件、微信通知等)来执行其它的操作,而这些并不是主干业务,所以一般会放在异步线程里去执行~

关于这么执行的情况:这样可能会出现业界经典的事务提交成功后进行异步操作问题。关于问题的解决,Spring它非常友好的提供了两种解决方案来处理:

  • 1、事务同步管理器TransactionSynchronizationManager
  • 2、@TransactionalEventListener注解(需要Spring4.2+

# 来点示例

# User 对象

@Data
public class User {

 private long id;
 private String name;
 private Integer age;

}
1
2
3
4
5
6
7
8

# 业务操作

@Service
@Slf4j
public class UserServiceImpl extends implements UserService {

	@Autowired
  	UserMapper userMapper;

	@Autowired
  	ApplicationEventPublisher eventPublisher;

  	// 注册用户
	public void userRegister(User user){
		userMapper.insertUser(user);
		eventPublisher.publishEvent(new UserRegisterEvent(new Date()));
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 用户注册事件

@Getter
@Setter
public class UserRegisterEvent extends ApplicationEvent {

    private Date registerDate;

    public UserRegisterEvent(Date registerDate) {
        super(registerDate);
        this.registerDate = registerDate;
    }
}
1
2
3
4
5
6
7
8
9
10
11

# 事件监听器

@Slf4j
@Component
public class UserListener {

    @Autowired
    UserService userService;

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserRegisterEvent.class)
    public void onUserRegisterEvent(UserRegisterEvent event) {
        userService.sendActivationCode(event.getRegisterDate());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# TransactionalEventListener 源码

/**
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 4.2
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {

	// 这个注解取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
	// 各个值都代表什么意思表达什么功能,非常清晰,下面解释了对应的枚举类~
	// 需要注意的是:AFTER_COMMIT + AFTER_COMPLETION是可以同时生效的
	// AFTER_ROLLBACK + AFTER_COMPLETION是可以同时生效的
	TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

	// 表明若没有事务的时候,对应的event是否需要执行,默认值为false表示,没事务就不执行了。
	boolean fallbackExecution() default false;

	/// 这里巧妙的用到了@AliasFor的能力,放到了@EventListener身上
	// 注意:一般建议都需要指定此值,否则默认可以处理所有类型的事件,范围太广了。
	@AliasFor(annotation = EventListener.class, attribute = "classes")
	Class<?>[] value() default {};

	@AliasFor(annotation = EventListener.class, attribute = "classes")
	Class<?>[] classes() default {};

	String condition() default "";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# phase 参数值

public enum TransactionPhase {

	// 指定目标方法在事务commit之前执行
  	// 除非将fallbackExecution设置为true,否则当没有处于一个事务中时,@TransactionalEventListener注册的监听方法不会被执行。
	BEFORE_COMMIT,

	// 默认,指定目标方法在事务commit之后执行
	AFTER_COMMIT,

 	// 指定目标方法在事务rollback之后执行
	AFTER_ROLLBACK,

 	// 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是事务回滚了
	AFTER_COMPLETION

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 实现原理

  • Spring对事务监控的处理逻辑在TransactionSynchronization中,如下是该接口的声明
public interface TransactionSynchronization extends Flushable {

    // 在当前事务挂起时执行
    default void suspend() {
    }

    // 在当前事务重新加载时执行
    default void resume() {
    }

    // 在当前数据刷新到数据库时执行
    default void flush() {
    }

    // 在当前事务commit之前执行
    default void beforeCommit(boolean readOnly) {
    }

    // 在当前事务completion之前执行
    default void beforeCompletion() {
    }

    // 在当前事务commit之后实质性
    default void afterCommit() {
    }

    // 在当前事务completion之后执行
    default void afterCompletion(int status) {
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

这里的TransactionSynchronization接口只是抽象了一些行为,用于事务事件发生时触发,这些行为在Spring事务中提供了内在支持,即在相应的事务事件时,其会获取当前所有注册的TransactionSynchronization对象,然后调用其相应的方法。

事务标签解析时,Spring会注册一个TransactionalEventListenerFactory类型的beanSpring容器中,这里注册的TransactionalEventListenerFactory实现了EventListenerFactory接口,这个接口的主要作用是先判断目标方法是否是某个监听器的类型,然后为目标方法生成一个监听器,其会在某个bean初始化之后由Spring调用其方法用于生成监听器

/**
 * {@link EventListenerFactory} implementation that handles {@link TransactionalEventListener}
 * annotated methods.
 *
 * @author Stephane Nicoll
 * @since 4.2
 */
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

	private int order = 50;


	public void setOrder(int order) {
		this.order = order;
	}

	@Override
	public int getOrder() {
		return this.order;
	}

    // 指定目标方法是否是所支持的监听器的类型,这里的判断逻辑就是如果目标方法上包含有
    // TransactionalEventListener注解,则说明其是一个事务事件监听器
	@Override
	public boolean supportsMethod(Method method) {
		return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
	}

    // 为目标方法生成一个事务事件监听器,这里ApplicationListenerMethodTransactionalAdapter实现了ApplicationEvent接口
	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

ApplicationListenerMethodTransactionalAdapter本质上是实现了ApplicationListener接口的,也就是说,其是Spring的一个事件监听器,这也就是为什么进行事务处理时需要使用ApplicationEventPublisher.publish()方法发布一下当前事务的事件。 ApplicationListenerMethodTransactionalAdapter在监听到发布的事件之后会生成一个TransactionSynchronization对象,并且将该对象注册到当前事务逻辑中

class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

	private final TransactionalEventListener annotation;

	public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
		super(beanName, targetClass, method);
		TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
		if (ann == null) {
			throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
		}
		this.annotation = ann;
	}

	@Override
	public void onApplicationEvent(ApplicationEvent event) {
        // 如果当前TransactionManager已经配置开启事务事件监听,
		// 此时才会注册TransactionSynchronization对象
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
            // 通过当前事务事件发布的参数,创建一个TransactionSynchronization对象
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            // 注册TransactionSynchronization对象到TransactionManager中
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		}
		else if (this.annotation.fallbackExecution()) {
            // 如果当前TransactionManager没有开启事务事件处理,但是当前事务监听方法中配置了
   			// fallbackExecution属性为true,说明其需要对当前事务事件进行监听,无论其是否有事务
			if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
				logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
			}
			processEvent(event);
		}
		else {
			// No transactional event execution at all
        	// 走到这里说明当前是不需要事务事件处理的,因而直接略过
			if (logger.isDebugEnabled()) {
				logger.debug("No transaction is active - skipping " + event);
			}
		}
	}

    // TransactionSynchronizationEventAdapter是一个内部类,它是一个TransactionSynchronization同步器
	// 此类实现也比较简单,它的order由listener.getOrder();来决定
	private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
		return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

上述annotation属性就是在事务监听方法上解析的TransactionalEventListener注解中配置的属性。可以看到,对于事务事件的处理,这里创建了一个TransactionSynchronization对象,其实主要的处理逻辑就是在返回的这个对象中,而createTransactionSynchronization()方法内部只是创建了一个TransactionSynchronizationEventAdapter对象就返回了

private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

	private final ApplicationListenerMethodAdapter listener;

	private final ApplicationEvent event;

	private final TransactionPhase phase;

	public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
			ApplicationEvent event, TransactionPhase phase) {

		this.listener = listener;
		this.event = event;
		this.phase = phase;
	}

	@Override
		public int getOrder() {
			return this.listener.getOrder();
		}

   	 // 在目标方法配置的phase属性为BEFORE_COMMIT时,处理before commit事件
    // 最终都是委托给了listenner来真正的执行处理  来执行最终处理逻辑(也就是解析classes、condtion、执行方法体等等)
	@Override
	public void beforeCommit(boolean readOnly) {
		if (this.phase == TransactionPhase.BEFORE_COMMIT) {
			processEvent();
		}
	}

    // 这里对于after completion事件的处理,虽然分为了三个if分支,但是实际上都是执行的processEvent()
    // 方法,因为after completion事件是事务事件中一定会执行的,因而这里对于commit,
    // rollback和completion事件都在当前方法中处理也是没问题的
	@Override
	public void afterCompletion(int status) {
		if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
			processEvent();
		}
		else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
			processEvent();
		}
		else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
			processEvent();
		}
	}
    // 执行事务事件
	protected void processEvent() {
		this.listener.processEvent(this.event);
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

可以看到,对于事务事件的处理,最终都是委托给了ApplicationListenerMethodAdapter.processEvent()方法进行的

public class ApplicationListenerMethodAdapter implements GenericApplicationListener {

	protected final Log logger = LogFactory.getLog(getClass());

	private final String beanName;

	private final Method method;

	private final Method targetMethod;

	private final AnnotatedElementKey methodKey;

	private final List<ResolvableType> declaredEventTypes;

	@Nullable
	private final String condition;

	private final int order;

	@Nullable
	private ApplicationContext applicationContext;

	@Nullable
	private EventExpressionEvaluator evaluator;


	/**
	 * Process the specified {@link ApplicationEvent}, checking if the condition
	 * match and handling non-null result, if any.
	 */
	public void processEvent(ApplicationEvent event) {
        // 处理事务事件的相关参数,这里主要是判断TransactionalEventListener注解中是否配置了value
        // 或classes属性,如果配置了,则将方法参数转换为该指定类型传给监听的方法;如果没有配置,则判断
        // 目标方法是ApplicationEvent类型还是PayloadApplicationEvent类型,是则转换为该类型传入
		Object[] args = resolveArguments(event);
        // 这里主要是获取TransactionalEventListener注解中的condition属性,然后通过
		// Spring expression language将其与目标类和方法进行匹配
		if (shouldHandle(event, args)) {
            // 通过处理得到的参数借助于反射调用事务监听方法
			Object result = doInvoke(args);
			if (result != null) {
                 // 对方法的返回值进行处理
				handleResult(result);
			}
			else {
				logger.trace("No result object given - no result to handle");
			}
		}
	}

    // 处理事务监听方法的参数
    @Nullable
	protected Object[] resolveArguments(ApplicationEvent event) {
        // 获取发布事务事件时传入的参数类型
		ResolvableType declaredEventType = getResolvableType(event);
		if (declaredEventType == null) {
			return null;
		}
        // 如果事务监听方法的参数个数为0,则直接返回
		if (this.method.getParameterCount() == 0) {
			return new Object[0];
		}
        // 如果事务监听方法的参数不为ApplicationEvent或PayloadApplicationEvent,则直接将发布事务
        // 事件时传入的参数当做事务监听方法的参数传入。从这里可以看出,如果事务监听方法的参数不是
        // ApplicationEvent或PayloadApplicationEvent类型,那么其参数必须只能有一个,并且这个
        // 参数必须与发布事务事件时传入的参数一致
		Class<?> declaredEventClass = declaredEventType.toClass();
		if (!ApplicationEvent.class.isAssignableFrom(declaredEventClass) &&
				event instanceof PayloadApplicationEvent) {
			Object payload = ((PayloadApplicationEvent) event).getPayload();
			if (declaredEventClass.isInstance(payload)) {
                // 如果参数类型为ApplicationEvent或PayloadApplicationEvent,则直接将其传入事务事件方法
				return new Object[] {payload};
			}
		}
		return new Object[] {event};
	}

    // 判断事务事件方法方法是否需要进行事务事件处理
    private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
		if (args == null) {
			return false;
		}
		String condition = getCondition();
		if (StringUtils.hasText(condition)) {
			Assert.notNull(this.evaluator, "EventExpressionEvaluator must not be null");
			return this.evaluator.condition(
					condition, event, this.targetMethod, this.methodKey, args, this.applicationContext);
		}
		return true;
	}

    // 对事务事件方法的返回值进行处理,这里的处理方式主要是将其作为一个事件继续发布出去,这样就可以在
	// 一个统一的位置对事务事件的返回值进行处理
    protected void handleResult(Object result) {
        // 如果返回值是数组类型,则对数组元素一个一个进行发布
		if (result.getClass().isArray()) {
			Object[] events = ObjectUtils.toObjectArray(result);
			for (Object event : events) {
				publishEvent(event);
			}
		}
		else if (result instanceof Collection<?>) {
            // 如果返回值是集合类型,则对集合进行遍历,并且发布集合中的每个元素
			Collection<?> events = (Collection<?>) result;
			for (Object event : events) {
				publishEvent(event);
			}
		}
		else {
            // 如果返回值是一个对象,则直接将其进行发布
			publishEvent(result);
		}
	}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

# 总结

对于事务事件的处理,总结而言,就是为每个事务事件监听方法创建了一个TransactionSynchronizationEventAdapter对象,通过该对象在发布事务事件的时候,会在当前线程中注册该对象,这样就可以保证每个线程每个监听器中只会对应一个TransactionSynchronizationEventAdapter对象。在Spring进行事务事件的时候会调用该对象对应的监听方法,从而达到对事务事件进行监听的目的。