分享webrtc中任务队列的实现,见文件:rtc_base\task_queue.h。
TaskQueue 定义
具体实现:
class RTC_LOCKABLE RTC_EXPORT TaskQueue {
public:
// TaskQueue priority levels. On some platforms these will map to thread
// priorities, on others such as Mac and iOS, GCD queue priorities.
using Priority = ::webrtc::TaskQueueFactory::Priority;
// 注意这个构造函数,以TaskQueueBase智能指针作为参数
// TaskQueue 的真正实现,其实是这个TaskQueueBase
explicit TaskQueue(std::unique_ptr<webrtc::TaskQueueBase,
webrtc::TaskQueueDeleter> task_queue);
~TaskQueue();
}
创建一个 TaskQueue
具体过程:
- 首先创建一个默认的任务队列工厂
- 然后基于任务队列工厂创建一个任务队列TaskQueueBase基类
- 这个新创建的任务队列TaskQueueBase,
作为参数传给TaskQueue()的构造函数, 从而创建了我们需要的TaskQueue对象 video_encoder_queue_
//首先声明相应对象:一个任务队列工厂、一个任务队列对象
//因为webrtc是跨平台的项目,
//引入一个工厂模式,来兼容不同的平台
//通过工厂来创建任务队列
std::shared_ptr<webrtc::TaskQueueFactory> video_encoder_task_queue_factory_;
rtc::TaskQueue video_encoder_queue_;
//在指定类构造函数列表中创建任务队列
//假如我们的调用类是VideoHandler
//具体的实现如下:
VideoHandbler::VideoHandbler()
: video_encoder_task_queue_factory_(webrtc::CreateDefaultTaskQueueFactory()),
video_encoder_queue_(video_encoder_task_queue_factory_->CreateTaskQueue(
"VideoEncoderQueue",
TaskQueueFactory::Priority::NORMAL)){
}
TaskQueue的构造函数
//TaskQueue的构造函数以TaskQueueBase的智能指针作为参数
//通过成员变量impl_ 接收该指针
//后面会发现,任务最终都传给了TaskQueueBase()
TaskQueue::TaskQueue(
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> task_queue)
: impl_(task_queue.release()) {}
//向TaskQueue投递任务时,最终还是通过 imple_抛给了TaskQueueBase()
void TaskQueue::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
return impl_->PostTask(std::move(task));
}
TaskQueueLibevent
每个平台都有各自的实现
我们主要看TaskQueueLibevent()的实现
//TaskQueueLibevent 继承自TaskQueueBase()真正处理Task的函数
class TaskQueueLibevent final : public TaskQueueBase {
public:
TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
private:
bool is_active_ = true;
//输入、输出管道用来唤起线程
int wakeup_pipe_in_ = -1;
int wakeup_pipe_out_ = -1;
event_base* event_base_;
event wakeup_event_;
//任务队列线程
//一个任务队列对象一个线程
//对于windows平台内部会执行 CreateThread()创建线程
//android调用 pthread_create()创建线程
//线程的ThreadMain会一直监听管道的可读事件
//从而唤醒线程去处理Task任务
rtc::PlatformThread thread_;
//线程轮训任务队列时会加锁
Mutex pending_lock_;
//最重要的结构:存储Task任务的vector对象
//线程唤醒后会轮训该该pending_对象,处理任务
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
RTC_GUARDED_BY(pending_lock_);
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
TaskQueueLibevent 具体实现
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
rtc::ThreadPriority priority)
: event_base_(event_base_new()),
thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
int fds[2];
//创建一个管道用于线程之间通信
RTC_CHECK(pipe(fds) == 0);
//把管道设置为非阻塞
SetNonBlocking(fds[0]);
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
//绑定管道可读事件,当管道可读时会自动调用OnWakeup()函数
//这块是借用libevent库来实现的
EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this);
event_add(&wakeup_event_, 0);
thread_.Start();
}
//调用fcntl()把管道设置诶非阻塞
bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
RTC_CHECK(flags != -1);
return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
//线程函数一直轮训相关事件
void TaskQueueLibevent::ThreadMain(void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
{
CurrentTaskQueueSetter set_current(me);
while (me->is_active_)
event_base_loop(me->event_base_, 0);
}
for (TimerEvent* timer : me->pending_timers_)
delete timer;
}
向任务队列中抛入任务
void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
{
//向队列投递任务时,先加锁
MutexLock lock(&pending_lock_);
bool had_pending_tasks = !pending_.empty();
//把任务插入到队列中
pending_.push_back(std::move(task));
// Only write to the pipe if there were no pending tasks before this one
// since the thread could be sleeping. If there were already pending tasks
// then we know there's either a pending write in the pipe or the thread has
// not yet processed the pending tasks. In either case, the thread will
// eventually wake up and process all pending tasks including this one.
//如果当前线程有未处理的阻塞任务,就暂时不唤醒
if (had_pending_tasks) {
return;
}
}
// Note: This behvior outlined above ensures we never fill up the pipe write
// buffer since there will only ever be 1 byte pending.
//写入的内容是KRunTasks,表明是要处理的Task任务
char message = kRunTasks;
//投入任务后,向管道中写入数据,唤起线程
RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
sizeof(message));
}
管道中写入数据后,会触发OnWakeup()函数
void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT
void* context) {
TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
RTC_DCHECK(me->wakeup_pipe_out_ == socket);
char buf;
// 调用read函数从管道中读入数据
//读到数据后基于相应的数据类型进行处理
RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
switch (buf) {
case kQuit:
me->is_active_ = false;
event_base_loopbreak(me->event_base_);
break;
case kRunTasks: {//要处理的任务
absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
{
//加锁
//并取出要处理的任务
MutexLock lock(&me->pending_lock_);
tasks.swap(me->pending_);
}
RTC_DCHECK(!tasks.empty());
//遍历所有的task,并调用各自的Run()函数进行处理
//最终就是回调传进来任务函数
for (auto& task : tasks) {
if (task->Run()) {
task.reset();
} else {
// |false| means the task should *not* be deleted.
task.release();
}
}
break;
}
default:
RTC_NOTREACHED();
break;
}
}
使用例子,向任务队列投入任务
向队列中投入一个lambda 任务, 当任务线程接收到该任务后,就会回调该lambda函数,从而让任务在一个异步线程中去执行
video_encoder_queue_.PostTask([this,&video_data]{
....
encoder(video_data);
....
});
视频编码队列 encoder_queue_
encoder_queue_ 就是利用前面讲到TaskQueue进行视频编码的
具体实现:
//声明编码队列
rtc::TaskQueue encoder_queue_;
//构造函数列表中创建该编码队列
VideoStreamEncoder::VideoStreamEncoder():
//创建编码队列
encoder_queue_(task_queue_factory->CreateTaskQueue(
"EncoderQueue",
TaskQueueFactory::Priority::NORMAL))
{}
//向编码队列中投入视频数据进行编码
//所以真正的编码线程,就是encoder_queue所拥有的线程了
void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {
encoder_queue_.PostTask(//把视频帧封装为Task投递给encoder_queue
[this, incoming_frame, post_time_us, log_stats, post_interval_us]() {
if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {
MaybeEncodeVideoFrame(incoming_frame, post_time_us);
}
}
}
析构函数会主动关闭任务队列
析构函数中会自动关闭该任务队列,并停止对应任务线程,不需要我们干预
TaskQueue::~TaskQueue() {
// There might running task that tries to rescheduler itself to the TaskQueue
// and not yet aware TaskQueue destructor is called.
// Calling back to TaskQueue::PostTask need impl_ pointer still be valid, so
// do not invalidate impl_ pointer until Delete returns.
impl_->Delete();
}
void TaskQueueLibevent::Delete() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
//向管道中写入关闭消息
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
// The queue is full, so we have no choice but to wait and retry.
RTC_CHECK_EQ(EAGAIN, errno);
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
nanosleep(&ts, nullptr);
}
//停止线程
thread_.Stop();
event_del(&wakeup_event_);
IgnoreSigPipeSignalOnCurrentThread();
//关闭管道
close(wakeup_pipe_in_);
close(wakeup_pipe_out_);
wakeup_pipe_in_ = -1;
wakeup_pipe_out_ = -1;
event_base_free(event_base_);
delete this;
}
以上就是webrtc的 TaskQueue() 任务队列的实现过程了
欢迎指正,欢迎关注,感谢大家!
作者:lcalqf
来源:音视频之路
原文:https://mp.weixin.qq.com/s/P6nVB7Tq2Z2k7thlVSUpEw
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。