CARLA
 
载入中...
搜索中...
未找到
ServerSession.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
9
10#include "carla/Debug.h"
11#include "carla/Logging.h"
12
13#include <boost/asio/read.hpp>
14#include <boost/asio/write.hpp>
15#include <boost/asio/bind_executor.hpp>
16#include <boost/asio/post.hpp>
17
18#include <atomic>
19#include <thread>
20
21namespace carla {
22namespace streaming {
23namespace detail {
24namespace tcp {
25
// File-local atomic counter of sessions. The ServerSession constructor reads
// it to build the session's display name and post-increments it to obtain the
// session id.
26 static std::atomic_size_t SESSION_COUNTER{0u};
27
29 boost::asio::io_context &io_context,
30 const time_duration timeout,
31 Server &server)
33 std::string("tcp server session ") + std::to_string(SESSION_COUNTER)),
34 _server(server),
35 _session_id(SESSION_COUNTER++),
36 _socket(io_context),
37 _timeout(timeout),
38 _deadline(io_context),
39 _strand(io_context) {}
40
42 callback_function_type on_opened,
43 callback_function_type on_closed) {
44 DEBUG_ASSERT(on_opened && on_closed);
45 _on_closed = std::move(on_closed);
46
47 // This forces not using Nagle's algorithm.
48 // Improves the sync mode velocity on Linux by a factor of ~3.
49 const boost::asio::ip::tcp::no_delay option(true);
50 _socket.set_option(option);
51
52 StartTimer();
53 auto self = shared_from_this(); // To keep myself alive.
54 boost::asio::post(_strand, [=]() {
55
56 auto handle_query = [this, self, callback=std::move(on_opened)](
57 const boost::system::error_code &ec,
58 size_t DEBUG_ONLY(bytes_received)) {
59 if (!ec) {
60 DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
61 log_debug("session", _session_id, "for stream", _stream_id, " started");
62 boost::asio::post(_strand.context(), [=]() { callback(self); });
63 } else {
64 log_error("session", _session_id, ": error retrieving stream id :", ec.message());
65 CloseNow(ec);
66 }
67 };
68
69 // Read the stream id.
70 _deadline.expires_from_now(_timeout);
71 boost::asio::async_read(
72 _socket,
73 boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
74 boost::asio::bind_executor(_strand, handle_query));
75 });
76 }
77
// Queues `message` to be written to the socket. The work is posted to
// _strand; nothing is written if the socket is not open.
//
// `message` must be non-null and non-empty (checked with DEBUG_ASSERT). It is
// captured by the completion handler together with `self`, so both stay
// referenced until the write completes.
78 void ServerSession::Write(std::shared_ptr<const Message> message) {
79 DEBUG_ASSERT(message != nullptr);
80 DEBUG_ASSERT(!message->empty());
81 auto self = shared_from_this();
82 boost::asio::post(_strand, [=]() {
83 if (!_socket.is_open()) {
84 return;
85 }
// A previous write is still in flight.
// NOTE(review): listing line 87 is missing from this copy. The `} else {`
// below shows it was an `if (...) {` choosing between "wait" and "discard";
// presumably it tests whether the server runs in synchronous mode. Confirm
// against the real source.
86 if (_is_writing) {
88 // wait until previous message has been sent
89 while (_is_writing) {
90 std::this_thread::yield();
91 }
92 } else {
93 // ignore this message
94 log_debug("session", _session_id, ": connection too slow: message discarded");
95 return;
96 }
97 }
98 _is_writing = true;
99
// Completion handler: clears the in-flight flag, then closes the session on
// error.
100 auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
101 _is_writing = false;
102 if (ec) {
103 log_info("session", _session_id, ": error sending data :", ec.message());
104 CloseNow(ec);
105 } else {
106 DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
107 DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
108 }
109 };
110
111 log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
112
// Re-arm the deadline for this write.
113 _deadline.expires_from_now(_timeout);
// NOTE(review): handle_sent is passed without bind_executor(_strand, ...),
// unlike handle_query in Open(). Presumably deliberate: the busy-wait above
// runs inside a strand handler, so a strand-bound completion handler could not
// run to clear _is_writing. This relies on _is_writing (declared elsewhere)
// being safe to access from several threads. Confirm.
114 boost::asio::async_write(
115 _socket,
116 message->GetBufferSequence(),
117 handle_sent);
118 });
119 }
120
121 void ServerSession::Close() {
122 boost::asio::post(_strand, [self=shared_from_this()]() { self->CloseNow(); });
123 }
124
125 void ServerSession::StartTimer() {
126 if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
127 log_debug("session", _session_id, "timed out");
128 Close();
129 } else {
130 _deadline.async_wait([this, self=shared_from_this()](boost::system::error_code ec) {
131 if (!ec) {
132 StartTimer();
133 } else {
134 log_debug("session", _session_id, "timed out error:", ec.message());
135 }
136 });
137 }
138 }
139
140 void ServerSession::CloseNow(boost::system::error_code ec) {
141 _deadline.cancel();
142 if (!ec)
143 {
144 if (_socket.is_open()) {
145 boost::system::error_code ec2;
146 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
147 _socket.close();
148 }
149 }
150 _on_closed(shared_from_this());
151 log_debug("session", _session_id, "closed");
152 }
153
154} // namespace tcp
155} // namespace detail
156} // namespace streaming
157} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:76
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
void CloseNow(boost::system::error_code ec=boost::system::error_code())
ServerSession(boost::asio::io_context &io_context, time_duration timeout, Server &server)
void Open(callback_function_type on_opened, callback_function_type on_closed)
Starts the session and calls on_opened after successfully reading the stream id, and on_closed once the session is closed.
boost::asio::io_context::strand _strand
std::function< void(std::shared_ptr< ServerSession >)> callback_function_type
void Write(std::shared_ptr< const Message > message)
Writes some data to the socket.
Positive time duration up to milliseconds resolution.
Definition Time.h:19
static std::atomic_size_t SESSION_COUNTER
uint32_t message_size_type
Definition Types.h:20
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
static void log_error(Args &&... args)
Definition Logging.h:110
static void log_info(Args &&... args)
Definition Logging.h:82
static void log_debug(Args &&... args)
Definition Logging.h:68