CARLA
 
载入中...
搜索中...
未找到
streaming/detail/tcp/Client.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
9#include "carla/BufferPool.h"
10#include "carla/Debug.h"
11#include "carla/Exception.h"
12#include "carla/Logging.h"
13#include "carla/Time.h"
14
15#include <boost/asio/connect.hpp>
16#include <boost/asio/read.hpp>
17#include <boost/asio/write.hpp>
18#include <boost/asio/post.hpp>
19#include <boost/asio/bind_executor.hpp>
20
21#include <exception>
22
23namespace carla {
24namespace streaming {
25namespace detail {
26namespace tcp {
27
28 // ===========================================================================
29 // -- IncomingMessage --------------------------------------------------------
30 // ===========================================================================
31
32 /// Helper for reading incoming TCP messages. Allocates the whole message in
33 /// a single buffer.
35 public:
36
38
/// Returns a mutable asio buffer that views the raw bytes of the _size member,
/// so that the size header of a message can be read directly into it.
39 boost::asio::mutable_buffer size_as_buffer() {
40 return boost::asio::buffer(&_size, sizeof(_size));
41 }
42
/// Returns the asio buffer exposed by _message. DEBUG_ASSERT checks that a
/// non-zero size has been stored in _size before the buffer is requested.
/// NOTE(review): listing line 45 is absent from this extract, so a statement
/// between the assert and the return may be missing; confirm against the
/// original file.
43 boost::asio::mutable_buffer buffer() {
44 DEBUG_ASSERT(_size > 0u);
46 return _message.buffer();
47 }
48
/// Returns the current value of _size, i.e. whatever was last written through
/// size_as_buffer().
49 auto size() const {
50 return _size;
51 }
52
/// Hands the stored message over to the caller by moving out of _message.
/// The member is left in a moved-from state afterwards.
53 auto pop() {
54 return std::move(_message);
55 }
56
57 private:
58
60
62 };
63
64 // ===========================================================================
65 // -- Client -----------------------------------------------------------------
66 // ===========================================================================
67
// NOTE(review): the lines that open this constructor (its name, the remaining
// parameter and the head of the initializer list) are absent from this
// extract; only the visible part is described here.
69 boost::asio::io_context &io_context,
70 const token_type &token,
73 std::string("tcp client ") + std::to_string(token.get_stream_id())),
74 _token(token),
75 _callback(std::move(callback)),
// The socket, the strand and the connection timer are all built on the same
// io_context.
76 _socket(io_context),
77 _strand(io_context),
78 _connection_timer(io_context),
79 _buffer_pool(std::make_shared<BufferPool>()) {
// Reject tokens whose protocol is not TCP by reporting std::invalid_argument
// through throw_exception.
80 if (!_token.protocol_is_tcp()) {
81 throw_exception(std::invalid_argument("invalid token, only TCP tokens supported"));
82 }
83 }
84
// Defaulted destructor: no explicit cleanup is written here.
85 Client::~Client() = default;
86
// NOTE(review): the signature line of this member function is missing from
// this extract (presumably Client::Connect; confirm against the original
// file). Listing lines 100-101 are absent as well.
//
// Posts one connection attempt onto _strand. The posted task closes the socket
// if it is open, obtains the endpoint from _token, connects asynchronously and,
// on success, writes the stream id to subscribe before starting to read.
// self is captured by every handler below, so each pending handler holds a
// shared owner of this object.
88 auto self = shared_from_this();
89 boost::asio::post(_strand, [this, self]() {
// Nothing to do once the client has been marked as done.
90 if (_done) {
91 return;
92 }
93
94 using boost::system::error_code;
95
// Start from a closed socket before connecting again.
96 if (_socket.is_open()) {
97 _socket.close();
98 }
99
102 const auto ep = _token.to_tcp_endpoint();
103
// Completion handler for async_connect; it is bound to _strand below.
104 auto handle_connect = [this, self, ep](error_code ec) {
105 if (!ec) {
106 if (_done) {
107 return;
108 }
109 // Turn the Nagle algorithm off on this socket.
110 // Improves the speed of sync mode on Linux by a factor of ~3.
111 _socket.set_option(boost::asio::ip::tcp::no_delay(true));
112 log_debug("streaming client: connected to", ep);
113 // Send the stream id to subscribe to the stream.
114 const auto &stream_id = _token.get_stream_id();
115 log_debug("streaming client: sending stream id", stream_id);
116 boost::asio::async_write(
117 _socket,
118 boost::asio::buffer(&stream_id, sizeof(stream_id)),
119 boost::asio::bind_executor(_strand, [=](error_code ec, size_t DEBUG_ONLY(bytes)) {
120 // Do nothing further if the client was stopped in the meantime.
121 if (_done) {
122 return;
123 }
124 if (!ec) {
125 DEBUG_ASSERT_EQ(bytes, sizeof(stream_id));
126 // The stream id was sent; start reading data.
127 ReadData();
128 } else {
129 // The write failed; try connecting again right away.
130 log_debug("streaming client: failed to send stream id:", ec.message());
131 Connect();
132 }
133 }));
134 } else {
// The connection attempt failed; retry through Reconnect().
135 log_info("streaming client: connection failed:", ec.message());
136 Reconnect();
137 }
138 };
139
140 log_debug("streaming client: connecting to", ep);
141 _socket.async_connect(ep, boost::asio::bind_executor(_strand, handle_connect));
142 });
143 }
144
// NOTE(review): the signature line of this member function is missing from
// this extract (presumably Client::Stop; confirm against the original file).
//
// Cancels the connection timer, then posts onto _strand a task that sets _done
// and closes the socket if it is open. The timer is cancelled directly by the
// caller, not from inside the posted task.
146 _connection_timer.cancel();
147 auto self = shared_from_this();
148 boost::asio::post(_strand, [this, self]() {
// The other handlers in this file check _done and return early once it is set.
149 _done = true;
150 if (_socket.is_open()) {
151 _socket.close();
152 }
153 });
154 }
155
// NOTE(review): the signature line of this member function is missing from
// this extract (presumably Client::Reconnect; confirm against the original
// file).
//
// Arms _connection_timer to expire one second from now and calls Connect()
// when the wait completes without an error code. The wait handler itself is
// not bound to _strand.
157 auto self = shared_from_this();
158 _connection_timer.expires_from_now(time_duration::seconds(1u));
159 _connection_timer.async_wait([this, self](boost::system::error_code ec) {
// A wait that completes with an error code (presumably what a cancelled timer
// delivers; TODO confirm) does not trigger a new connection attempt.
160 if (!ec) {
161 Connect();
162 }
163 });
164 }
165
// NOTE(review): the signature line of this member function is missing from
// this extract; the log strings below name it Client::ReadData.
//
// Posts one read cycle onto _strand. A cycle reads the size header of the next
// message, then reads the message body into a buffer taken from _buffer_pool,
// posts the user callback with that buffer and starts the next cycle. Read
// failures go back to Connect().
167 auto self = shared_from_this();
168 boost::asio::post(_strand, [this, self]() {
// Nothing to do once the client has been marked as done.
169 if (_done) {
170 return;
171 }
172
173 // log_debug("streaming client: Client::ReadData");
174
// The message is shared so that both handlers below can hold on to it.
175 auto message = std::make_shared<IncomingMessage>(_buffer_pool->Pop());
176
// Completion handler for the body read.
177 auto handle_read_data = [this, self, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
178 DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_data", bytes, "bytes"));
179 if (!ec) {
180 DEBUG_ASSERT_EQ(bytes, message->size());
181 DEBUG_ASSERT_NE(bytes, 0u);
182 // Post the callback with the received buffer on _strand, then start
183 // reading the next piece of data.
184 // log_debug("streaming client: success reading data, calling the callback");
185 boost::asio::post(_strand, [self, message]() { self->_callback(message->pop()); });
186 ReadData();
187 } else {
188 // On any failure, start over from a fresh connection attempt.
189 log_debug("streaming client: failed to read data:", ec.message());
190 Connect();
191 }
192 };
193
// Completion handler for the header read; it carries a copy of
// handle_read_data so that it can chain the body read.
194 auto handle_read_header = [this, self, message, handle_read_data](
195 boost::system::error_code ec,
196 size_t DEBUG_ONLY(bytes)) {
197 DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
// A header announcing a size of zero is handled like a read error.
198 if (!ec && (message->size() > 0u)) {
199 DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
200 if (_done) {
201 return;
202 }
203 // The size of the incoming message is now known, so read that many
204 // bytes into the message buffer.
205 boost::asio::async_read(
206 _socket,
207 message->buffer(),
208 boost::asio::bind_executor(_strand, handle_read_data));
209 } else if (!_done) {
// Only reconnect if the client has not been stopped.
210 log_debug("streaming client: failed to read header:", ec.message());
211 DEBUG_ONLY(log_debug("size = ", message->size()));
212 DEBUG_ONLY(log_debug("bytes = ", bytes));
213 Connect();
214 }
215 };
216
217 // Read the size of the buffer that is coming.
218 boost::asio::async_read(
219 _socket,
220 message->size_as_buffer(),
221 boost::asio::bind_executor(_strand, handle_read_header));
222 });
223 }
224
225} // namespace tcp
226} // namespace detail
227} // namespace streaming
228} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:76
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:77
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
A pool of Buffer.
Definition BufferPool.h:30
A piece of raw data.
void reset(size_type size)
Reset the size of this buffer.
boost::asio::const_buffer buffer() const noexcept
Make a boost::asio::buffer from this buffer.
std::function< void(Buffer)> callback_function_type
Serializes a stream endpoint.
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
static time_duration seconds(size_t timeout)
Definition Time.h:22
uint32_t message_size_type
Definition Types.h:20
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
void throw_exception(const std::exception &e)
Definition Carla.cpp:135
static void log_info(Args &&... args)
Definition Logging.h:82
static void log_debug(Args &&... args)
Definition Logging.h:68