CARLA
 
载入中...
搜索中...
未找到
secondary.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
7#include "carla/multigpu/incomingMessage.h" // 引入CARLA多GPU通信模块中负责处理接收消息的头文件
8// 此头文件可能定义了incomingMessage类或其他相关结构,用于处理从其他GPU或节点接收的数据包。
9
10#include "carla/multigpu/secondary.h" // 引入CARLA多GPU框架中提供次要或辅助功能的头文件
11// 此头文件可能包含了执行次要计算任务、数据预处理或后处理等功能的类和方法。
12
13#include "carla/BufferPool.h" // 引入CARLA框架中管理内存缓冲区的头文件
14// BufferPool类可能提供了缓冲区的分配、释放和重用功能,以优化内存使用和性能。
15
16#include "carla/Debug.h" // 引入CARLA框架中用于调试功能的头文件
17// 此头文件可能定义了调试宏、函数或类,用于在开发过程中输出调试信息、检查断言等。
18
19#include "carla/Exception.h" // 引入CARLA框架中处理异常的头文件
20// Exception类及其相关可能定义了自定义异常类型,用于在框架中处理错误情况。
21
22#include "carla/Logging.h" // 引入CARLA框架中负责日志记录的头文件
23// Logging类和相关可能提供了灵活的日志记录功能,包括不同级别的日志信息(如调试、信息、警告、错误等)。
24
25#include "carla/Time.h" // 引入CARLA框架中处理时间和计时功能的头文件
26// Time类及相关可能提供了时间戳获取、时间间隔测量、定时器等功能。
27
28#include <boost/asio/connect.hpp> // 引入Boost.Asio库中负责建立连接的头文件
29// connect函数模板用于启动到远程端点的异步或同步连接。
30
31#include <boost/asio/read.hpp> // 引入Boost.Asio库中负责读取数据的头文件
32// read和async_read函数模板用于从异步操作关联的流对象中读取数据。
33
34#include <boost/asio/write.hpp> // 引入Boost.Asio库中负责写入数据的头文件
35// write和async_write函数模板用于向异步操作关联的流对象中写入数据。
36
37#include <boost/asio/post.hpp> // 引入Boost.Asio库中用于异步任务调度的头文件
38// post函数模板用于将任务(函数对象、lambda表达式等)排队到指定的I/O执行器上,以便稍后异步执行。
39
40#include <boost/asio/bind_executor.hpp> // 引入Boost.Asio库中用于绑定执行器的头文件
41// bind_executor函数模板用于创建一个与指定执行器关联的可调用对象,确保任务在正确的执行器上执行。
42
43#include <exception> // 引入C++标准库中的异常处理头文件
44// 此头文件定义了std::exception类及其相关功能,用于在程序中处理异常和错误情况。
45namespace carla {
46namespace multigpu {
47
48 Secondary::Secondary( // 构造函数,接受端点和回调函数
49 boost::asio::ip::tcp::endpoint ep,
51 _pool(), // 初始化缓冲池
52 _socket(_pool.io_context()), // 初始化套接字
53 _endpoint(ep), // 设置端点
54 _strand(_pool.io_context()), // 初始化strand以确保线程安全
55 _connection_timer(_pool.io_context()),// 初始化连接计时器
56 _buffer_pool(std::make_shared<BufferPool>()) { // 创建共享的缓冲池
57
58 _commander.set_callback(callback); // 设置回调函数
59 }
60
61
62 Secondary::Secondary( // 另一个构造函数,接受IP和端口以及回调函数
63 std::string ip,
64 uint16_t port,
66 _pool(), // 初始化缓冲池
67 _socket(_pool.io_context()), // 初始化套接字
68 _strand(_pool.io_context()), // 初始化strand以确保线程安全
69 _connection_timer(_pool.io_context()),// 初始化连接计时器
70 _buffer_pool(std::make_shared<BufferPool>()) { // 创建共享的缓冲池
71
72 boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip); // 从字符串转换为IP地址
73 _endpoint = boost::asio::ip::tcp::endpoint(ip_address, port); // 设置端点
74 _commander.set_callback(callback); // 设置回调函数
75 }
76
77 Secondary::~Secondary() { // 析构函数
78 _pool.Stop(); // 停止缓冲池
79 }
80
81 void Secondary::Connect() { // 连接函数
82 AsyncRun(2u); // 启动异步运行,使用2个工作线程
83
84 _commander.set_secondary(shared_from_this()); // 设置当前对象为命令的次要部分
85
86 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针以防止循环引用
87 boost::asio::post(_strand, [weak]() { // 在strand中发布任务
88 auto self = weak.lock(); // 锁定弱指针
89 if (!self) return; // 如果对象已被销毁,返回
90
91 if (self->_done) { // 如果已完成
92 return;
93 }
94
95 if (self->_socket.is_open()) { // 如果套接字是打开的
96 self->_socket.close(); // 关闭套接字
97 }
98
99 auto handle_connect = [weak](boost::system::error_code ec) { // 处理连接结果的回调
100 auto self = weak.lock(); // 锁定弱指针
101 if (!self) return; // 如果对象已被销毁,返回
102 if (ec) { // 如果有错误
103 log_error("secondary server: connection failed:", ec.message()); // 记录错误信息
104 if (!self->_done) // 如果未完成,尝试重连
105 self->Reconnect();
106 return;
107 }
108
109 if (self->_done) { // 如果已完成
110 return;
111 }
112
113 // 此设置强制不使用Nagle算法。
114 // 在Linux上提高同步模式速度约3倍。
115 self->_socket.set_option(boost::asio::ip::tcp::no_delay(true)); // 禁用Nagle算法
116
117 log_info("secondary server: connected to ", self->_endpoint); // 记录连接信息
118
119 self->ReadData(); // 调用读取数据函数
120 };
121
122 self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect)); // 异步连接
123 });
124 }
125
126 void Secondary::Stop() { // 停止函数
127 _connection_timer.cancel(); // 取消连接计时器
128 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针
129 boost::asio::post(_strand, [weak]() { // 在strand中发布停止任务
130 auto self = weak.lock(); // 锁定弱指针
131 if (!self) return; // 如果对象已被销毁,返回
132 self->_done = true; // 标记为已完成
133 if (self->_socket.is_open()) { // 如果套接字是打开的
134 self->_socket.close(); // 关闭套接字
135 }
136 });
137 }
138
139 void Secondary::Reconnect() { // 重连函数
140 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针
141 _connection_timer.expires_from_now(time_duration::seconds(1u)); // 设置计时器为1秒后到期
142 _connection_timer.async_wait([weak](boost::system::error_code ec) { // 等待计时器到期后的回调
143 auto self = weak.lock(); // 锁定弱指针
144 if (!self) return; // 如果对象已被销毁,返回
145 if (!ec) { // 如果没有错误
146 self->Connect(); // 重新连接
147 }
148 });
149 }
150
151 void Secondary::AsyncRun(size_t worker_threads) { // 启动异步运行,接受工作线程数量
152 _pool.AsyncRun(worker_threads); // 调用缓冲池的异步运行
153 }
154
155 void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) { // 写入函数
156 DEBUG_ASSERT(message != nullptr); // 确保消息不为空
157 DEBUG_ASSERT(!message->empty()); // 确保消息不为空
158 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针
159 boost::asio::post(_strand, [=]() { // 在strand中发布写入任务
160 auto self = weak.lock(); // 锁定弱指针
161 if (!self) return; // 如果对象已被销毁,返回
162 if (!self->_socket.is_open()) { // 如果套接字未打开
163 return; // 返回
164 }
165
166 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) { // 处理发送结果的回调
167 auto self = weak.lock(); // 锁定弱指针
168 if (!self) return; // 如果对象已被销毁,返回
169 if (ec) { // 如果有错误
170 log_error("error sending data: ", ec.message()); // 记录发送数据的错误
171 }
172 };
173
174 // 设置超时期限
175 boost::asio::async_write(
176 self->_socket,// 目标 socket
177 message->GetBufferSequence(),// 获取要发送的消息缓冲区序列
178 boost::asio::bind_executor(self->_strand, handle_sent)); // 绑定执行器,处理发送完成后的回调
179 });
180 }
181
182 void Secondary::Write(Buffer buffer) {
183// 创建一个视图从传入的缓冲区
184 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
185 // 创建消息
186 auto message = Secondary::MakeMessage(view_data);
187
188 DEBUG_ASSERT(message != nullptr);// 确保消息不为空
189 DEBUG_ASSERT(!message->empty());// 确保消息不为空
190 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针以避免循环引用
191 boost::asio::post(_strand, [=]() {// 在strand中异步执行
192 auto self = weak.lock(); // 锁定弱指针
193 if (!self) return;// 如果对象已被销毁,返回
194 if (!self->_socket.is_open()) {// 如果socket未打开
195 return;
196 }
197
198 // 发送完成后的处理函数
199 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
200 auto self = weak.lock(); // 锁定弱指针
201 if (!self) return;// 如果对象已被销毁,返回
202 if (ec) {// 如果发生错误
203 log_error("error sending data: ", ec.message()); // 记录错误
204 }
205 };
206
207 // 设置超时(注释掉)
208 boost::asio::async_write(
209 self->_socket,// 异步写入socket
210 message->GetBufferSequence(),// 获取消息的缓冲区序列
211 boost::asio::bind_executor(self->_strand, handle_sent));// 绑定执行器
212 });
213 }
214
215 void Secondary::Write(std::string text) {
216 std::weak_ptr<Secondary> weak = shared_from_this(); // 创建弱指针以避免循环引用
217 boost::asio::post(_strand, [=]() {
218 auto self = weak.lock();// 锁定弱指针
219 if (!self) return;// 如果对象已被销毁,返回
220 if (!self->_socket.is_open()) {// 如果socket未打开
221 return;
222 }
223
224 // 发送完成后的处理函数
225 auto handle_sent = [weak](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
226 auto self = weak.lock();// 锁定弱指针
227 if (!self) return;// 如果对象已被销毁,返回
228 if (ec) { // 如果发生错误
229 log_error("error sending data: ", ec.message());// 记录错误
230 }
231 };
232
233 // 设置超时(注释掉)
234 // 发送大小缓冲区
235 int this_size = text.size();// 获取字符串大小
236 boost::asio::async_write(
237 self->_socket,// 异步写入socket
238 boost::asio::buffer(&this_size, sizeof(this_size)),// 写入大小
239 boost::asio::bind_executor(self->_strand, handle_sent));// 绑定执行器
240
241 // 发送字符
242 boost::asio::async_write(
243 self->_socket,// 异步写入socket
244 boost::asio::buffer(text.c_str(), text.size()),// 写入字符
245 boost::asio::bind_executor(self->_strand, handle_sent));// 绑定执行器
246 });
247 }
248
249 // 读取数据的处理函数
250 void Secondary::ReadData() {
251 std::weak_ptr<Secondary> weak = shared_from_this();// 创建弱指针以避免循环引用
252 boost::asio::post(_strand, [weak]() {// 在strand中异步执行
253 auto self = weak.lock();// 锁定弱指针
254 if (!self) return;// 如果对象已被销毁,返回
255 if (self->_done) {// 如果已完成,不再处理
256 return;
257 }
258
259 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());// 创建共享 IncomingMessage
260
261 // 读取数据的处理函数
262 auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
263 auto self = weak.lock();// 锁定弱指针
264 if (!self) return;// 如果对象已被销毁,返回
265 if (!ec) {// 如果没有错误
266 DEBUG_ASSERT_EQ(bytes, message->size());// 确保字节数匹配
267 DEBUG_ASSERT_NE(bytes, 0u);
268 // 移动缓冲区到回调函数并开始读取下一部分数据
269 self->GetCommander().process_command(message->pop());
270 self->ReadData();// 继续读取数据
271 } else {
272 // 如果发生错误,从最顶部重新开始
273 log_error("secondary server: failed to read data: ", ec.message());// 记录错误
274 // 连接(注释掉)
275 }
276 };
277
278 // 读取头部的处理函数
279 auto handle_read_header = [weak, message, handle_read_data](
280 boost::system::error_code ec,
281 size_t DEBUG_ONLY(bytes)) {
282 auto self = weak.lock();// 锁定弱指针
283 if (!self) return; // 如果对象已被销毁,返回
284 if (!ec && (message->size() > 0u)) {// 如果没有错误且消息大小大于零
285 DEBUG_ASSERT_EQ(bytes, sizeof(carla::streaming::detail::message_size_type));// 验证字节数
286 if (self->_done) { // 如果已完成,不再处理
287 return;
288 }
289 // 现在我们知道即将到来的缓冲区的大小,可以分配缓冲区并开始填充数据
290 boost::asio::async_read(
291 self->_socket,// 异步读取socket
292 message->buffer(),// 读取消息缓冲区
293 boost::asio::bind_executor(self->_strand, handle_read_data));// 绑定执行器
294 } else if (!self->_done) {// 如果发生错误且未完成
295 log_error("secondary server: failed to read header: ", ec.message());// 记录错误
296 // 调试输出(注释掉)
297 // 调试输出(注释掉)
298 // 连接(注释掉)
299 }
300 };
301
302 // 读取即将到来的缓冲区的大小.
303 boost::asio::async_read(
304 self->_socket,// 从socket异步读取数据
305 message->size_as_buffer(),// 读取消息大小的缓冲区
306 boost::asio::bind_executor(self->_strand, handle_read_header));// 绑定执行器并指定处理头部的回调函数
307 });
308 }
309
310} // namespace multigpu
311} // namespace carla
if(!Actor||!Actor->Destroy())
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:81
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:82
#define DEBUG_ONLY(code)
Definition Debug.h:55
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
Definition BufferPool.h:29
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:60
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
std::function< void(MultiGPUCommand, carla::Buffer)> callback_type
void set_secondary(std::shared_ptr< Secondary > secondary)
void set_callback(callback_type callback)
boost::asio::ip::tcp::endpoint _endpoint
Definition secondary.h:80
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback)
Definition secondary.cpp:48
boost::asio::io_context::strand _strand
Definition secondary.h:81
SecondaryCommands _commander
Definition secondary.h:85
void AsyncRun(size_t worker_threads)
static time_duration seconds(size_t timeout)
Definition Time.h:22
uint32_t message_size_type
消息大小的类型定义。
Definition Types.h:40
CARLA模拟器的主命名空间。
Definition Carla.cpp:139
static void log_error(Args &&... args)
Definition Logging.h:115
static void log_info(Args &&... args)
Definition Logging.h:86