15#include <boost/asio/connect.hpp>
16#include <boost/asio/read.hpp>
17#include <boost/asio/write.hpp>
18#include <boost/asio/post.hpp>
19#include <boost/asio/bind_executor.hpp>
40 return boost::asio::buffer(&
_size,
sizeof(
_size));
43 boost::asio::mutable_buffer
buffer() {
69 boost::asio::io_context &io_context,
73 std::string(
"tcp client ") +
std::to_string(token.get_stream_id())),
75 _callback(
std::move(callback)),
78 _connection_timer(io_context),
81 throw_exception(std::invalid_argument(
"invalid token, only TCP tokens supported"));
88 auto self = shared_from_this();
89 boost::asio::post(
_strand, [
this, self]() {
94 using boost::system::error_code;
104 auto handle_connect = [
this, self, ep](error_code ec) {
111 _socket.set_option(boost::asio::ip::tcp::no_delay(
true));
112 log_debug(
"streaming client: connected to", ep);
115 log_debug(
"streaming client: sending stream id", stream_id);
116 boost::asio::async_write(
118 boost::asio::buffer(&stream_id,
sizeof(stream_id)),
119 boost::asio::bind_executor(
_strand, [=](error_code ec,
size_t DEBUG_ONLY(bytes)) {
130 log_debug(
"streaming client: failed to send stream id:", ec.message());
135 log_info(
"streaming client: connection failed:", ec.message());
140 log_debug(
"streaming client: connecting to", ep);
141 _socket.async_connect(ep, boost::asio::bind_executor(
_strand, handle_connect));
147 auto self = shared_from_this();
148 boost::asio::post(
_strand, [
this, self]() {
157 auto self = shared_from_this();
167 auto self = shared_from_this();
168 boost::asio::post(
_strand, [
this, self]() {
175 auto message = std::make_shared<IncomingMessage>(
_buffer_pool->Pop());
177 auto handle_read_data = [
this, self, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
178 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_data", bytes,
"bytes"));
185 boost::asio::post(
_strand, [self, message]() { self->_callback(message->pop()); });
189 log_debug(
"streaming client: failed to read data:", ec.message());
194 auto handle_read_header = [
this, self, message, handle_read_data](
195 boost::system::error_code ec,
197 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_header", bytes,
"bytes"));
198 if (!ec && (message->size() > 0u)) {
205 boost::asio::async_read(
208 boost::asio::bind_executor(
_strand, handle_read_data));
210 log_debug(
"streaming client: failed to read header:", ec.message());
218 boost::asio::async_read(
220 message->size_as_buffer(),
221 boost::asio::bind_executor(
_strand, handle_read_header));
#define DEBUG_ASSERT(predicate)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT_NE(lhs, rhs)
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
void reset(size_type size)
Reset the size of this buffer.
boost::asio::const_buffer buffer() const noexcept
Make a boost::asio::buffer from this buffer.
boost::asio::io_context::strand _strand
std::function< void(Buffer)> callback_function_type
boost::asio::deadline_timer _connection_timer
std::shared_ptr< BufferPool > _buffer_pool
boost::asio::ip::tcp::socket _socket
Helper for reading incoming TCP messages.
IncomingMessage(Buffer &&buffer)
boost::asio::mutable_buffer size_as_buffer()
boost::asio::mutable_buffer buffer()
Serializes a stream endpoint.
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
const auto & get_stream_id() const
bool protocol_is_tcp() const
static time_duration seconds(size_t timeout)
uint32_t message_size_type
This file contains definitions of common data structures used in traffic manager.
void throw_exception(const std::exception &e)
static void log_info(Args &&... args)
static void log_debug(Args &&... args)