16#include <boost/asio/connect.hpp>
17#include <boost/asio/read.hpp>
18#include <boost/asio/write.hpp>
21#include <boost/asio/bind_executor.hpp>
42 return boost::asio::buffer(&
_size,
sizeof(
_size));
46 boost::asio::mutable_buffer
buffer() {
74 boost::asio::io_context &io_context,
78 std::string(
"tcp client ") +
std::to_string(token.get_stream_id())),
80 _callback(
std::move(callback)),
83 _connection_timer(io_context),
86 throw_exception(std::invalid_argument(
"invalid token, only TCP tokens supported"));
95 auto self = shared_from_this();
100 using boost::system::error_code;
110 auto handle_connect = [
this, self, ep](error_code ec) {
123 _socket.set_option(boost::asio::ip::tcp::no_delay(
true));
124 log_debug(
"streaming client: connected to", ep);
127 log_debug(
"streaming client: sending stream id", stream_id);
128 boost::asio::async_write(
130 boost::asio::buffer(&stream_id,
sizeof(stream_id)),
131 boost::asio::bind_executor(
_strand, [=](error_code ec,
size_t DEBUG_ONLY(bytes)) {
142 log_debug(
"streaming client: failed to send stream id:", ec.message());
147 log_info(
"streaming client: connection failed:", ec.message());
152 log_debug(
"streaming client: connecting to", ep);
153 _socket.async_connect(ep, boost::asio::bind_executor(
_strand, handle_connect));
160 auto self = shared_from_this();
170 auto self = shared_from_this();
182 auto self = shared_from_this();
189 auto message = std::make_shared<IncomingMessage>(
_buffer_pool->Pop());
191 auto handle_read_data = [
this, self, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
192 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_data", bytes,
"bytes"));
198 self->_callback(message->pop());
202 log_debug(
"streaming client: failed to read data:", ec.message());
207 auto handle_read_header = [
this, self, message, handle_read_data](
208 boost::system::error_code ec,
210 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_header", bytes,
"bytes"));
211 if (!ec && (message->size() > 0u)) {
217 boost::asio::async_read(
220 boost::asio::bind_executor(
_strand, handle_read_data));
222 log_debug(
"streaming client: failed to read header:", ec.message());
230 boost::asio::async_read(
232 message->size_as_buffer(),
233 boost::asio::bind_executor(
_strand, handle_read_header));
#define DEBUG_ASSERT(predicate)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT_NE(lhs, rhs)
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
void reset(size_type size)
重置缓冲区的大小。如果容量不足,当前内存将被丢弃,并分配一个新的大小为 size的内存块。 allocated.
boost::asio::const_buffer buffer() const noexcept
从这个缓冲区创建一个boost::asio::buffer。
std::function< void(Buffer)> callback_function_type
回调函数类型,接收一个Buffer作为参数。
boost::asio::io_context::strand _strand
序列化对套接字的访问。
boost::asio::deadline_timer _connection_timer
连接超时定时器。
std::shared_ptr< BufferPool > _buffer_pool
指向缓冲区池的共享指针。
std::atomic_bool _done
表示客户端是否已完成工作的原子布尔值。
boost::asio::ip::tcp::socket _socket
TCP套接字,用于与流建立连接。
const token_type _token
存储流的唯一标识令牌。
读取传入TCP消息的助手。在单个缓冲区中分配整个消息。
IncomingMessage(Buffer &&buffer)
boost::asio::mutable_buffer size_as_buffer()
boost::asio::mutable_buffer buffer()
静态断言,用于确保token_data结构体的大小与Token::data的大小相同。
bool is_valid() const
检查令牌是否有效。
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
将令牌转换为TCP端点。
const auto & get_stream_id() const
获取流ID的引用。
bool protocol_is_tcp() const
检查协议是否为TCP。
static time_duration seconds(size_t timeout)
uint32_t message_size_type
消息大小的类型定义。
void throw_exception(const std::exception &e)
static void log_info(Args &&... args)
static void log_debug(Args &&... args)