SRS流媒体服务器基本流程

SRS流媒体服务器基本流程介绍。

基本流程图

SRS流媒体服务器基本流程

main()、domain()和run_master()

1. main函数所在文件在main/srs_main_server.cpp中。

2. 定义了一些全局变量:

  • _srs_config:全局配置文件
  • _srs_log:全局的log文件

3. main() 调用 domain() 执行流程,下面的domain()内部分函数作用解析,因为不是主流程,不过多介绍:

	_srs_config->parse_options(argc, argv)	//解析命令行参数

	_srs_config->get_work_dir() //设置工作目录以及当前目录
	_srs_config->initialize_cwd()    
    
	_srs_log->initialize()	//初始化log
    

4. 重点是会创建SrsServer对象并运行

	_srs_server = new SrsServer();
	run(_srs_server)

5. 创建SrsServer时还会初始化http_api_mux和http_server

    http_api_mux = new SrsHttpServeMux(); 	// HTTP请求多路复用器,不是http拉流的
    http_server = new SrsHttpServer(this); // http服务

6. run(SrsServer* svr) 会初始化服务器和获取守护进程配置in_daemon(默认为false),如果in_daemon为false,直接执行 run_master(SrsServer* svr)

     svr->initialize(NULL) //初始化服务器
    _srs_config->get_daemon() //获取守护进行配置,默认false,如果为true, srs将fork子进程,让子进程执行run_master
    run_master(svr) 

7. run_master(SrsServer* svr) 函数中,服务器做一些初始化工作并调用listern监听客户端的连接,然后调用do_cycle函数(死循环),做一些监控,更新时间及缓存等。

	svr->initialize_st() 		//初始化st协程库
    svr->initialize_signal()	//初始化信号
	svr->acquire_pid_file()		//将pid线程写入文件
    svr->listen()				//监听客户端请求
    svr->register_signal()		//注册信号
    svr->http_handle()    		//注册http的处理模块
	svr->ingest()				//开启流采集
	svr->cycle()				//消息循环处理

8. 初始化signal时,在使用state-threads时需要将信号转化为IO,并创建一个协程处理信号IO。

  1. svr->initialize_signal() 进行初始化信号,里面的signal_manager->initialize() 创建pipe。
  2. svr->register_signal() 进行注册信号,里面的signal_manager->start() 注册信号并启动信号监听的协程。
  3. SrsSignalManager::cycle() 进行最终的监听,循环执行监听操作,调用 SrsServer::on_signal()
  4. SrsServer::on_signal() 会对每一种信号检测,如果信号发生,设置相应的bool变量为true
  5. 最后在SrsServer::do_cycle()中检查信号,并处理

9. 重点关注 SrsServer::listen()

SrsServer::listen()

1. SrsServer::listen() 会监听rtmp/http等客户端请求。

	listen_rtmp()			//监听rtmp
	listen_http_api()    	//监听http
    listen_http_stream()	//监听http-stream
    listen_stream_caster()  //监听转换流
    conn_manager->start()	//启动连接管理的协程    

2. 先分析rtmp推拉流过程,所以重点关注 SrsServer::listen_rtmp(),其他后面用到会介绍,比如listen_http_stream()。

SrsServer::listen_rtmp()

1. SrsServer::listen_rtmp() 函数如下

srs_error_t SrsServer::listen_rtmp()
{
    srs_error_t err = srs_success;
    
    // stream service port. 获取配置文件中要监听的端口列表
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    //关闭SrsListenerRtmpStream类型的监听器,从listeners管理器中删除监听对象。
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) { //遍历ip_ports列表,父类 SrsListener 的指针listener 指向新构造的子类 SrsBufferListener 的对象
        SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener); //加入listeners管理器列表

        int port; string ip; //分割 ip 地址(如果有的话)和 port 端口
        srs_parse_endpoint(ip_ports[i], ip, port);
        //多态:调用子类 SrsBufferListener 的成员函数 listen
        if ((err = listener->listen(ip, port)) != srs_success) {
            srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
        }
    }
    
    return err;
}

2. 监听的类型有:

// The listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
    // RTMP client,
    SrsListenerRtmpStream = 0,
    // HTTP api,
    SrsListenerHttpApi = 1,
    // HTTP stream, HDS/HLS/DASH
    SrsListenerHttpStream = 2,
    // UDP stream, MPEG-TS over udp.
    SrsListenerMpegTsOverUdp = 3,
    // TCP stream, RTSP stream.
    SrsListenerRtsp = 4,
    // TCP stream, FLV stream over HTTP.
    SrsListenerFlv = 5,
};

