16#include <boost/asio/read.hpp>
17#include <boost/asio/write.hpp>
18#include <boost/asio/bind_executor.hpp>
19#include <boost/asio/post.hpp>
52 boost::asio::io_context &io_context,
61 _deadline(io_context),
71 boost::system::error_code ec;
72 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
85 const boost::asio::ip::tcp::no_delay option(
true);
92 on_opened(shared_from_this());
98 void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
102 std::weak_ptr<Primary> weak = shared_from_this();
104 boost::asio::post(
_strand, [=]() {
106 auto self = weak.lock();
109 if (!self->_socket.is_open()) {
113 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
115 auto self = weak.lock();
120 log_error(
"session ", self->_session_id,
": error sending data: ", ec.message());
127 self->_deadline.expires_from_now(self->_timeout);
129 boost::asio::async_write(
131 message->GetBufferSequence(),
132 boost::asio::bind_executor(self->_strand, handle_sent));
145 std::weak_ptr<Primary> weak = shared_from_this();
147 boost::asio::post(
_strand, [=]() {
149 auto self = weak.lock();
152 if (!self->_socket.is_open()) {
157 self->_deadline.expires_from_now(self->_timeout);
158 int this_size = text.size();
159 boost::asio::async_write(
161 boost::asio::buffer(&this_size,
sizeof(this_size)),
163 boost::asio::bind_executor(self->_strand, [](
const boost::system::error_code &,
size_t){ }));
165 boost::asio::async_write(
167 boost::asio::buffer(text.c_str(), text.size()),
169 boost::asio::bind_executor(self->_strand, [](
const boost::system::error_code &,
size_t){ }));
182 std::weak_ptr<Primary> weak = shared_from_this();
184 boost::asio::post(
_strand, [weak]() {
186 auto self = weak.lock();
189 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
191 auto handle_read_data = [weak, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
193 auto self = weak.lock();
201 self->_on_response(self, message->pop());
202 std::cout <<
"Getting data on listener\n";
206 log_error(
"primary server: failed to read data: ", ec.message());
219 auto handle_read_header = [weak, message, handle_read_data](
220 boost::system::error_code ec,
222 auto self = weak.lock();
224 if (!ec && (message->size() > 0u)) {
226 boost::asio::async_read(
229 boost::asio::bind_executor(self->_strand, handle_read_data));
232 log_error(
"Primary server: failed to read header: ", ec.message());
240 boost::asio::async_read(
242 message->size_as_buffer(),
243 boost::asio::bind_executor(self->_strand, handle_read_header));
262 std::weak_ptr<Primary> weak = shared_from_this();
263 boost::asio::post(
_strand, [weak]() {
264 auto self = weak.lock();
277 if (
_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
281 std::weak_ptr<Primary> weak = shared_from_this();
282 _deadline.async_wait([weak](boost::system::error_code ec) {
283 auto self = weak.lock();
288 log_error(
"session ", self->_session_id,
" timed out error: ", ec.message());
307 boost::system::error_code ec2;
308 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
#define DEBUG_ASSERT(predicate)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT_NE(lhs, rhs)
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
警告:在它的io_context停止之前,这个服务器不能被销毁。
std::function< void(std::shared_ptr< Primary >)> callback_function_type
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
void StartTimer()
启动定时器以监控连接是否超时。
boost::asio::deadline_timer _deadline
void Open(Listener::callback_function_type on_opened, Listener::callback_function_type on_closed, Listener::callback_function_type_response on_response)
启动会话并在成功读取流 id 后调用 on_opened,会话关闭后调用 on_closed。
~Primary()
Primary类的析构函数实现。
Listener::callback_function_type _on_closed
Primary(boost::asio::io_context &io_context, time_duration timeout, Listener &server)
Primary类的构造函数。
boost::asio::io_context::strand _strand
Listener::callback_function_type_response _on_response
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
将一些数据写入套接字。
void CloseNow(boost::system::error_code ec=boost::system::error_code())
立即关闭连接并处理相关资源。
Positive time duration up to milliseconds resolution.
static std::atomic_size_t SESSION_COUNTER
用于生成唯一会话ID的静态原子计数器。
static void log_error(Args &&... args)
static void log_debug(Args &&... args)