如何从 SRS 服务器拉 RTMP 流

本文介绍如何从SRS服务器拉RTMP流。

SrsRtmpConn::stream_service_cycle

客户端从SRS服务器拉流主要逻辑入口在SrsRtmpConn::stream_service_cycle:

// Excerpt of the connection's stream service cycle. The `...` lines mark code that
// the article elided, so this listing is not complete on its own.
srs_error_t SrsRtmpConn::stream_service_cycle()
{
    ...
    
    switch (info->type) {
        case SrsRtmpConnPlay: { // play: the client pulls a stream
            // response connection start play
            if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
                return srs_error_wrap(err, "rtmp: start play");
            }
            if ((err = http_hooks_on_play()) != srs_success) { // hook callback: notify the vhost that play has started
                return srs_error_wrap(err, "rtmp: callback on play");
            }
            
            // Run the play session; keep its error so the stop hook below runs before returning.
            err = playing(source);
            http_hooks_on_stop();   // hook callback: notify the vhost that play has stopped
            
            return err;
        }
        ...
}

如果客户端类型为 SrsRtmpConnPlay,则表示该连接是拉流(play)请求。其中:

  1. http_hooks_on_play() 方法中回调 on_play() 方法通知 vhost,用户已经开始 play。
  2. http_hooks_on_stop() 方法中回调 on_stop() 方法通知 vhost,用户已经停止 play。
  3. 最重要的函数是 playing(source),进入该函数。

SrsRtmpConn::playing

// Runs one play session on `source`: checks the page referer, creates a consumer,
// starts a receive thread, then delivers packets through do_playing() until it returns.
// The `...` line marks origin-cluster code that the article elided.
// Returns srs_success or a wrapped error from the failing step.
srs_error_t SrsRtmpConn::playing(SrsSource* source)
{
    srs_error_t err = srs_success;
    
    // Check page referer of player.
    SrsRequest* req = info->req;
    if (_srs_config->get_refer_enabled(req->vhost)) {
        if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
            return srs_error_wrap(err, "rtmp: referer check");
        }
    }
    
    // When origin cluster enabled, try to redirect to the origin which is active.
    // A active origin is a server which is delivering stream.
    ...  // origin-cluster code omitted here; it is covered later
    
    // Set the socket options for transport.
    set_sock_options();
    
    // Create a consumer of source: one consumer is created per play request.
    SrsConsumer* consumer = NULL;
    if ((err = source->create_consumer(this, consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: create consumer");
    }
    // NOTE(review): presumably releases the consumer when this scope exits — confirm against SrsAutoFree.
    SrsAutoFree(SrsConsumer, consumer);
    
    // Use receiving thread to receive packets from peer.
    // @see: https://github.com/ossrs/srs/issues/217
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
    
    if ((err = trd.start()) != srs_success) {
        return srs_error_wrap(err, "rtmp: start receive thread");
    }
    
    // Deliver packets to peer, i.e. send the audio/video data to the playing client.
    // `wakable` points at the consumer only while do_playing() is running.
    wakable = consumer;
    err = do_playing(source, consumer, &trd);
    wakable = NULL;
    
    trd.stop();
    
    // Drop all packets in receiving thread.
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }
    
    return err;
}

主流程在do_playing(source, consumer, &trd),进入该函数。

SrsRtmpConn::do_playing