3. close_listeners是将listeners中类型为type的元素移除。

void SrsServer::close_listeners(SrsListenerType type)
{
    std::vector<SrsListener*>::iterator it;
    for (it = listeners.begin(); it != listeners.end();) {
        SrsListener* listener = *it;
        
        if (listener->listen_type() != type) { //不同type,continue
            ++it;
            continue;
        }
        
        srs_freep(listener);
        it = listeners.erase(it); //从listeners移除(vector)
    }
}

SrsBufferListener::listen()、SrsTcpListener::listen()

1. listern_rtmp() 调用调用 SrsBufferListener 的 listen() 监听

srs_error_t SrsBufferListener::listen(string i, int p) //i默认为"0.0.0.0"
{
    srs_error_t err = srs_success;
    
    ip = i;
    port = p;
    
    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port); //创建TCP监听
    
    if ((err = listener->listen()) != srs_success) { //进入监听
        return srs_error_wrap(err, "buffered tcp listen");
    }
    
    string v = srs_listener_type2string(type);
    srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
    
    return err;
}

2. listener 会调用SrsTcpListener::listen()

srs_error_t SrsTcpListener::listen()
{
    srs_error_t err = srs_success;

    if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { //创建监听的fd,并将fd注册到st库上
        return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("tcp", this); //创建一个协程
    if ((err = trd->start()) != srs_success) {  //启动协程,进入SrsSTCoroutine::cycle()
        return srs_error_wrap(err, "start coroutine");
    }
    
    return err;
}

3. SrsTcpListener类进行实际的监听,通过socket->bind->listen(在srs_tcp_listen函数中完成)创建监听的fd,并将fd注册到st库上,之后fd上的事件都有st库监听并处理。

4. 创建tcp协程,用于处理连接,协程启动,并进入 SrsSTCoroutine::cycle() 函数。
a. cycle()函数用于处理客户端连接。

5. 部分函数注释如下:

    srs_tcp_listen(ip, port, &lfd)
        fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol) //创建socket
        do_srs_tcp_listen(fd, r, pfd)
            srs_fd_closeexec(fd)				//设置FD_CLOEXEC
            srs_fd_reuseaddr(fd)				//重复使用fd
            bind(fd, r->ai_addr, r->ai_addrlen)	//绑定服务ip和端口
            ::listen(fd, SERVER_LISTEN_BACKLOG)	//在_fd上开启监听
            (*pfd = srs_netfd_open_socket(fd))	//将fd注册到st库上,以后这个fd的所有请求都交由库处理
    trd = new SrsSTCoroutine("tcp", this)		//创建一个协程
    trd->start()								//启动协程,进入SrsSTCoroutine::cycle()

SrsTcpListener::cycle()

1. SrsSTCoroutine::cycle() 最后会调用到 SrsTcpListener::cycle()

2. SrsTcpListener::cycle() 监听协程接受连接请求后将执行逻辑交给BufferListener处理。

srs_error_t SrsTcpListener::cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "tcp listener");
        }
        //接收连接
        srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
        if(fd == NULL){
            return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
        }
        
	    if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
	        return srs_error_wrap(err, "set closeexec");
	    }
        //这个监听协程只是处理连接请求,具体的执行逻辑交给BufferListener处理
        if ((err = handler->on_tcp_client(fd)) != srs_success) {
            return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
        }
    }
    
    return err;
}

SrsBufferListener::on_tcp_client()

1. SrsBufferListener::on_tcp_client() 代码如下:

srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    srs_error_t err = server->accept_client(type, stfd); //交给SrsServer接受处理
    if (err != srs_success) {
        srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
        srs_freep(err);
    }
    
    return srs_success;
}

2. 最终调用SrsServer的accept_client处理

SrsServer::accept_client()、SrsServer::fd2conn()

1. SrsServer::accept_client() 代码如下:
a. 先根据type获取连接的SrsConnection
b. 将SrsConnection加入conns,conns存放所有的连接
c. 为每一个SrsConnection开启一个连接协程

srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
    srs_error_t err = srs_success;
    
    SrsConnection* conn = NULL;
    
    if ((err = fd2conn(type, stfd, &conn)) != srs_success) { //根据type获取连接的SrsConnection
        if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
            srs_close_stfd(stfd); srs_error_reset(err);
            return srs_success;
        }
        return srs_error_wrap(err, "fd2conn");
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn); // 加入conns,conns存放所有的连接
    
    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    if ((err = conn->start()) != srs_success) { //每个连接开启一个连接协程
        return srs_error_wrap(err, "start conn coroutine");
    }
    
    return err;
}

