webrtc源码分析-线程任务管理
2021/7/31 1:06:26
本文主要是介绍webrtc源码分析-线程任务管理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1.前言
webrtc线程源于chromium,其中有消息队列,通信等功能,相对于原始的std::thread或者posix pthread而言,好用不少,本文介绍了webrtc 线程的常用功能以及实现;
2.正文
2.1 webrtc中的主要线程
出于管理接口即时性,平衡IO任务或其它任务阻塞性等,通过异步的方式,将不同类型的任务归类到不同的异步线程去处理是常见的处理方式,webrtc中一共有三大线程
-
signaling_thread_: 处理PeerConnection有关的接口任务和observer回调
-
network_thread_:网络io等
-
worker_thread_:其它阻塞耗时的任务
2.2 使用Invoke在异步线程执行任务
当需要将一个任务放到异步线程的时候,只需要使用thread->Invoke<>()
函数即可,取JsepTransportController::SetLocalDescription()
作为例子, 在函数最开头就检查了,network_thread线程是否是当前线程,如果不是则通过network_thread_->Invoke<>()
将当前函数投递到network_thread_中去执行:
RTCError JsepTransportController::SetLocalDescription( SdpType type, const cricket::SessionDescription* description) { // network线程运行 if (!network_thread_->IsCurrent()) { return network_thread_->Invoke<RTCError>( RTC_FROM_HERE, [=] { return SetLocalDescription(type, description); }); } RTC_DCHECK_RUN_ON(network_thread_); if (!initial_offerer_.has_value()) { initial_offerer_.emplace(type == SdpType::kOffer); if (*initial_offerer_) { SetIceRole_n(cricket::ICEROLE_CONTROLLING); } else { SetIceRole_n(cricket::ICEROLE_CONTROLLED); } } return ApplyDescription_n(/*local=*/true, type, description); }
2.3 Invoke的实现和线程任务管理
2.3.1 任务的投递
以上述的SetLocalDescription()
为例,其调用的network_thread_->Invoke()
如下:
template中的第一个参数ReturnT
为执行函数的返回值类型,第二个参数用来SFINAE确保返回值不是void
(还有一个适配void类型的版本)
作为task的任务被转化成了FunctionView
类型的functor,传入到函数InvokeInternal()
中
template < class ReturnT, typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type> ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) { ReturnT result; InvokeInternal(posted_from, [functor, &result] { result = functor(); }); return result; }
InvokeInternal()
会将functor转化成Msg handler然后放到this线程队列中去
void Thread::InvokeInternal(const Location& posted_from, rtc::FunctionView<void()> functor) { TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(), "src_func", posted_from.function_name()); class FunctorMessageHandler : public MessageHandler { public: explicit FunctorMessageHandler(rtc::FunctionView<void()> functor) : functor_(functor) {} void OnMessage(Message* msg) override { functor_(); } private: rtc::FunctionView<void()> functor_; // 将funtor转化成Msg handler } handler(functor); // 发送到this线程队列中 Send(posted_from, &handler); }
在Thread::Send()
函数中有非常多的细节,首先会判断当前线程和this线程是否相同,是就直接执行,否则生成一个QueueTask 投递到this线程的队列中去
void Thread::Send(const Location& posted_from, MessageHandler* phandler, uint32_t id, MessageData* pdata) { RTC_DCHECK(!IsQuitting()); if (IsQuitting()) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. // 构造成msg Message msg; msg.posted_from = posted_from; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; // 如果当前线程就是this线程的话,直接将 // 执行任务即可 if (IsCurrent()) { msg.phandler->OnMessage(&msg); return; } AssertBlockingIsAllowedOnCurrentThread(); // 获取当前线程 Thread* current_thread = Thread::Current(); #if RTC_DCHECK_IS_ON if (current_thread) { RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, this); } #endif // Perhaps down the line we can get rid of this workaround and always require // current_thread to be valid when Send() is called. std::unique_ptr<rtc::Event> done_event; if (!current_thread) done_event.reset(new rtc::Event()); bool ready = false; // 将msg封装达成QueueTask,放到线程队列中 PostTask(webrtc::ToQueuedTask( [&msg]() mutable { msg.phandler->OnMessage(&msg); }, [this, &ready, current_thread, done = done_event.get()] { if (current_thread) { CritScope cs(&crit_); ready = true; current_thread->socketserver()->WakeUp(); } else { done->Set(); } })); if (current_thread) { // 当前的thread是google thread bool waited = false; crit_.Enter(); while (!ready) { // 任务未执行完,阻塞等待到任务完成被唤醒 crit_.Leave(); current_thread->socketserver()->Wait(kForever, false); // epoll wait waited = true; crit_.Enter(); } crit_.Leave(); // Our Wait loop above may have consumed some WakeUp events for this // Thread, that weren't relevant to this Send. Losing these WakeUps can // cause problems for some SocketServers. // // Concrete example: // Win32SocketServer on thread A calls Send on thread B. While processing // the message, thread B Posts a message to A. We consume the wakeup for // that Post while waiting for the Send to complete, which means that when // we exit this loop, we need to issue another WakeUp, or else the Posted // message won't be processed in a timely manner. if (waited) { // socketserver有两个使用场景 // 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕 // 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup() // 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程 // 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把 // 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait // 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空 // 再继续wait的) current_thread->socketserver()->WakeUp(); } } else { // 非常google thread done_event->Wait(rtc::Event::kForever); } }
先跟着主流程走看看这个PostTask()
函数, PostTask()
内部直接调用了POST()
, 在POST()
中把msg再次封装成一个rtc::message
然后投递到this线程的任务队列messages_
中,然后执行WakeUpSocketServer()
唤醒this线程消费任务,至此,任务的投递过程就完成了;
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) { // Though Post takes MessageData by raw pointer (last parameter), it still // takes it with ownership. Post(RTC_FROM_HERE, &queued_task_handler_, /*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task))); } void Thread::Post(const Location& posted_from, MessageHandler* phandler, uint32_t id, MessageData* pdata, bool time_sensitive) { RTC_DCHECK(!time_sensitive); if (IsQuitting()) { delete pdata; return; } // Keep thread safe // Add the message to the end of the queue // Signal for the multiplexer to return { CritScope cs(&crit_); // 将QueueTask 封装成 rtc::message 放到message队列中 Message msg; msg.posted_from = posted_from; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; messages_.push_back(msg); } // 唤醒this线程消费任务 WakeUpSocketServer(); }
2.3.2 任务的消费
接下来看看task的消费流程,thread启动之后会运行Run()
然后运行ProcessMessages()
void Thread::Run() { ProcessMessages(kForever); } bool Thread::ProcessMessages(int cmsLoop) { // Using ProcessMessages with a custom clock for testing and a time greater // than 0 doesn't work, since it's not guaranteed to advance the custom // clock's time, and may get stuck in an infinite loop. RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || cmsLoop == kForever); int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); int cmsNext = cmsLoop; while (true) { #if defined(WEBRTC_MAC) ScopedAutoReleasePool pool; #endif Message msg; // 获取消息 if (!Get(&msg, cmsNext)) return !IsQuitting(); // 分发处理 Dispatch(&msg); if (cmsLoop != kForever) { cmsNext = static_cast<int>(TimeUntil(msEnd)); if (cmsNext < 0) return true; } } }
在Thread::Get()
中会从消息队列messages_
获取消息,看起来很长,核心的只有几句:
遍历delay_messages_,获取到期消息并放入到messages_队列中
将messages_存在的消息取出,返回出去,如果没有,则break陷入阻塞直到被唤醒
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { // Return and clear peek if present // Always return the peek if it exists so there is Peek/Get symmetry if (fPeekKeep_) { *pmsg = msgPeek_; fPeekKeep_ = false; return true; } // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch int64_t cmsTotal = cmsWait; int64_t cmsElapsed = 0; int64_t msStart = TimeMillis(); int64_t msCurrent = msStart; while (true) { // Check for posted events int64_t cmsDelayNext = kForever; bool first_pass = true; while (true) { // All queue operations need to be locked, but nothing else in this loop // (specifically handling disposed message) can happen inside the crit. // Otherwise, disposed MessageHandlers will cause deadlocks. { CritScope cs(&crit_); // On the first pass, check for delayed messages that have been // triggered and calculate the next trigger time. if (first_pass) { first_pass = false; // 遍历delay message到期消息 while (!delayed_messages_.empty()) { if (msCurrent < delayed_messages_.top().run_time_ms_) { cmsDelayNext = TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); break; } // 将到期消息移动到messages_队列中 messages_.push_back(delayed_messages_.top().msg_); delayed_messages_.pop(); } } // Pull a message off the message queue, if available. // 获取messages_任务 if (messages_.empty()) { break; } else { *pmsg = messages_.front(); messages_.pop_front(); } } // crit_ is released here. // If this was a dispose message, delete it and skip it. if (MQID_DISPOSE == pmsg->message_id) { RTC_DCHECK(nullptr == pmsg->phandler); delete pmsg->pdata; *pmsg = Message(); continue; } return true; } if (IsQuitting()) break; // Which is shorter, the delay wait or the asked wait? int64_t cmsNext; if (cmsWait == kForever) { cmsNext = cmsDelayNext; } else { cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) cmsNext = cmsDelayNext; } { // 阻塞直到消息来 // Wait and multiplex in the meantime if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) return false; } // If the specified timeout expired, return msCurrent = TimeMillis(); cmsElapsed = TimeDiff(msCurrent, msStart); if (cmsWait != kForever) { if (cmsElapsed >= cmsWait) return false; } } return false; }
当消息被获取出,就调用dispatch()
然后执行,至此,任务就被执行完成了
void Thread::Dispatch(Message* pmsg) { TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file", pmsg->posted_from.file_name(), "src_func", pmsg->posted_from.function_name()); RTC_DCHECK_RUN_ON(this); int64_t start_time = TimeMillis(); pmsg->phandler->OnMessage(pmsg);// 执行 int64_t end_time = TimeMillis(); int64_t diff = TimeDiff(end_time, start_time); if (diff >= dispatch_warning_ms_) { RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff << "ms to dispatch. Posted from: " << pmsg->posted_from.ToString(); // To avoid log spew, move the warning limit to only give warning // for delays that are larger than the one observed. dispatch_warning_ms_ = diff + 1; } }
调用QueuedTaskHandler::OnMessage()
,从msg->pdata中还原成QueueTask运行,然后release释放掉
void Thread::QueuedTaskHandler::OnMessage(Message* msg) { RTC_DCHECK(msg); //取出data 还原成queue task auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata); std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data()); // Thread expects handler to own Message::pdata when OnMessage is called // Since MessageData is no longer needed, delete it. delete data; // 运行之后释放 // QueuedTask interface uses Run return value to communicate who owns the // task. false means QueuedTask took the ownership. if (!task->Run()) task.release(); }
2.3.3 执行结果的返回
上述task执行的时候涉及到两个非常重要的函数task->Run()
和 task.release()
:
task->Run()
会将最初的投递进来的函数运行然后存到result中,流程如下
首先构造QueueTask时的lambda
void Thread::Send(const Location& posted_from, MessageHandler* phandler, uint32_t id, MessageData* pdata) { ...... PostTask(webrtc::ToQueuedTask( [&msg]() mutable { msg.phandler->OnMessage(&msg); }, // <= Run() [this, &ready, current_thread, done = done_event.get()] { if (current_thread) { CritScope cs(&crit_); ready = true; current_thread->socketserver()->WakeUp(); } else { done->Set(); } })); ...... }
msg.phandler->OnMessage(&msg)
中的phanlder是InvokeInternal
构造的FunctorMessageHandler,调用的override的OnMessage()函数
void Thread::InvokeInternal(const Location& posted_from, rtc::FunctionView<void()> functor) { TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(), "src_func", posted_from.function_name()); class FunctorMessageHandler : public MessageHandler { public: explicit FunctorMessageHandler(rtc::FunctionView<void()> functor) : functor_(functor) {} void OnMessage(Message* msg) override { functor_(); } // <= OnMessage() private: rtc::FunctionView<void()> functor_; } handler(functor); Send(posted_from, &handler); }
functor_()
是Invoke中是将result = functor();
封装成的一个lambda,所以执行完成后的result会存在该result中;
template < class ReturnT, typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type> ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) { ReturnT result; InvokeInternal(posted_from, [functor, &result] { result = functor(); }); return result; }
在之前投递任务的send函数中,当前线程PostTask()
后就开始current_thread->socketserver()->Wait(kForever, false)
, 陷入阻塞;
task.release()
则会唤醒投递任务后陷入wait的当前线程,让其将result返回
void Thread::Send(const Location& posted_from, MessageHandler* phandler, uint32_t id, MessageData* pdata) { ...... std::unique_ptr<rtc::Event> done_event; if (!current_thread) done_event.reset(new rtc::Event()); bool ready = false; // 将msg封装达成QueueTask,放到线程队列中 PostTask(webrtc::ToQueuedTask( [&msg]() mutable { msg.phandler->OnMessage(&msg); }, [this, &ready, current_thread, done = done_event.get()] { // <= task.release() if (current_thread) { CritScope cs(&crit_); ready = true; current_thread->socketserver()->WakeUp(); // 唤醒 } else { done->Set(); } })); if (current_thread) { // 当前的thread是google thread bool waited = false; crit_.Enter(); while (!ready) { // 任务未执行完,阻塞等待到任务完成被唤醒 crit_.Leave(); current_thread->socketserver()->Wait(kForever, false); // epoll wait waited = true; crit_.Enter(); } crit_.Leave(); // Our Wait loop above may have consumed some WakeUp events for this // Thread, that weren't relevant to this Send. Losing these WakeUps can // cause problems for some SocketServers. // // Concrete example: // Win32SocketServer on thread A calls Send on thread B. While processing // the message, thread B Posts a message to A. We consume the wakeup for // that Post while waiting for the Send to complete, which means that when // we exit this loop, we need to issue another WakeUp, or else the Posted // message won't be processed in a timely manner. if (waited) { // socketserver有两个使用场景 // 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕 // 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup() // 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程 // 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把 // 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait // 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空 // 再继续wait的) current_thread->socketserver()->WakeUp(); } } else { // 非常google thread done_event->Wait(rtc::Event::kForever); } }
2.4 API类的异步代理类
signaling_thread_ 是用来处理Api层次任务的线程,类对外的接口会通过代理类,将内部的接口任务投递到signaling_thread_ 中;
比如类PeerConnection,在api\peer_connection_proxy.h
定义了其代理类如下
BEGIN_PROXY_MAP(PeerConnection) PROXY_PRIMARY_THREAD_DESTRUCTOR() PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams) PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams) PROXY_METHOD1(bool, AddStream, MediaStreamInterface*) PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*) PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>, AddTrack, rtc::scoped_refptr<MediaStreamTrackInterface>, const std::vector<std::string>&) .....
这样的代理类是通过api/proxy.h
的一组宏完成的的,这组宏的用法在文件有很详细的说明:
// // Example usage: // 1. 创建interface类 // class TestInterface : public rtc::RefCountInterface { // public: // std::string FooA() = 0; // std::string FooB(bool arg1) const = 0; // std::string FooC(bool arg1) = 0; // }; // // Note that return types can not be a const reference. // 2.继承接口类,实现 // class Test : public TestInterface { // ... implementation of the interface. // }; // // 3. 通过宏生命代理类 // BEGIN_PROXY_MAP(Test) // PROXY_PRIMARY_THREAD_DESTRUCTOR() // PROXY_METHOD0(std::string, FooA) // PROXY_CONSTMETHOD1(std::string, FooB, arg1) // PROXY_SECONDARY_METHOD1(std::string, FooC, arg1) // END_PROXY_MAP() // // Where the destructor and first two methods are invoked on the primary // thread, and the third is invoked on the secondary thread. // // The proxy can be created using // 4.创建代理对象 // TestProxy::Create(Thread* signaling_thread, Thread* worker_thread, // TestInterface*). //
将PeerConnection的代理宏展开
template <class INTERNAL_CLASS> class PeerConnectionProxyWithInternal; typedef PeerConnectionProxyWithInternal<PeerConnectionInterface> PeerConnectionProxy; // 代理类继承PeerConnectionInterface接口 template <class INTERNAL_CLASS> class PeerConnectionProxyWithInternal : public PeerConnectionInterface { protected: typedef PeerConnectionInterface C; public: const INTERNAL_CLASS* internal() const { return c_; } INTERNAL_CLASS* internal() { return c_; } protected: PeerConnectionProxyWithInternal(rtc::Thread* primary_thread, rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) : primary_thread_(primary_thread), secondary_thread_(secondary_thread), c_(PeerConnection) {} private: // 放入的两个线程 mutable rtc::Thread* primary_thread_; mutable rtc::Thread* secondary_thread_; protected: ~PeerConnectionProxyWithInternal() { MethodCall<PeerConnectionProxyWithInternal, void> call( this, &PeerConnectionProxyWithInternal::DestroyInternal); call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 28), destructor_thread()); } private: void DestroyInternal() { c_ = nullptr; } rtc::scoped_refptr<INTERNAL_CLASS> c_; public: // 创建代理对象静态方法 static rtc::scoped_refptr<PeerConnectionProxyWithInternal> Create( rtc::Thread* primary_thread, rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) { return new rtc::RefCountedObject<PeerConnectionProxyWithInternal>( primary_thread, secondary_thread, PeerConnection); } private: rtc::Thread* destructor_thread() const { return primary_thread_; } public: // local_streams的代理方法 rtc::scoped_refptr<StreamCollectionInterface> local_streams() override { MethodCall<C, rtc::scoped_refptr<StreamCollectionInterface>> call(c_, &C::local_streams); return call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 30), primary_thread_); }
代理类创建的时候可以放入两个目的线程primary_thread_
和secondary_thread_
,用来提供给代理方法使用。
local_streams()
的代理方法是通过宏PROXY_METHOD0()
创建的,该宏会创建一个MethodCall<> call
封装要执行的函数local_streams()
,然后通过call.Marshal()
将任务投递到primary_thread_
Marshal()
方法如下所示:
R Marshal(const rtc::Location& posted_from, rtc::Thread* t) { if (t->IsCurrent()) { // 是当前线程,Invoke Invoke(std::index_sequence_for<Args...>()); } else { // 不是则PostTask 然后阻塞等 t->PostTask(std::unique_ptr<QueuedTask>(this)); event_.Wait(rtc::Event::kForever); } return r_.moved_result(); }
这篇关于webrtc源码分析-线程任务管理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享