// Delivery loop of a play session. Each iteration handles the control messages
// queued by the receive thread, pulls a batch of messages from the consumer and
// sends it to the player.
// Returns a wrapped error when the connection thread, the receive thread, the
// consumer or the send fails, and ERROR_RTMP_DURATION_EXCEED once the duration
// requested by the client has been delivered.
// NOTE(review): `source` is not used in this body. `trd`, `rtmp`, `kbps`, `duration`,
// `realtime`, `mw_enabled`, `mw_sleep`, `send_min_interval` and `tcp_nodelay` are not
// declared here — presumably members of SrsRtmpConn.
srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{
    srs_error_t err = srs_success;

    SrsRequest* request = info->req;
    srs_assert(request);
    srs_assert(consumer);

    // Status printer used for the periodic trace below; handed to SrsAutoFree.
    SrsPithyPrint* reporter = SrsPithyPrint::create_rtmp_play();
    SrsAutoFree(SrsPithyPrint, reporter);

    // Array that receives each batch of messages dumped from the consumer.
    SrsMessageArray batch(SRS_PERF_MW_MSGS);
    // A positive requested duration means the session ends once it is reached.
    const bool limit_by_duration = (request->duration > 0);
    // Timestamp of the last accounted message; negative until the first one.
    int64_t last_ts = -1;

    // Load the per-vhost settings: realtime, mw sleep and the minimal send interval.
    // When mw_sleep changed, resize the socket send buffer.
    realtime = _srs_config->get_realtime_enabled(request->vhost);
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep(request->vhost));
    send_min_interval = _srs_config->get_send_min_interval(request->vhost);

    srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
        srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);

    for (;;) {
        // When source is set to expired, disconnect it.
        err = trd->pull();
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: thread quit");
        }

        // Collect elapsed time for the pithy print.
        reporter->elapse();

        // Receiving in an isolated thread can improve performance by about 33%.
        // @see: https://github.com/ossrs/srs/issues/196
        // @see: https://github.com/ossrs/srs/issues/217
        // Drain whatever the receive thread has queued and handle each message as a
        // play control message (e.g. close, pause).
        while (!rtrd->empty()) {
            err = process_play_control_msg(consumer, rtrd->pump());
            if (err != srs_success) {
                return srs_error_wrap(err, "rtmp: play control message");
            }
        }

        // Quit when the receive thread reported an error.
        err = rtrd->error_code();
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: recv thread");
        }

#ifdef SRS_PERF_QUEUE_COND_WAIT
        // Wait for incoming messages.
        // @see https://github.com/ossrs/srs/issues/251
        // @see https://github.com/ossrs/srs/issues/257
        if (!realtime) {
            // Not realtime: wait until some messages are available, then send.
            consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
        } else {
            // Realtime: the minimum required is 0, send as soon as one or more arrive.
            consumer->wait(0, mw_sleep);
        }
#endif

        // Get messages from the consumer.
        // Each msg in batch.msgs must be freed, for the SrsMessageArray never frees them.
        // @remark when send_min_interval is enabled, only fetch one message at a time.
        int nb_msgs = 0;
        if (send_min_interval > 0) {
            nb_msgs = 1;
        }
        err = consumer->dump_packets(&batch, nb_msgs);
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: consumer dump packets");
        }

        // Periodic status report.
        if (reporter->can_print()) {
            kbps->sample();
            srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
                (int)reporter->age(), nb_msgs, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
        }

        // Nothing was dumped: go around again.
        if (nb_msgs <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
            srs_usleep(mw_sleep);
#endif
            continue;
        }

        // Only when the user specified a duration do we accumulate the time span
        // covered by the messages.
        // @remark: never use a msg after it was sent, for the protocol sdk frees it.
        if (limit_by_duration) {
            for (int i = 0; i < nb_msgs; i++) {
                SrsSharedPtrMessage* cur = batch.msgs[i];

                // First message, or the timestamp went backwards: restart from here.
                if (last_ts < 0 || last_ts > cur->timestamp) {
                    last_ts = cur->timestamp;
                }
                duration += (cur->timestamp - last_ts) * SRS_UTIME_MILLISECONDS;
                last_ts = cur->timestamp;
            }
        }

        // Send the audio/video data to the play client. All messages are freed by
        // send_and_free_messages(); nb_msgs is known to be positive at this point.
        err = rtmp->send_and_free_messages(batch.msgs, nb_msgs, info->res->stream_id);
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: send %d messages", nb_msgs);
        }

        // If a duration was specified and has been exceeded, stop playing live.
        // @see: https://github.com/ossrs/srs/issues/45
        if (limit_by_duration && duration >= request->duration) {
            return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(request->duration));
        }

        // Apply the minimal interval for delivering the stream, in srs_utime_t.
        if (send_min_interval > 0) {
            srs_usleep(send_min_interval);
        }
    }

    return err;
}
// NOTE(review): this listing repeats the SrsRtmpConn::do_playing definition printed
// immediately before it — it looks like a paste duplication in the article.
//
// Delivery loop of a play session: handles control messages queued by the receive
// thread, dumps messages from the consumer and sends them to the player.
// NOTE(review): `source` is not used in this body; `trd`, `duration`, `kbps` and the
// mw/realtime settings are not declared here — presumably members of SrsRtmpConn.
srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{
    srs_error_t err = srs_success;
    
    SrsRequest* req = info->req;
    srs_assert(req);
    srs_assert(consumer);
    
    // initialize other components
    SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
    SrsAutoFree(SrsPithyPrint, pprint);
    
    SrsMessageArray msgs(SRS_PERF_MW_MSGS); // message array
    bool user_specified_duration_to_stop = (req->duration > 0);
    int64_t starttime = -1;
    
    // setup the realtime.
    realtime = _srs_config->get_realtime_enabled(req->vhost);
    // setup the mw config.
    // when mw_sleep changed, resize the socket send buffer.
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep(req->vhost));
    // initialize the send_min_interval
    send_min_interval = _srs_config->get_send_min_interval(req->vhost);
    
    srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
        srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
    
    while (true) {
        // when source is set to expired, disconnect it.
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "rtmp: thread quit");
        }

        // collect elapse for pithy print.
        pprint->elapse();

        // to use isolate thread to recv, can improve about 33% performance.
        // @see: https://github.com/ossrs/srs/issues/196
        // @see: https://github.com/ossrs/srs/issues/217
        while (!rtrd->empty()) { // if the SrsQueueRecvThread queue holds messages, take them out and handle them
            SrsCommonMessage* msg = rtrd->pump();
            if ((err = process_play_control_msg(consumer, msg)) != srs_success) { // play control messages, e.g. close, pause
                return srs_error_wrap(err, "rtmp: play control message");
            }
        }
        
        // quit when recv thread error.
        if ((err = rtrd->error_code()) != srs_success) {
            return srs_error_wrap(err, "rtmp: recv thread");
        }
        
