博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊reactor异步线程的变量传递
阅读量:5822 次
发布时间:2019-06-18

本文共 5291 字,大约阅读时间需要 17 分钟。

  hot3.png

本文主要研究下reactor异步线程的变量传递

threadlocal的问题

在传统的请求/应答同步模式中,使用threadlocal来传递上下文变量是非常方便的,可以省得在每个方法参数添加公用的变量,比如当前登录用户。但是业务方法可能使用了async或者在其他线程池中异步执行,这个时候threadlocal的作用就失效了。

这个时候的解决办法就是采取propagation模式,即在同步线程与异步线程衔接处传播这个变量。

TaskDecorator

比如spring就提供了TaskDecorator,通过实现这个接口,可以自己控制传播那些变量。例如:

class MdcTaskDecorator implements TaskDecorator {   @Override  public Runnable decorate(Runnable runnable) {    // Right now: Web thread context !    // (Grab the current thread MDC data)    Map
contextMap = MDC.getCopyOfContextMap(); return () -> { try { // Right now: @Async thread context ! // (Restore the Web thread context's MDC data) MDC.setContextMap(contextMap); runnable.run(); } finally { MDC.clear(); } }; }}

这里注意在finally里头clear

配置这个taskDecorator

@EnableAsync@Configurationpublic class AsyncConfig implements AsyncConfigurer {   @Override  public Executor getAsyncExecutor() {    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    executor.setTaskDecorator(new MdcTaskDecorator());    executor.initialize();    return executor;  }}

完整实例详见[Spring 4.3: Using a TaskDecorator to copy MDC data to threads]()

Reactor Context

spring5引入webflux,其底层是基于reactor,那么reactor如何进行上下文变量的传播呢?官方提供了Context对象来替代threadlocal。

其特性如下:

  • 类似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一个key,后面put不会覆盖
  • 提供getOrDefault,getOrEmpty方法
  • Context与作用链上的每个Subscriber绑定
  • 通过subscriberContext(Context)来访问
  • Context的作用是自底向上

实例

设置及读取

@Test    public void testSubscriberContext(){        String key = "message";        Mono
r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete(); }

这里从最底部的subscriberContext设置message值为World,然后flatMap里头通过subscriberContext来访问。

自底向上

@Test    public void testContextSequence(){        String key = "message";        Mono
r = Mono.just("Hello") //NOTE 这个subscriberContext设置的太高了 .subscriberContext(ctx -> ctx.put(key, "World")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger"))); StepVerifier.create(r) .expectNext("Hello Stranger") .verifyComplete(); }

由于这个例子的subscriberContext设置的太高了,不能作用在flatMap里头的Mono.subscriberContext()

不可变

@Test    public void testContextImmutable(){        String key = "message";        Mono
r = Mono.subscriberContext() .map( ctx -> ctx.put(key, "Hello")) //这里返回了一个新的,因此上面的设置失效了 .flatMap( ctx -> Mono.subscriberContext()) .map( ctx -> ctx.getOrDefault(key,"Default")); StepVerifier.create(r) .expectNext("Default") .verifyComplete(); }

subscriberContext永远返回一个新的

多个连续的subscriberContext

@Test    public void testReadOrder(){        String key = "message";        Mono
r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete(); }

operator只会读取离它最近的一个context

flatMap间的subscriberContext

@Test    public void testContextBetweenFlatMap(){        String key = "message";        Mono
r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor World") .verifyComplete(); }

flatMap读取离它最近的context

flatMap中的subscriberContext

@Test    public void testContextInFlatMap(){        String key = "message";        Mono
r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) ) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) .subscriberContext(ctx -> ctx.put(key, "Reactor")) ) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World Reactor") .verifyComplete(); }

这里第一个flatMap无法读取第二个flatMap内部的context

小结

reactor通过提供Context来实现了类似同步线程threadlocal的功能,非常强大,值得好好琢磨。

doc

  • [Spring 4.3: Using a TaskDecorator to copy MDC data to threads]()
  • [Spring Security Context Propagation with ]()
  • [Context Aware Java Executor and Spring's ]()

转载于:https://my.oschina.net/go4it/blog/1636173

你可能感兴趣的文章
前端将markdown转换成html
查看>>
[CLPR] 用于加速训练神经网络的二阶方法
查看>>
Flannel网络配置
查看>>
使用VSTO读取WORD表格中的单元格内容的问题
查看>>
CUDA学习笔记(三)
查看>>
迅速搭建简易静态服务器
查看>>
啦啦啦~
查看>>
【转】Win7双击文件夹打开新窗口
查看>>
01进程
查看>>
VUE中使用sass
查看>>
Uva 10534 波浪子序列
查看>>
XML 命名空间
查看>>
LeetCode – Refresh – Permutations II
查看>>
zabbix3.0.4使用shell脚本和zabbix自带模板两种方法添加对指定进程和端口的监控
查看>>
zabbix系列(八)zabbix添加对web页面url的状态监控
查看>>
mysql无法输入中文排错
查看>>
咱也来谈谈web打印快递单及经验
查看>>
POJ3253 Fence Repair【哈夫曼树+优先队列】
查看>>
HDU2551 竹青遍野【数学计算+水题】
查看>>
关于Netty的疑问
查看>>