14#include <boost/asio/read.hpp>
15#include <boost/asio/write.hpp>
16#include <boost/asio/bind_executor.hpp>
17#include <boost/asio/post.hpp>
28 boost::asio::io_context &io_context,
37 _deadline(io_context),
43 boost::system::error_code ec;
44 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
57 const boost::asio::ip::tcp::no_delay option(
true);
63 on_opened(shared_from_this());
68 void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
71 std::weak_ptr<Primary> weak = shared_from_this();
72 boost::asio::post(
_strand, [=]() {
73 auto self = weak.lock();
75 if (!self->_socket.is_open()) {
79 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
80 auto self = weak.lock();
83 log_error(
"session ", self->_session_id,
": error sending data: ", ec.message());
90 self->_deadline.expires_from_now(self->_timeout);
91 boost::asio::async_write(
93 message->GetBufferSequence(),
94 boost::asio::bind_executor(self->_strand, handle_sent));
99 std::weak_ptr<Primary> weak = shared_from_this();
100 boost::asio::post(
_strand, [=]() {
101 auto self = weak.lock();
103 if (!self->_socket.is_open()) {
108 self->_deadline.expires_from_now(self->_timeout);
109 int this_size = text.size();
110 boost::asio::async_write(
112 boost::asio::buffer(&this_size,
sizeof(this_size)),
113 boost::asio::bind_executor(self->_strand, [](
const boost::system::error_code &,
size_t){ }));
115 boost::asio::async_write(
117 boost::asio::buffer(text.c_str(), text.size()),
118 boost::asio::bind_executor(self->_strand, [](
const boost::system::error_code &,
size_t){ }));
123 std::weak_ptr<Primary> weak = shared_from_this();
124 boost::asio::post(
_strand, [weak]() {
125 auto self = weak.lock();
128 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
130 auto handle_read_data = [weak, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
131 auto self = weak.lock();
138 self->_on_response(self, message->pop());
139 std::cout <<
"Getting data on listener\n";
143 log_error(
"primary server: failed to read data: ", ec.message());
147 auto handle_read_header = [weak, message, handle_read_data](
148 boost::system::error_code ec,
150 auto self = weak.lock();
152 if (!ec && (message->size() > 0u)) {
155 boost::asio::async_read(
158 boost::asio::bind_executor(self->_strand, handle_read_data));
161 log_error(
"Primary server: failed to read header: ", ec.message());
169 boost::asio::async_read(
171 message->size_as_buffer(),
172 boost::asio::bind_executor(self->_strand, handle_read_header));
177 std::weak_ptr<Primary> weak = shared_from_this();
178 boost::asio::post(
_strand, [weak]() {
179 auto self = weak.lock();
186 if (
_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
190 std::weak_ptr<Primary> weak = shared_from_this();
191 _deadline.async_wait([weak](boost::system::error_code ec) {
192 auto self = weak.lock();
197 log_error(
"session ", self->_session_id,
" timed out error: ", ec.message());
208 boost::system::error_code ec2;
209 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
#define DEBUG_ASSERT(predicate)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT_NE(lhs, rhs)
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
std::function< void(std::shared_ptr< Primary >)> callback_function_type
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
boost::asio::deadline_timer _deadline
void Open(Listener::callback_function_type on_opened, Listener::callback_function_type on_closed, Listener::callback_function_type_response on_response)
Starts the session and calls on_opened after successfully reading the stream id, and on_closed once t...
Listener::callback_function_type _on_closed
Primary(boost::asio::io_context &io_context, time_duration timeout, Listener &server)
boost::asio::io_context::strand _strand
Listener::callback_function_type_response _on_response
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
Writes some data to the socket.
void CloseNow(boost::system::error_code ec=boost::system::error_code())
void Close()
Post a job to close the session.
Positive time duration up to milliseconds resolution.
static std::atomic_size_t SESSION_COUNTER
This file contains definitions of common data structures used in traffic manager.
static void log_error(Args &&... args)
static void log_debug(Args &&... args)