Discuz! Board

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 372|回复: 0
打印 上一主题 下一主题

Chromium IPC 深入分析(基于Mojo)

[复制链接]

1228

主题

1997

帖子

7580

积分

认证用户组

Rank: 5Rank: 5

积分
7580
跳转到指定楼层
楼主
发表于 2023-11-18 18:39:34 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
https://zhuanlan.zhihu.com/p/508362483

本文主要讲解是最新版的chromium ipc,需要有mojo的基础知识。
最新版的chromium ipc的底层通信是通过mojo接口通信的,ipc.mojom是ipc的mojo接口通道。
Browser与Render之间通信
如上图Render进程的RenderFrameImpl发送消息给Browser进程RenderFrameHostImpl,中间是通过ipc.mojo接口。
ipc.mojo的idl代码
module IPC.mojom;import "mojo/public/interfaces/bindings/native_struct.mojom";import "mojo/public/mojom/base/generic_pending_associated_receiver.mojom";// Typemapped such that arbitrarily large IPC::Message objects can be sent and// received with minimal copying.struct Message {  array<uint8> bytes;  array<mojo.native.SerializedHandle>? handles;};interface Channel {  // Informs the remote end of this client's PID. Must be called exactly once,  // before any calls to Receive() below.  SetPeerPid(int32 pid);  // Transmits a classical Chrome IPC message.  [UnlimitedSize]  Receive(Message message);  // Requests a Channel-associated interface.  GetAssociatedInterface(      mojo_base.mojom.GenericPendingAssociatedReceiver receiver);};// A strictly nominal interface used to identify Channel bootstrap requests.// This is only used in `AgentSchedulingGroup` initialization.interface ChannelBootstrap {};
如上可以看出mojo接口是Channel,Channel中有Receive就是消息处理和发送函数,GetAssociatedInterface是用于创建mojo接口关联函数。接下来围绕Channel讲解,
IPC核心讲解
主要代码路径在src/ipc
如上图主要类的关系:
注意Listener和Sender两个接口类,主要是发送消息和接受消息的虚接口,最终的实现调用还是MessagePipeReader类。
MessagePipeReader类是mojo::Channel的实现,MessagePipeReader这个类就是ipc消息处理和发送类。
class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel { public:  class Delegate {   public:    virtual void OnPeerPidReceived(int32_t peer_pid) = 0;    virtual void OnMessageReceived(const Message& message) = 0;    virtual void OnBrokenDataReceived() = 0;    virtual void OnPipeError() = 0;    virtual void OnAssociatedInterfaceRequest(        mojo::GenericPendingAssociatedReceiver receiver) = 0;  };.......  bool Send(std::unique_ptr<Message> message);....... protected:  void OnPipeClosed();  void OnPipeError(MojoResult error); private:  // mojom::Channel:  void SetPeerPid(int32_t peer_pid) override;  void Receive(MessageView message_view) override;  void GetAssociatedInterface(      mojo::GenericPendingAssociatedReceiver receiver) override;  void ForwardMessage(mojo::Message message);  // |delegate_| is null once the message pipe is closed.  raw_ptr<Delegate> delegate_; // 用于调用消息处理  mojo::AssociatedRemote<mojom::Channel> sender_; // AssociatedRemote用于消息发送  std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>      thread_safe_sender_;  mojo::AssociatedReceiver<mojom::Channel> receiver_;  base::ThreadChecker thread_checker_;  base::WeakPtrFactory<MessagePipeReader> weak_ptr_factory_{this};};
如上代码,MessagePipeReader 继承mojom::Channel,MessagePipeReader中有mojo::AssociatedRemote<mojom::Channel> sender_成员用于消息发送。
IPC消息发送
下面是以Render进程中RenderFrameImpl的消息发送流程图
如上图,我们以RenderFrameImpl为例,最终调用的是MessagePipeReader::Send函数,通过sender_->Receive函数mojo通道发送到另一个进程
bool MessagePipeReader::Send(std::unique_ptr<Message> message) {  CHECK(message->IsValid());  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",                         message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);  absl:ptional<std::vector<mojo::native::SerializedHandlePtr>> handles;  MojoResult result = MOJO_RESULT_OK;  result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);  if (result != MOJO_RESULT_OK)    return false;  if (!sender_)    return false;  base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),                                  message->size());  sender_->Receive(MessageView(bytes, std::move(handles)));  DVLOG(4) << "Send " << message->type() << ": " << message->size();  return true;}
IPC消息处理
下面是以Render进程中RenderFrameImpl的消息处理流程图
1.mojo通过调用MessagePipeReader::Receive然后分发给对应的类处理
void MessagePipeReader::Receive(MessageView message_view) {  if (message_view.bytes().empty()) {    delegate_->OnBrokenDataReceived();    return;  }  Message message(reinterpret_cast<const char*>(message_view.bytes().data()),                  message_view.bytes().size());  if (!message.IsValid()) {    delegate_->OnBrokenDataReceived();    return;  }  DVLOG(4) << "Receive " << message.type() << ": " << message.size();  MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(      message_view.TakeHandles(), &message);  if (write_result != MOJO_RESULT_OK) {    OnPipeError(write_result);    return;  }  TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",                         message.flags(), TRACE_EVENT_FLAG_FLOW_IN);  delegate_->OnMessageReceived(message); // 通过代理分发处理}
2.在消息处理函数中,delegate_指的是ChannelMojo,代码路径ipc/ipc_channel_mojo.h
class COMPONENT_EXPORT(IPC) ChannelMojo    : public Channel,      public Channel::AssociatedInterfaceSupport,      public internal::MessagePipeReader:elegate { public:......  bool Send(Message* message) override;......  void OnMessageReceived(const Message& message) override;...... private:......  const mojo::MessagePipeHandle pipe_;  std::unique_ptr<MojoBootstrap> bootstrap_;  raw_ptr<Listener> listener_;  std::unique_ptr<internal::MessagePipeReader> message_reader_;......};
ChannelMojo的基类有MessagePipeReader:elegate,ChannelMojo::OnMessageReceived来转发消息处理
void ChannelMojo::OnMessageReceived(const Message& message) {  const Message* message_ptr = &message;  TRACE_IPC_MESSAGE_SEND("ipc,toplevel", "ChannelMojo::OnMessageReceived",                         message_ptr);  listener_->OnMessageReceived(message);  if (message.dispatch_error())    listener_->OnBadMessageReceived(message);}
如上代码,是通过listener_来转发消息。
listener_本质上就是ChannelProxy::Context,在创建Channel的时候会把ChannelProxy::Context传入给ChannelMojo中。
void ChannelProxy::Context::CreateChannel(    std::unique_ptr<ChannelFactory> factory) {  base::AutoLock channel_lock(channel_lifetime_lock_);  DCHECK(!channel_);  DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);  channel_ = factory->BuildChannel(this); // 创建Channel  Channel::AssociatedInterfaceSupport* support =      channel_->GetAssociatedInterfaceSupport();  if (support) {    thread_safe_channel_ = support->CreateThreadSafeChannel();    base::AutoLock filter_lock(pending_filters_lock_);    for (auto& entry : pending_io_thread_interfaces_)      support->AddGenericAssociatedInterface(entry.first, entry.second);    pending_io_thread_interfaces_.clear();  }}
3.接下来ChannelProxy::Context::OnMessageReceived处理消息
bool ChannelProxy::Context::OnMessageReceived(const Message& message) {  // First give a chance to the filters to process this message.  if (!TryFilters(message))    OnMessageReceivedNoFilter(message);  return true;}
如上代码本质上是调用ChannelProxy::Context::OnMessageReceivedNoFilter,这里我们发现有个PostTask。
为什么需要post任务的方式去处理消息?因为我们知道每个IPC消息不一定在同一个线程,所以在处理message的时候会根据消息的routing_id去查找对应的TaskRunner,这样就可以保证IPC消息在正确的线程上处理。如下代码
bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {  GetTaskRunner(message.routing_id())      ->ostTask(FROM_HERE,                 base::BindOnce(&Context::OnDispatchMessage, this, message));  return true;}
然后调用到ChannelProxy::Context::OnDispatchMessage处理消息,这里真正会调用listener_->OnMessageReceived的函数去处理消息。listener_是Listener,在这个流程中是AgentSchedulingGroup。如下代码:
// Called on the listener's threadvoid ChannelProxy::Context::OnDispatchMessage(const Message& message) {  if (!listener_)    return;  OnDispatchConnected();#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)  Logging* logger = Logging::GetInstance();  if (message.type() == IPC_LOGGING_ID) {    logger->OnReceivedLoggingMessage(message);    return;  }  if (logger->Enabled())    logger->OnPreDispatchMessage(message);#endif  listener_->OnMessageReceived(message); // 真正调用listener处理  if (message.dispatch_error())    listener_->OnBadMessageReceived(message);#if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)  if (logger->Enabled())    logger->OnPostDispatchMessage(message);#endif}
4.调用AgentSchedulingGroup去处理消息,每个消息会有一个routing_id,message这个routing_id会找到对应的listener。
bool AgentSchedulingGroup::OnMessageReceived(const IPC::Message& message) {  DCHECK_NE(message.routing_id(), MSG_ROUTING_CONTROL);  auto* listener = GetListener(message.routing_id());  if (!listener)    return false;  return listener->OnMessageReceived(message);}
5.上面根据routing_id找到RenderFrameImpl,RenderFrameImpl的基类会有IPC:istener,所以调用RenderFrameImpl::OnMessageReceived进行处理对应的IPC
bool RenderFrameImpl::OnMessageReceived(const IPC::Message& msg) {  // We may get here while detaching, when the WebFrame has been deleted.  Do  // not process any messages in this state.  if (!frame_)    return false;  DCHECK(!frame_->GetDocument().IsNull());  GetContentClient()->SetActiveURL(      frame_->GetDocument().Url(),      frame_->Top()->GetSecurityOrigin().ToString().Utf8());  for (auto& observer : observers_) {    if (observer.OnMessageReceived(msg))      return true;  }  return false;}
管理IPC:istener
管理IPC:istener的类有很多,RenderThreadImpl,AgentSchedulingGroup,AgentSchedulingGroupHost,ChildThreadImpl等等。
这里主要讲解Render进程管理的AgentSchedulingGroup管理IPC:istener,主要存储在listener_map_变量中
routing_id和listener通过AgentSchedulingGroup::AddRoute进行添加的
void AgentSchedulingGroup::AddRoute(int32_t routing_id, Listener* listener) {  DCHECK(!listener_map_.Lookup(routing_id));  listener_map_.AddWithID(listener, routing_id); // 存储routing_id和listener  render_thread_.AddRoute(routing_id, listener);  // See warning in `GetAssociatedInterface`.  // Replay any `GetAssociatedInterface` calls for this route.  auto range = pending_receivers_.equal_range(routing_id);  for (auto iter = range.first; iter != range.second; ++iter) {    ReceiverData& data = iter->second;    listener->OnAssociatedInterfaceRequest(data.name,                                           data.receiver.PassHandle());  }  pending_receivers_.erase(range.first, range.second);}
文章到此结束,如果有出入请多多指教!

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|firemail ( 粤ICP备15085507号-1 )

GMT+8, 2024-4-30 00:23 , Processed in 0.058780 second(s), 19 queries .

Powered by Discuz! X3

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表