2. 获取连接的主要代码:

因为现在type是SrsListenerRtmpStream,所有conn返回的是SrsRtmpConn。

srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
    srs_error_t err = srs_success;
    
    int fd = srs_netfd_fileno(stfd);
    string ip = srs_get_peer_ip(fd);
    
    // for some keep alive application, for example, the keepalived,
    // will send some tcp packet which we cann't got the ip,
    // we just ignore it. 
    if (ip.empty()) { //无法获取ip则进行忽略
        return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd);
    }
    
    // check connection limitation.
    int max_connections = _srs_config->get_max_connections(); //获取最大连接数
    if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) {
        return srs_error_wrap(err, "drop client fd=%d, max=%d, cur=%d for err: %s",
            fd, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
    }
    if ((int)conns.size() >= max_connections) { //如果超过了连接限制,直接拒绝连接
        return srs_error_new(ERROR_EXCEED_CONNECTIONS,
            "drop fd=%d, max=%d, cur=%d for exceed connection limits",
            fd, max_connections, (int)conns.size());
    }
    
    // avoid fd leak when fork.
    // @see https://github.com/ossrs/srs/issues/518
    if (true) {
        int val;
        if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
            return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
        }
        val |= FD_CLOEXEC;
        if (fcntl(fd, F_SETFD, val) < 0) {
            return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
        }
    }
    
    if (type == SrsListenerRtmpStream) {
        *pconn = new SrsRtmpConn(this, stfd, ip); //创建RTMP连接
    } else if (type == SrsListenerHttpApi) {
        *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip);
    } else if (type == SrsListenerHttpStream) {
        *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
    } else {
        srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
        srs_close_stfd(stfd);
        return err;
    }
    
    return err;
}

SrsSTCoroutine::start()、SrsConnection::cycle()

1. SrsConnection::start() 代码如下:

srs_error_t SrsConnection::start()
{
    srs_error_t err = srs_success;

    if ((err = skt->initialize(stfd)) != srs_success) {
        return srs_error_wrap(err, "init socket");
    }
    //启动conn协程,最后会执行到SrsConnection::cycle()
    if ((err = trd->start()) != srs_success) { 
        return srs_error_wrap(err, "coroutine");
    }
    
    return err;
}

2. SrsConnection::cycle() 代码如下:

srs_error_t SrsConnection::cycle()
{
    srs_error_t err = do_cycle(); //SrsRtmpConn::do_cycle
    
    // Notify manager to remove it.
    manager->remove(this);
    
    // success.
    if (err == srs_success) {
        srs_trace("client finished.");
        return err;
    }
    
    // client close peer.
    // TODO: FIXME: Only reset the error when client closed it.
    if (srs_is_client_gracefully_close(err)) {
        srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
    } else if (srs_is_server_gracefully_close(err)) {
        srs_warn("server disconnect. ret=%d", srs_error_code(err));
    } else {
        srs_error("serve error %s", srs_error_desc(err).c_str());
    }
    
    srs_freep(err);
    return srs_success;
}

SrsRtmpConn::do_cycle()

如果有推流事件,就会进入SrsRtmpConn::do_cycle(),此函数负责具体执行RTMP流程,包括握手,接收connect请求,发送response connect响应,以及接收音视频流数据等处理。

srs_error_t SrsRtmpConn::do_cycle()
{
    srs_error_t err = srs_success;
    
    srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
    
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); //设置接收超时时间
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); //设置发送超时时间

    if ((err = rtmp->handshake()) != srs_success) { //rtmp握手
        return srs_error_wrap(err, "rtmp handshake");
    }

    uint32_t rip = rtmp->proxy_real_ip();
    if (rip > 0) {
        srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
            uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
    }
    
    SrsRequest* req = info->req;
    if ((err = rtmp->connect_app(req)) != srs_success) { //握手成功后,srs会接收并解析客户端发送过来的RTMP消息connect
        return srs_error_wrap(err, "rtmp connect tcUrl");
    }
    
    // set client ip to request.
    req->ip = ip;
    
    srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port,
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    //服务循环
    if ((err = service_cycle()) != srs_success) {
        err = srs_error_wrap(err, "service cycle");
    }
    
    srs_error_t r0 = srs_success;
    if ((r0 = on_disconnect()) != srs_success) {
        err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
        srs_freep(r0);
    }
    
    // If client is redirect to other servers, we already logged the event.
    if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
        srs_error_reset(err);
    }
    
    return err;
}
SRS流媒体服务器基本流程

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(1)

相关推荐

发表回复

登录后才能评论