SRS源码分析-协程相关类_srs协程学习-程序员宅基地

技术标签: SRS  

SRS中使用协程库state-thread(ST), 在使用时对其进行了封装,保证使用方便。这种封装方法和使用thread库比较类似。

SrsEndlessThread

用于创建一个永不退出的协程,生命周期和整个程序一样。使用时需要继承ISrsEndlessThreadHandler方法,并在构造函数中创建SrsEndlessThread,重写cycle方法。

使用时执行流程:
SRSEndlessThread使用时执行流程图

 //永不退出协程的处理类
class ISrsEndlessThreadHandler
{
public:
    ISrsEndlessThreadHandler();
    virtual ~ISrsEndlessThreadHandler();
public:
    virtual int cycle() = 0;
public:
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};

//永不退出的协程
class SrsEndlessThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread; //包含一个协程封装类
    ISrsEndlessThreadHandler* handler; //协程处理类
public:
    SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);
    virtual ~SrsEndlessThread();
public:
    virtual int start(); //启动
public:
    virtual int cycle(); //执行的循环
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
SrsOneCycleThread

用于创建一次循环的thread,在cycle函数执行完后,调用stop_loop退出。执行流程和SrsEndlessThread类似。

//执行完handler->cycle()后,调用stop_loop,退出
int SrsOneCycleThread::cycle()
{
    int ret = handler->cycle();
    pthread->stop_loop();
    return ret;
}
SrsReusableThread

可以重复使用的thread, 其他类继承ISrsReusableThreadHandler,并包含SrsReusableThread的变量,start函数启动线程,stop函数停止线程。重写cycle函数

class ISrsReusableThreadHandler
{
public:
    ISrsReusableThreadHandler();
    virtual ~ISrsReusableThreadHandler();
public:
    virtual int cycle() = 0;
public:
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
class SrsReusableThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread;
    ISrsReusableThreadHandler* handler;
public:
    SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0);
    virtual ~SrsReusableThread();
public:
    virtual int start();
    virtual void stop();
public:
    virtual int cid();
// interface internal::ISrsThreadHandler
public:
    virtual int cycle();
    virtual void on_thread_start();
    virtual int on_before_cycle();
    virtual int on_end_cycle();
    virtual void on_thread_stop();
};
//可以调用stop
void SrsReusableThread::stop()
{
    pthread->stop();
}
SrsReusableThread2

和SrsReusableThread区别是:在线程cycle里有内部循环,需要判断interrupt状态,如果内部loop想要退出线程,应该interrupt该线程。

//多了interrupt
void SrsReusableThread2::interrupt()
{
    pthread->stop_loop();
}

bool SrsReusableThread2::interrupted()
{
    return !pthread->can_loop();
}
SrsThread
/*
 * 线程处理类,定制线程启动的回调函数
 * */
class ISrsThreadHandler
{
public:
    ISrsThreadHandler();
    virtual ~ISrsThreadHandler();
public:
    virtual void on_thread_start(); //线程启动
    virtual int on_before_cycle(); //cycle前
    virtual int cycle() = 0; //cycle
    virtual int on_end_cycle(); //cycle后
    virtual void on_thread_stop(); //stop时
};
 /*
  * 协程的封装,作为内部使用的类
  * */