#ifdef SRS_PERF_QUEUE_COND_WAIT
        // wait for message to incoming.
        // @see https://github.com/ossrs/srs/issues/251
        // @see https://github.com/ossrs/srs/issues/257
        if (realtime) {
            // for realtime, min required msgs is 0, send when got one+ msgs.
            consumer->wait(0, mw_sleep);
        } else {
            // for no-realtime, got some msgs then send.
            consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
        }
#endif
        
        // get messages from consumer.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        // @remark when enable send_min_interval, only fetch one message a time.
        int count = (send_min_interval > 0)? 1 : 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consumer dump packets");
        }
        
        // reportable
        if (pprint->can_print()) {
            kbps->sample();
            srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
                (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
        }
        
        if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
            srs_usleep(mw_sleep);
#endif
            // ignore when nothing got.
            continue;
        }
        
        // only when user specifies the duration,
        // we start to collect the durations for each message.
        if (user_specified_duration_to_stop) {
            for (int i = 0; i < count; i++) {
                SrsSharedPtrMessage* msg = msgs.msgs[i];
                
                // foreach msg, collect the duration.
                // @remark: never use msg when sent it, for the protocol sdk will free it.
                if (starttime < 0 || starttime > msg->timestamp) {
                    starttime = msg->timestamp;
                }
                duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
                starttime = msg->timestamp;
            }
        }
        
        // sendout messages, all messages are freed by send_and_free_messages().
        // no need to assert msg, for the rtmp will assert it.
        // This is where the audio/video data is sent to the play client.
        if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: send %d messages", count);
        }
        
        // if duration specified, and exceed it, stop play live.
        // @see: https://github.com/ossrs/srs/issues/45
        if (user_specified_duration_to_stop) {
            if (duration >= req->duration) {
                return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
            }
        }
        
        // apply the minimal interval for delivery stream in srs_utime_t.
        if (send_min_interval > 0) {
            srs_usleep(send_min_interval);
        }
    }
    
    return err;
}

SrsRtmpConn::do_playing主要有三个功能。

  1. 处理播放控制信息,比如关闭流、暂停等。
  2. 从消费者(SrsConsumer)的消息队列 SrsMessageQueue 中取出 RTMP message。
  3. 发送message给拉流客户端。

播放控制信息(SrsRtmpConn::process_play_control_msg)

对应代码如下:

