seata源码解析:seata server各种消息处理流程_seata关闭channel-程序员宅基地

技术标签: 分布式事务  

请添加图片描述

seata-server消息处理流程

上一篇文章我们分析了seata-server端启动流程。本篇文章我们来看seata-server消息处理流程。
请添加图片描述
seata中有一个全局事务协调器DefaultCoordinator,它主要是处理来自RM和TM的请求来做相应的操作,但是实际的执行者并不是DefaultCoordinator,而是DefaultCore

DefaultCore的继承关系如下图,从继承图中我们可以看到其实Core类的实现类才是一个事务管理器。在seata中有4种事务管理模式,所以每种模式有一个具体的事务管理器。

请添加图片描述
而DefaultCore则是聚合了4种具体的事务管理器,根据事务的不同类型调用不同的事务管理器。组件的关系如下图
请添加图片描述
所以事务协调的主要工作就是接受请求然后调用事务管理器进行相应的操作

事务协调器接收请求

之前我们说到,所有的消息都会交给AbstractNettyRemotingServer.ServerHandler来处理,而AbstractNettyRemotingServer.ServerHandler根据消息的不同类型,交给不同的RemotingProcessor来处理

在这里插入图片描述
所以我们对那种消息感兴趣只需要看对应的RemotingProcessor实现类即可,我们挑几个常见的消息分析以下,思路都差不多。

事务管理器执行操作

RegRmProcessor和RegTmProcessor

tm和rm这部分注册代码看的我有点晕(不重要就没耐心看下去),主要作用就是在tc保存tm和rm的长连接,当tc需要往tm和rm发送消息的时候,就从ChannelManager中找到对应的长连接,然后发送消息

各模式中rm注册的时机如下

xa模式:构建DataSourceProxyXA
at模式:构建DataSourceProxy
tcc模式:GlobalTransactionScanner(Bean初始化后阶段),生成代理对象的时候判定这个方法是tcc的prepare方法

ServerOnRequestProcessor

在TC端,全局事务的状态被保存在GlobalSession对象中,分支事务的状态被保存在BranchSession中

ServerOnRequestProcessor处理消息的公共流程为

  1. 对应的channel是否注册过,没注册过直接关闭连接,否则到第二步
  2. 针对不同的消息交给DefaultCoordinator类的不同方法来处理,并返回结果
开启全局事务
// DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

消息的接收是通过DefaultCoordinator,然后交给DefaultCore来执行对应的操作,DefaultCore生成xid并返回

// DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    
    // 创建一个 GlobalSession
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 将 ROOT_SESSION_MANAGER 加入到这个 GlobalSession 的监听器列表中
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
 
    // 开启 GlobalSession
    session.begin();

    // 发布事件,如果你对这个事件感兴趣,可以注册这个事件
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    // 返回 xid
    return session.getXid();
}

可以只返回一个xid,xid由DefaultCore#begin方法生成,xid的生成策略如下

// seata server ip地址 + seata server 端口号 + 雪花算法生成的唯一id
ipAddress + ":" + port + ":" + tranId;

从GlobalSession#begin方法可以看到GlobalSession用到了观察者模式,当GlobalSession的状态发生变更时,会通过给相应的观察者,观察者都是SessionManager,当接收到相应的事件后,将变更的状态进行持久化存储,当使用db模式存储时,这里会在global_table中插入一条记录。

// GlobalSession
public void begin() throws TransactionException {
    
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    
        lifecycleListener.onBegin(this);
    }
}
注册分支事务
// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    
    // 根据 xid 从 SessionManager 中获取到 GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
    
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 创建新的分支事务即 branchSession
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 对分支事务需要的资源加锁,加锁的逻辑在别的文章详解
        branchSessionLock(globalSession, branchSession);
        try {
    
            // 将 branchSession 加到 globalSession 的属性中
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
    
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
    
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}
  1. 根据 xid 从 SessionManager 中获取到 GlobalSession
  2. 创建新的分支事务即 BranchSession
  3. 将 branchSession 加到 globalSession 的属性中,此时GlobalSession会发布分支事务注册事件,SessionManager 收到事件后会在 branch_table 中插入一条记录

注意:AT模式下,当分支事务注册的时候,会将修改的数据加锁,如果加锁失败,则抛出异常