class SrsThread
{
private:
    st_thread_t tid; //tid
    int _cid; //cid
    bool loop; //是否支持loop
    bool can_run; //是否能run
    bool really_terminated; //是否terminate
    bool _joinable; //是否joinable
    const char* _name; //协程名字
    bool disposed; //是否dispose
private:
    ISrsThreadHandler* handler; //回调处理
    int64_t cycle_interval_us; //循环时间us
public:
     //初始化协程
    SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
    virtual ~SrsThread();
public:
    virtual int cid(); //获取cid
    virtual int start(); //启动线程
    virtual void stop(); //暂停线程
public:
    virtual bool can_loop(); //是否能loop
    virtual void stop_loop(); //停止loop
private:
    virtual void dispose(); //释放
    virtual void thread_cycle(); //线程循环
    static void* thread_fun(void* arg); //线程循环调用的函数
};
/*
     * 线程的构造函数
     * name:函数名
     * thread_handle:线程处理函数
     * interval_us: 休眠时长
     * joinalbe: 是否能join
     * */
    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        can_run = false;
    }
    //析构函数,调用stop
    SrsThread::~SrsThread()
    {
        stop();
    }
    
    int SrsThread::cid()
    {
        return _cid;
    }
    //启动一个协程
    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        //创建协程,调用thread_fun
        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        //是否dispose
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }

    //停止一个协程
    void SrsThread::stop()
    {
        if (!tid) {
            return;
        }
        
        loop = false; //loop为false, 那么不会继续执行cycle()
        
        dispose(); //释放协程
        
        _cid = -1;
        can_run = false;
        tid = NULL;        
    }

    //清理
    void SrsThread::dispose()
    {
        if (disposed) {
            return;
        }
        st_thread_interrupt(tid);
        if (_joinable) {
            // wait the thread to exit.
            int ret = st_thread_join(tid, NULL);
            if (ret) {
                srs_warn("core: ignore join thread failed.");
            }
        }
        while (!really_terminated) {
            st_usleep(10 * 1000);
            
            if (really_terminated) {
                break;
            }
            srs_warn("core: wait thread to actually terminated");
        }
        
        disposed = true;
    }

    //协程的循环
    void SrsThread::thread_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        _srs_context->generate_id(); //生成cid
        srs_info("thread %s cycle start", _name);
        
        _cid = _srs_context->get_id();
        
        srs_assert(handler);
        handler->on_thread_start(); //调用handle的on_thread_start
        
        // thread is running now.
        really_terminated = false;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (!can_run && loop) {
            st_usleep(10 * 1000);
        }
        //正在的loop,loop里执行函数为:on_before_cycle->cycle->on_end_cycle
        while (loop) {
            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on before cycle success", _name);
            
            if ((ret = handler->cycle()) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
                goto failed;
            }
            srs_info("thread %s cycle success", _name);
            
            if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on end cycle success", _name);
            
        failed:
            if (!loop) {
                break;
            }
            if (cycle_interval_us != 0) {
                st_usleep(cycle_interval_us);
            }
        }
        
        // really terminated now.
        really_terminated = true;
        
        handler->on_thread_stop();//停止时的回调
        srs_info("thread %s cycle finished", _name);
    }

    //协程执行的函数
    void* SrsThread::thread_fun(void* arg)
    {
        SrsThread* obj = (SrsThread*)arg;
        srs_assert(obj);
        
        obj->thread_cycle(); //调用cycle函数
        
        // for valgrind to detect.
        SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
        if (ctx) {
            ctx->clear_cid();
        }
        
        st_thread_exit(NULL); //退出协程
        
        return NULL;
    }
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/charles1e/article/details/83625038

智能推荐

详细到吐血 —— 树莓派驱动开发入门:从读懂框架到自己写驱动_树莓派 编译驱动-程序员宅基地

文章浏览阅读9.3k次,点赞47次,收藏237次。师承陈立臣目录驱动初步认知为什么要学会写驱动?主设备号与次设备号从open到电灯,从上层到底层,经历了什么?驱动初步认知为什么要学会写驱动?c标准库时一定有的,而wiringPi库不一定有。树莓派开发简单是因为有库,实现超声波,实现继电器操作,做灯的点亮未来换一块板子,不用树莓派,只要能拿到linux内核源码,拿到芯片手册,电路图主设备号与次设备号一切皆为文件cd /devopen为什么能够区分是鼠标,键盘,还是屏幕?文件名,设备号ls -l10,23510.234设备节点,主设备号_树莓派 编译驱动

linux系统无法正常启动的解决方法_linux配做错误启动不了,怎么删除-程序员宅基地

文章浏览阅读8.6k次。一.root密码忘记的解决方法1.开机后在系统选择页面按e,找到以linux16为开头的那一行,删除ro后面的内容,并将ro改为rw rd.break同时按下ctrl+x进入拯救模式2.在命令界面输入chroot /sysroot/ ##从内核模式切换到shell模式echo westos | passwd --stdin root ##设置密码为west..._linux配做错误启动不了,怎么删除

刷题记录第八十五天-组合总数III-程序员宅基地

文章浏览阅读330次,点赞4次,收藏5次。【代码】刷题记录第八十五天-组合总数III。

2024年贵州省职业院校技能大赛高职组“大数据应用开发“赛题第07套-程序员宅基地

文章浏览阅读777次,点赞29次,收藏25次。环境说明:服务端登录地址详见各任务服务端说明。补充说明:宿主机及各容器节点可通过Asbru工具或SSH客户端进行SSH访问。子任务一:Hadoop 完全分布式安装配置本任务需要使用root用户完成相关配置,安装Hadoop需要配置前置环境。命令中要求使用绝对路径,具体要求如下:1、从宿主机/opt目录下将文件hadoop-3.1.3.tar.gz、jdk-8u212-linux-x64.tar.gz复制到容器Master中的/opt/software路径中(若路径不存在,则需新建),将Master

python 字典(dict)按键和值排序浅谈_python dict排序输出-程序员宅基地