// Handles one message received from the player during playback.
// A NULL message and any message that is not an AMF0/AMF3 command are ignored.
// A close-stream packet ends the session with ERROR_CONTROL_RTMP_CLOSE, a call
// packet is answered with null values when it carries a positive transaction id,
// and a pause packet is forwarded to both the rtmp layer and the consumer.
// `msg` is handed to SrsAutoFree here, so the caller must not reuse it.
srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;

    if (msg == NULL) {
        return err;
    }
    SrsAutoFree(SrsCommonMessage, msg);

    // Only command messages can carry play control requests.
    const bool is_command = msg->header.is_amf0_command() || msg->header.is_amf3_command();
    if (!is_command) {
        return err;
    }

    SrsPacket* packet = NULL;
    err = rtmp->decode_message(msg, &packet);
    if (err != srs_success) {
        return srs_error_wrap(err, "rtmp: decode message");
    }
    SrsAutoFree(SrsPacket, packet);

    // For jwplayer/flowplayer, which send close as the pause message.
    // @see https://github.com/ossrs/srs/issues/6
    if (dynamic_cast<SrsCloseStreamPacket*>(packet)) {
        return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");
    }

    // Call message: respond with null values for now.
    // @see https://github.com/ossrs/srs/issues/106
    // TODO: FIXME: response in right way, or forward in edge mode.
    if (SrsCallPacket* request_call = dynamic_cast<SrsCallPacket*>(packet)) {
        // Only respond when the transaction id is not zero; zero means no response is needed.
        if (request_call->transaction_id > 0) {
            SrsCallResPacket* reply = new SrsCallResPacket(request_call->transaction_id);
            reply->command_object = SrsAmf0Any::null();
            reply->response = SrsAmf0Any::null();

            err = rtmp->send_and_free_packet(reply, 0);
            if (err != srs_success) {
                return srs_error_wrap(err, "rtmp: send packets");
            }
        }
        return err;
    }

    // Pause or resume: tell the rtmp layer first, then the consumer.
    if (SrsPausePacket* pause_pkt = dynamic_cast<SrsPausePacket*>(packet)) {
        err = rtmp->on_play_client_pause(info->res->stream_id, pause_pkt->is_pause);
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: pause");
        }

        err = consumer->on_play_client_pause(pause_pkt->is_pause);
        if (err != srs_success) {
            return srs_error_wrap(err, "rtmp: pause");
        }
        return err;
    }

    // Any other command is ignored.
    return err;
}

从消费者列表中取出Rtmp message(SrsConsumer::dump_packets)

从SrsMessageQueue获取messages,对应代码如下:

// Dumps queued messages of this consumer into `msgs`.
// @param msgs  Destination array; msgs->max must be positive.
// @param count In: the maximum number of messages wanted, where 0 means "up to
//              msgs->max". Out: the number of messages actually dumped, which is 0
//              when the consumer is paused.
// @return srs_success, or the queue error wrapped with "dump packets".
srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
    srs_error_t err = srs_success;

    srs_assert(count >= 0);
    srs_assert(msgs->max > 0);

    // A positive count lowers the limit, but never beyond the array capacity.
    int limit = msgs->max;
    if (count) {
        limit = srs_min(count, msgs->max);
    }

    // From here on count is the output: it must read 0 when nothing was got.
    count = 0;

    if (should_update_source_id) {
        srs_trace("update source_id=%d/%d", source->source_id(), source->pre_source_id());
        should_update_source_id = false;
    }

    // While paused nothing is handed out; otherwise pump msgs from the SrsMessageQueue.
    if (!paused) {
        err = queue->dump_packets(limit, msgs->msgs, count);
        if (err != srs_success) {
            return srs_error_wrap(err, "dump packets");
        }
    }

    return err;
}

数据实际存储在 SrsMessageQueue 的 SrsFastVector 中。注:

RTMP推流时,通过SrsMessageQueue::enqueue函数将onMetadata、audio和video数据加入到SrsFastVector数组中。详见:RTMP推流到SRS流媒体服务器metadata,video,audio数据处理

发送message给拉流客户端

SrsProtocol::send_and_free_messages函数负责将message发送给拉流客户端。