提交全局事务
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
    
    // 根据xid找到全局事务对象GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
    
        // 已经被commit过了,直接返回成功
        return GlobalStatus.Finished;
    }
    // 添加监听器
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
    
        // Highlight: Firstly, close the session, then no more branch can be registered.
        // 关闭 GlobalSession 防止再次有新的 BranchSession 注册进来
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
    
            // 判断是否可以异步提交
            // 目前只有at模式可以异步提交,因为是通过undolog的方式去做的
            if (globalSession.canBeCommittedAsync()) {
    
                globalSession.asyncCommit();
                return false;
            } else {
    
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    // 同步提交
    // XA/TCC只能同步提交
    if (shouldCommit) {
    
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
    
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
    
            return globalSession.getStatus();
        }
    } else {
    
        // 异步提交
        // 只有AT模式能异步提交
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    
    boolean success = true;
    // start committing event
    // 发布事件
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
    
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
    
        // 取出所有的分支事务,然后提交
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
    
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
    
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            // 一阶段失败
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
    
                globalSession.removeBranch(branchSession);
                return CONTINUE;
            }
            try {
    
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
    
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error(
                                "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                            return CONTINUE;
                        } else {
    
                            // 分支事务,不能异步提交,并且还不重试,全局事务执行失败
                            SessionHelper.endCommitFailed(globalSession);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                    default:
                        // 当前是否正在重试
                        // retrying=true,说明是从重试队列进来的任务,不用再往重试队列放了
                        if (!retrying) {
    
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
    
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
    
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {
    branchSession.toString()});
                if (!retrying) {
    
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        // result 不为null 则为 false
        if (result != null) {
    
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        // 有分支事务,并且不允许异步提交,说明失败了
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
    
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    //If success and there is no branch, end the global transaction.
    // 分支事务全部提交成功了
    if (success && globalSession.getBranchSessions().isEmpty()) {
    
        // 全局事务状态改为已提交
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

可以看到AT模式可以异步提交,因为AT模式全局提交只是删除undoLog,异步提交可以提高执行效率。而其他模式得同步提交,依次向RM发送分支事务提交请求,当所有分支事务都执行成功后,全局事务提交成功。否则,将任务交给管理重试的SessionManager进行重试

全局事务的提交和回滚逻辑差不多,回滚逻辑就不分析了

ServerOnResponseProcessor

当我们需要进行全局提交时,需要向各个RM发送对应的请求,注意发送的是同步请求,阻塞获取结果。

实现思路主要是如下一个map

// 消息id -> 消息对应的MessageFuture
ConcurrentMap<Integer, MessageFuture> futures

每个消息有一个消息id,当发送的时候给每条消息创建一个MessageFuture,放在futures中,然后这个MessageFuture(底层其实就是CompletableFuture)阻塞获取结果

而ServerOnResponseProcessor则是用来接收分支提交(请求和响应对应的消息id是一样的),当收到结果后,设置消息对应的MessageFuture为完成,此时阻塞的同步请求就能获取到结果了

请添加图片描述

public class ServerOnResponseProcessor implements RemotingProcessor {
    

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    
        // 根据消息id找到对应的MessageFuture
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
    
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
    
            // 没有找到对应的消息发送记录
            // 删除部分代码
        }
    }
}

参考博客

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/120894915

智能推荐

分布式光纤传感器的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告_预计2026年中国分布式传感器市场规模有多大-程序员宅基地

文章浏览阅读3.2k次。本文研究全球与中国市场分布式光纤传感器的发展现状及未来发展趋势,分别从生产和消费的角度分析分布式光纤传感器的主要生产地区、主要消费地区以及主要的生产商。重点分析全球与中国市场的主要厂商产品特点、产品规格、不同规格产品的价格、产量、产值及全球和中国市场主要生产商的市场份额。主要生产商包括:FISO TechnologiesBrugg KabelSensor HighwayOmnisensAFL GlobalQinetiQ GroupLockheed MartinOSENSA Innovati_预计2026年中国分布式传感器市场规模有多大

07_08 常用组合逻辑电路结构——为IC设计的延时估计铺垫_基4布斯算法代码-程序员宅基地

文章浏览阅读1.1k次,点赞2次,收藏12次。常用组合逻辑电路结构——为IC设计的延时估计铺垫学习目的:估计模块间的delay,确保写的代码的timing 综合能给到多少HZ,以满足需求!_基4布斯算法代码

OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版-程序员宅基地

文章浏览阅读3.3k次,点赞3次,收藏5次。OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版

关于美国计算机奥赛USACO,你想知道的都在这_usaco可以多次提交吗-程序员宅基地

文章浏览阅读2.2k次。USACO自1992年举办,到目前为止已经举办了27届,目的是为了帮助美国信息学国家队选拔IOI的队员,目前逐渐发展为全球热门的线上赛事,成为美国大学申请条件下,含金量相当高的官方竞赛。USACO的比赛成绩可以助力计算机专业留学,越来越多的学生进入了康奈尔,麻省理工,普林斯顿,哈佛和耶鲁等大学,这些同学的共同点是他们都参加了美国计算机科学竞赛(USACO),并且取得过非常好的成绩。适合参赛人群USACO适合国内在读学生有意向申请美国大学的或者想锻炼自己编程能力的同学,高三学生也可以参加12月的第_usaco可以多次提交吗

MySQL存储过程和自定义函数_mysql自定义函数和存储过程-程序员宅基地

文章浏览阅读394次。1.1 存储程序1.2 创建存储过程1.3 创建自定义函数1.3.1 示例1.4 自定义函数和存储过程的区别1.5 变量的使用1.6 定义条件和处理程序1.6.1 定义条件1.6.1.1 示例1.6.2 定义处理程序1.6.2.1 示例1.7 光标的使用1.7.1 声明光标1.7.2 打开光标1.7.3 使用光标1.7.4 关闭光标1.8 流程控制的使用1.8.1 IF语句1.8.2 CASE语句1.8.3 LOOP语句1.8.4 LEAVE语句1.8.5 ITERATE语句1.8.6 REPEAT语句。_mysql自定义函数和存储过程

半导体基础知识与PN结_本征半导体电流为0-程序员宅基地

文章浏览阅读188次。半导体二极管——集成电路最小组成单元。_本征半导体电流为0

随便推点

【Unity3d Shader】水面和岩浆效果_unity 岩浆shader-程序员宅基地

文章浏览阅读2.8k次,点赞3次,收藏18次。游戏水面特效实现方式太多。咱们这边介绍的是一最简单的UV动画(无顶点位移),整个mesh由4个顶点构成。实现了水面效果(左图),不动代码稍微修改下参数和贴图可以实现岩浆效果(右图)。有要思路是1,uv按时间去做正弦波移动2,在1的基础上加个凹凸图混合uv3,在1、2的基础上加个水流方向4,加上对雾效的支持,如没必要请自行删除雾效代码(把包含fog的几行代码删除)S..._unity 岩浆shader

广义线性模型——Logistic回归模型(1)_广义线性回归模型-程序员宅基地

文章浏览阅读5k次。广义线性模型是线性模型的扩展,它通过连接函数建立响应变量的数学期望值与线性组合的预测变量之间的关系。广义线性模型拟合的形式为:其中g(μY)是条件均值的函数(称为连接函数)。另外,你可放松Y为正态分布的假设,改为Y 服从指数分布族中的一种分布即可。设定好连接函数和概率分布后,便可以通过最大似然估计的多次迭代推导出各参数值。在大部分情况下,线性模型就可以通过一系列连续型或类别型预测变量来预测正态分布的响应变量的工作。但是,有时候我们要进行非正态因变量的分析,例如:(1)类别型.._广义线性回归模型

HTML+CSS大作业 环境网页设计与实现(垃圾分类) web前端开发技术 web课程设计 网页规划与设计_垃圾分类网页设计目标怎么写-程序员宅基地

文章浏览阅读69次。环境保护、 保护地球、 校园环保、垃圾分类、绿色家园、等网站的设计与制作。 总结了一些学生网页制作的经验:一般的网页需要融入以下知识点:div+css布局、浮动、定位、高级css、表格、表单及验证、js轮播图、音频 视频 Flash的应用、ul li、下拉导航栏、鼠标划过效果等知识点,网页的风格主题也很全面:如爱好、风景、校园、美食、动漫、游戏、咖啡、音乐、家乡、电影、名人、商城以及个人主页等主题,学生、新手可参考下方页面的布局和设计和HTML源码(有用点赞△) 一套A+的网_垃圾分类网页设计目标怎么写

C# .Net 发布后,把dll全部放在一个文件夹中,让软件目录更整洁_.net dll 全局目录-程序员宅基地

文章浏览阅读614次,点赞7次,收藏11次。之前找到一个修改 exe 中 DLL地址 的方法, 不太好使,虽然能正确启动, 但无法改变 exe 的工作目录,这就影响了.Net 中很多获取 exe 执行目录来拼接的地址 ( 相对路径 ),比如 wwwroot 和 代码中相对目录还有一些复制到目录的普通文件 等等,它们的地址都会指向原来 exe 的目录, 而不是自定义的 “lib” 目录,根本原因就是没有修改 exe 的工作目录这次来搞一个启动程序,把 .net 的所有东西都放在一个文件夹,在文件夹同级的目录制作一个 exe._.net dll 全局目录

BRIEF特征点描述算法_breif description calculation 特征点-程序员宅基地

文章浏览阅读1.5k次。本文为转载,原博客地址:http://blog.csdn.net/hujingshuang/article/details/46910259简介 BRIEF是2010年的一篇名为《BRIEF:Binary Robust Independent Elementary Features》的文章中提出,BRIEF是对已检测到的特征点进行描述,它是一种二进制编码的描述子,摈弃了利用区域灰度..._breif description calculation 特征点

房屋租赁管理系统的设计和实现,SpringBoot计算机毕业设计论文_基于spring boot的房屋租赁系统论文-程序员宅基地

文章浏览阅读4.1k次,点赞21次,收藏79次。本文是《基于SpringBoot的房屋租赁管理系统》的配套原创说明文档,可以给应届毕业生提供格式撰写参考,也可以给开发类似系统的朋友们提供功能业务设计思路。_基于spring boot的房屋租赁系统论文