文章浏览阅读624次,点赞2次,收藏2次。然后用sorted方法,通过key这个参数,指定排序是按照value,也就是第一个元素d[1的值来排序。reverse = True表示是需要翻转的,默认是从小到大,翻转的话,那就是从大到小。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随微点阅读小编过来看看吧。以上就是小编为大家带来的python 字典(dict)按键和值排序全部内容了,希望可以帮到大家!print dic.items() 得到[(键,值)]的列表。1 下面的是按照value的值从大到小的顺序来排序。_python dict排序输出

【GD32E230】HC-SR04 超声波测距模块 宽电压3-5.5V 工业级 传感器-程序员宅基地

文章浏览阅读400次,点赞5次,收藏3次。测量周期:当接收到 HC-SR04 通过 Echo 管脚输出的高电平脉冲后,便可进行下一次测量,所以测量周期取决于测量距离,当距离被测物体很近时,Echo 返回的脉冲宽度较窄,测量周期 就很短;最坏情况下,被测物体超出超声波模块的测量范围,此时 返回的脉冲宽度最长,约为 66ms,所以最坏情况下的测量周期稍大于 66ms 即可(取 70ms 足够)。当测量距离超过 HC-SR04 的测量范围时,仍会通过 Echo管脚输出高电平的信号,高电平的宽度约为 66ms。2.10.2 规格参数。

随便推点

Java swing 设计常用方法及作用_javaswing函数大全-程序员宅基地

文章浏览阅读400次。星号可以通配0到多个任意字符(引入包的时候可以用这个偷点懒,但是会把上一级包含的所有包引入,如果需要对程序大小有较为严格的要求,不建议使用,因为会引入其他你不需要的包)这些方法只是 Java Swing 中的一小部分,还有很多其他的方法可以用来实现各种功能。Java Swing 是 Java 中的一个 GUI 工具包,它提供了丰富的组件和容器,可以用来构建各种图形用户界面。7.setBackground(Color color): 设置组件的背景色。_javaswing函数大全

毕业设计:基于java的搜索引擎系统设计与实现_视觉 检索系统java-程序员宅基地

文章浏览阅读124次。基于Java的搜索引擎系统是一个用于对大量数据进行快速查询和搜索的软件系统。对其进行性能评估是为了确保系统能够快速、准确地响应搜索请求,并提供良好的用户体验。以下是基于Java的搜索引擎系统设计与实现的系统性能评估的总结和分析。1、查询速度评估:查询速度是搜索引擎系统最重要的性能指标之一。通过测试系统对搜索请求的响应时间,可以评估查询速度。可以使用不同的搜索关键词和数据集进行测试,观察系统在不同情况下的查询速度表现。同时,还可以分析系统对于不同搜索请求的响应时间的差异,找出可能需要优化的部分。_视觉 检索系统java

TextView 判断自动换行_android 判断文字是否换行-程序员宅基地

文章浏览阅读2k次。先看 需求: 布局中有四种样式 (标签必须在一起 不能截断) 因为没办法用字段区分这四种类型, 所以只能用一个item布局实现效果原理是这样的, 主要是中间的内容 跟后面的标签 会出现这样的问题, 项目中解决的方案是 用两个TextView : content 和 label , content动态计算, 计算换行逻辑: content +label 一行 或者 label不截断的情况下两行_android 判断文字是否换行

案例学习|Python实现某医院药品销售分析_对某个医院药品销售进行数据进行关联性分析的代码-程序员宅基地

文章浏览阅读2.4k次,点赞4次,收藏31次。数据分析的基本过程一般分为以下几个部分:提出问题获取并理解数据数据清洗构建模型数据可视化1.提出问题在数据分析之前,我们先要明确分析目标,可以帮助我们更高效的选取数据,进行分析研究。本次的分析目标是从销售数据中分析出以下业务指标:1)月均消费次数2)月均消费金额3)客单价4)消费趋势有了分析目标,我们再来关注一下数据情况。2.获取并理解数据这里的数据集来源于微信公..._对某个医院药品销售进行数据进行关联性分析的代码

JavaScript中的ES5,ES6是什么意思?_javasc的es5 es6都是什么意思-程序员宅基地

文章浏览阅读832次。JavaScript中的ES5,ES6是什么意思?其实JavaScript是ECMAScript中最有名的一个子类_javasc的es5 es6都是什么意思

【网络工程】常见硬件网络设备(中继器、集线器、网桥、交换机、路由器、网关)等总结_搭建通信网络工程中,需要使用哪些类型的网络设备?-程序员宅基地

文章浏览阅读3.9k次,点赞5次,收藏24次。前言:最近一段时间有朋友问,通信的原理是什么?那么在了解通信原理之前,我们首先要对网络通信常用的设备进行熟悉。在计算机网络体系中,有几样通信设备或者说网络名词出现的频率相当的高,它们是:中继器、集线器、网桥、交换机、路由器和网关。 其实,弄清楚这几个计算机网络的名词并不困难,如果能以计算机网络层次的概念给它们划清界限的话,那就很容易把它们区分出来。那本鱼现在就有条理地梳理一下它们各自的含义和作用,以及它们之间的联系。那我们首先看一下这些网络设备分别处于计算机网络的哪些层次:一、中继器(Repe._搭建通信网络工程中,需要使用哪些类型的网络设备?

推荐文章

热门文章

相关标签