// Sends `nb_msgs` messages to the peer and frees every one of them, whether or not
// the send succeeded.
// @param msgs      Array of messages; must not be NULL. NULL entries are skipped by
//                  the stream id check.
// @param nb_msgs   Number of entries in `msgs`; must be positive.
// @param stream_id Stream id checked against the messages before sending.
// @return srs_success, or the wrapped error of the send or of the manual flush.
srs_error_t SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
{
    // The array itself is never NULL and never empty.
    srs_assert(msgs);
    srs_assert(nb_msgs > 0);

    // Check the preferred cid and stream id in the headers.
    // Once one message reports its stream id is ok, the rest are not checked.
    for (int idx = 0; idx < nb_msgs; idx++) {
        SrsSharedPtrMessage* candidate = msgs[idx];
        if (candidate && candidate->check(stream_id)) {
            break;
        }
    }

    // Do not use auto free for the messages, for performance reasons:
    // send first, then free each message by hand.
    srs_error_t err = do_send_messages(msgs, nb_msgs);

    for (SrsSharedPtrMessage** slot = msgs; slot != msgs + nb_msgs; ++slot) {
        // Free through a local copy, so the entries of the caller's array are not modified.
        SrsSharedPtrMessage* doomed = *slot;
        srs_freep(doomed);
    }

    // Do not flush when the send failed.
    if (err != srs_success) {
        return srs_error_wrap(err, "send messages");
    }

    // Flush the messages in the manual response queue.
    err = manual_response_flush();
    if (err != srs_success) {
        return srs_error_wrap(err, "manual flush response");
    }

    print_debug_info();

    return err;
}

发送消息:SrsProtocol::do_iovs_send 调用 srs_write_large_iovs,后者通过 writev 把数据写给拉流端;当 iovec 数量超过系统上限时,分多次发送。

// Writes `size` iovecs to this protocol's socket `skt` by delegating to
// srs_write_large_iovs(); its result is returned unchanged.
srs_error_t SrsProtocol::do_iovs_send(iovec* iovs, int size)
{
    srs_error_t err = srs_write_large_iovs(skt, iovs, size);
    return err;
}
// Writes `size` iovecs to `skt`, splitting the request into several writev() calls
// when `size` exceeds the per-call iovec limit of the platform.
// @param skt     Writer whose writev() is invoked.
// @param iovs    Array of `size` iovec entries to send.
// @param size    Number of entries in `iovs`.
// @param pnwrite Optional out-parameter. On the single-call path it is passed
//                straight to writev(); on the split path it receives the sum of the
//                byte counts reported by each writev() call.
// @return srs_success, or the writev() error wrapped with "writev".
srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
{
    srs_error_t err = srs_success;
    
    // the limits of writev iovs.
    // for srs-librtmp, @see https://github.com/ossrs/srs/issues/213
#ifndef _WIN32
    // for linux, generally it's 1024.
    // sysconf() returns -1 when the limit is indeterminate or on error. A non-positive
    // limit would send every call into the chunking loop below with a negative chunk
    // size, so fall back to the same 1024 that the Windows branch uses.
    static const long sys_iov_max = sysconf(_SC_IOV_MAX);
    static const int limits = (sys_iov_max > 0)? (int)sys_iov_max : 1024;
#else
    static const int limits = 1024;
#endif
    
    // send in a time.
    if (size <= limits) {
        if ((err = skt->writev(iovs, size, pnwrite)) != srs_success) {
            return srs_error_wrap(err, "writev");
        }
        return err;
    }
    
    // send in multiple times.
    // Start the running total at zero, so the caller gets the bytes written by this
    // call even when *pnwrite was not initialized beforehand.
    if (pnwrite) {
        *pnwrite = 0;
    }
    
    int cur_iov = 0;
    ssize_t nwrite = 0;
    while (cur_iov < size) {
        // Send at most `limits` iovecs per writev() call; this is where the stream
        // data is written out to the playing client.
        int cur_count = srs_min(limits, size - cur_iov);
        if ((err = skt->writev(iovs + cur_iov, cur_count, &nwrite)) != srs_success) {
            return srs_error_wrap(err, "writev");
        }
        cur_iov += cur_count;
        if (pnwrite) {
            *pnwrite += nwrite;
        }
    }
    
    return err;
}

原文:https://www.yuque.com/wahaha-0yfyj/mnfloz/shzn6x

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(0)

相关推荐

发表回复

登录后才能评论