CARLA
 
载入中...
搜索中...
未找到
streaming/detail/tcp/Client.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
9#include "carla/BufferPool.h"
10#include "carla/Debug.h"
11#include "carla/Exception.h"
12#include "carla/Logging.h"
13#include "carla/Time.h"
14
15// C++ Boost Asio是一个基于事件驱动的网络编程库,提供了异步的、非阻塞的网络编程接口。
16#include <boost/asio/connect.hpp>
17#include <boost/asio/read.hpp>
18#include <boost/asio/write.hpp>
19// 通过停止使用boost post,删除了ServerSession和Client中Write函数的并行化,
20// 这会导致客户机和服务器之间的不同步,并最终导致泄漏:https://github.com/carla-simulator/carla/pull/8130
21#include <boost/asio/bind_executor.hpp>
22
23#include <exception>
24
25namespace carla {
26namespace streaming {
27namespace detail {
28namespace tcp {
29
30 // ===========================================================================
31 // -- 传入消息 IncomingMessage ------------------------------------------------
32 // ===========================================================================
33
34 /// 读取传入TCP消息的助手。在单个缓冲区中分配整个消息。
36 public:
37
39
40 // 获取缓冲区的大小
41 boost::asio::mutable_buffer size_as_buffer() {
42 return boost::asio::buffer(&_size, sizeof(_size));
43 }
44
45 // 获取消息的缓冲区
46 boost::asio::mutable_buffer buffer() {
47 DEBUG_ASSERT(_size > 0u);
49 return _message.buffer();
50 }
51
52 auto size() const {
53 return _size;
54 }
55
56 auto pop() {
57 // std::move 将左值转换为右值(转移所有权或启用对象的移动语义)
58 // 移动语义允许开发人员有效地将资源(如内存或文件句柄)从一个对象传输到另一个对象,而无需进行不必要的复制。
59 return std::move(_message);
60 }
61
62 private:
63
65
67 };
68
69 // ===========================================================================
70 // -- 客户端 ------------------------------------------------------------------
71 // ===========================================================================
72
74 boost::asio::io_context &io_context,
75 const token_type &token,
78 std::string("tcp client ") + std::to_string(token.get_stream_id())),
79 _token(token),
80 _callback(std::move(callback)),
81 _socket(io_context),
82 _strand(io_context),
83 _connection_timer(io_context),
84 _buffer_pool(std::make_shared<BufferPool>()) {
85 if (!_token.protocol_is_tcp()) {
86 throw_exception(std::invalid_argument("invalid token, only TCP tokens supported"));
87 }
88 }
89
90 Client::~Client() = default;
91
92
93 // 连接
95 auto self = shared_from_this();
96 if (_done) {
97 return;
98 }
99
100 using boost::system::error_code;
101
102 if (_socket.is_open()) {
103 _socket.close();
104 }
105
108 const auto ep = _token.to_tcp_endpoint();
109
110 auto handle_connect = [this, self, ep](error_code ec) {
111 if (!ec) {
112 if (_done) {
113 return;
114 }
115 // 强制不使用Nagle(内格尔)算法。
116 // Nagle算法:当一个TCP连接上有数据要发送时,并不立即发送出去,
117 // 而是等待一小段时间(通常是由一个RTT,即往返时延来估计),看看是否有更多的数据要发送。
118 // 如果在这段时间内有额外的数据产生,那么这些数据就会被组装成一个更大的报文一起发送。
119 // 这样做可以减少网络中由于过多的小包而引起的拥塞。
120 // 然而,它也可能引入额外的延迟,特别是在同步模式中,用户可能感觉到响应变慢,
121 // 禁用Nagle算法,将Linux上的同步模式速度提高了约3倍。
122 // 以牺牲带宽效率为代价,换取更低的延迟。
123 _socket.set_option(boost::asio::ip::tcp::no_delay(true));
124 log_debug("streaming client: connected to", ep);
125 // 发送流id以订阅流。
126 const auto &stream_id = _token.get_stream_id();
127 log_debug("streaming client: sending stream id", stream_id);
128 boost::asio::async_write(
129 _socket,
130 boost::asio::buffer(&stream_id, sizeof(stream_id)),
131 boost::asio::bind_executor(_strand, [=](error_code ec, size_t DEBUG_ONLY(bytes)) {
132 // 确保在连接停止后停止执行。
133 if (_done) {
134 return;
135 }
136 if (!ec) {
137 DEBUG_ASSERT_EQ(bytes, sizeof(stream_id));
138 // 如果成功,开始读取数据。
139 ReadData();
140 } else {
141 // 否则再尝试连接一次。
142 log_debug("streaming client: failed to send stream id:", ec.message());
143 Connect();
144 }
145 }));
146 } else {
147 log_info("streaming client: connection failed:", ec.message());
148 Reconnect();
149 }
150 };
151
152 log_debug("streaming client: connecting to", ep);
153 _socket.async_connect(ep, boost::asio::bind_executor(_strand, handle_connect));
154 }
155
156
157 // 停止连接
159 _connection_timer.cancel();
160 auto self = shared_from_this();
161 _done = true;
162 if (_socket.is_open()) {
163 _socket.close();
164 }
165 }
166
167
168 // 重新连接
170 auto self = shared_from_this();
171 _connection_timer.expires_from_now(time_duration::seconds(1u));
172 _connection_timer.async_wait([this, self](boost::system::error_code ec) {
173 if (!ec) {
174 Connect();
175 }
176 });
177 }
178
179
180 // 读取数据
182 auto self = shared_from_this();
183 if (_done) {
184 return;
185 }
186
187 // log_debug("streaming client: Client::ReadData");
188
189 auto message = std::make_shared<IncomingMessage>(_buffer_pool->Pop());
190
191 auto handle_read_data = [this, self, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
192 DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_data", bytes, "bytes"));
193 if (!ec) {
194 DEBUG_ASSERT_EQ(bytes, message->size());
195 DEBUG_ASSERT_NE(bytes, 0u);
196 // 将缓冲区移动到回调函数并开始读取下一块数据。
197 // log_debug("streaming client: success reading data, calling the callback");
198 self->_callback(message->pop());
199 ReadData();
200 } else {
201 // 像往常一样,如果出了什么问题,就从头再来。
202 log_debug("streaming client: failed to read data:", ec.message());
203 Connect();
204 }
205 };
206
207 auto handle_read_header = [this, self, message, handle_read_data](
208 boost::system::error_code ec,
209 size_t DEBUG_ONLY(bytes)) {
210 DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
211 if (!ec && (message->size() > 0u)) {
212 DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
213 if (_done) {
214 return;
215 }
216 // 现在我们知道了即将到来的缓冲区的大小,我们可以分配缓冲区并开始将数据放入其中。
217 boost::asio::async_read(
218 _socket,
219 message->buffer(),
220 boost::asio::bind_executor(_strand, handle_read_data));
221 } else if (!_done) {
222 log_debug("streaming client: failed to read header:", ec.message());
223 DEBUG_ONLY(log_debug("size = ", message->size()));
224 DEBUG_ONLY(log_debug("bytes = ", bytes));
225 Connect();
226 }
227 };
228
229 // 读取即将到来的缓冲区的大小。
230 boost::asio::async_read(
231 _socket,
232 message->size_as_buffer(),
233 boost::asio::bind_executor(_strand, handle_read_header));
234 }
235
236} // namespace tcp
237} // namespace detail
238} // namespace streaming
239} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:81
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:82
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
Definition BufferPool.h:29
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
void reset(size_type size)
重置缓冲区的大小。如果容量不足,当前内存将被丢弃,并分配一个新的大小为 size的内存块。 allocated.
boost::asio::const_buffer buffer() const noexcept
从这个缓冲区创建一个boost::asio::buffer。
std::function< void(Buffer)> callback_function_type
回调函数类型,接收一个Buffer作为参数。
boost::asio::io_context::strand _strand
序列化对套接字的访问。
boost::asio::deadline_timer _connection_timer
连接超时定时器。
std::shared_ptr< BufferPool > _buffer_pool
指向缓冲区池的共享指针。
std::atomic_bool _done
表示客户端是否已完成工作的原子布尔值。
boost::asio::ip::tcp::socket _socket
TCP套接字,用于与流建立连接。
const token_type _token
存储流的唯一标识令牌。
读取传入TCP消息的助手。在单个缓冲区中分配整个消息。
静态断言,用于确保token_data结构体的大小与Token::data的大小相同。
bool is_valid() const
检查令牌是否有效。
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
将令牌转换为TCP端点。
const auto & get_stream_id() const
获取流ID的引用。
bool protocol_is_tcp() const
检查协议是否为TCP。
static time_duration seconds(size_t timeout)
Definition Time.h:22
uint32_t message_size_type
消息大小的类型定义。
Definition Types.h:40
CARLA模拟器的主命名空间。
Definition Carla.cpp:139
void throw_exception(const std::exception &e)
Definition Carla.cpp:142
static void log_info(Args &&... args)
Definition Logging.h:86
static void log_debug(Args &&... args)
Definition Logging.h:71
本文件包含了网络通信相关类所需的头文件。