CARLA
 
载入中...
搜索中...
未找到
secondary.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
9
10#include "carla/BufferPool.h"
11#include "carla/Debug.h"
12#include "carla/Exception.h"
13#include "carla/Logging.h"
14#include "carla/Time.h"
15
16#include <boost/asio/connect.hpp>
17#include <boost/asio/read.hpp>
18#include <boost/asio/write.hpp>
19#include <boost/asio/post.hpp>
20#include <boost/asio/bind_executor.hpp>
21
22#include <exception>
23
24namespace carla {
25namespace multigpu {
26
28 boost::asio::ip::tcp::endpoint ep,
30 _pool(),
31 _socket(_pool.io_context()),
32 _endpoint(ep),
33 _strand(_pool.io_context()),
34 _connection_timer(_pool.io_context()),
35 _buffer_pool(std::make_shared<BufferPool>()) {
36
37 _commander.set_callback(callback);
38 }
39
40
42 std::string ip,
43 uint16_t port,
45 _pool(),
46 _socket(_pool.io_context()),
47 _strand(_pool.io_context()),
48 _connection_timer(_pool.io_context()),
49 _buffer_pool(std::make_shared<BufferPool>()) {
50
51 boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip);
52 _endpoint = boost::asio::ip::tcp::endpoint(ip_address, port);
53 _commander.set_callback(callback);
54 }
55
59
61 AsyncRun(2u);
62
63 _commander.set_secondary(shared_from_this());
64
65 std::weak_ptr<Secondary> weak = shared_from_this();
66 boost::asio::post(_strand, [weak]() {
67 auto self = weak.lock();
68 if (!self) return;
69
70 if (self->_done) {
71 return;
72 }
73
74 if (self->_socket.is_open()) {
75 self->_socket.close();
76 }
77
78 auto handle_connect = [weak](boost::system::error_code ec) {
79 auto self = weak.lock();
80 if (!self) return;
81 if (ec) {
82 log_error("secondary server: connection failed:", ec.message());
83 if (!self->_done)
84 self->Reconnect();
85 return;
86 }
87
88 if (self->_done) {
89 return;
90 }
91
92 // This forces not using Nagle's algorithm.
93 // Improves the sync mode velocity on Linux by a factor of ~3.
94 self->_socket.set_option(boost::asio::ip::tcp::no_delay(true));
95
96 log_info("secondary server: connected to ", self->_endpoint);
97
98 self->ReadData();
99 };
100
101 self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect));
102 });
103 }
104
105 void Secondary::Stop() {
106 _connection_timer.cancel();
107 std::weak_ptr<Secondary> weak = shared_from_this();
108 boost::asio::post(_strand, [weak]() {
109 auto self = weak.lock();
110 if (!self) return;
111 self->_done = true;
112 if (self->_socket.is_open()) {
113 self->_socket.close();
114 }
115 });
116 }
117
118 void Secondary::Reconnect() {
119 std::weak_ptr<Secondary> weak = shared_from_this();
120 _connection_timer.expires_from_now(time_duration::seconds(1u));
121 _connection_timer.async_wait([weak](boost::system::error_code ec) {
122 auto self = weak.lock();
123 if (!self) return;
124 if (!ec) {
125 self->Connect();
126 }
127 });
128 }
129
130 void Secondary::AsyncRun(size_t worker_threads) {
131 _pool.AsyncRun(worker_threads);
132 }
133
134 void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
135 DEBUG_ASSERT(message != nullptr);
136 DEBUG_ASSERT(!message->empty());
137 std::weak_ptr<Secondary> weak = shared_from_this();
138 boost::asio::post(_strand, [=]() {
139 auto self = weak.lock();
140 if (!self) return;
141 if (!self->_socket.is_open()) {
142 return;
143 }
144
145 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
146 auto self = weak.lock();
147 if (!self) return;
148 if (ec) {
149 log_error("error sending data: ", ec.message());
150 }
151 };
152
153 // _deadline.expires_from_now(_timeout);
154 boost::asio::async_write(
155 self->_socket,
156 message->GetBufferSequence(),
157 boost::asio::bind_executor(self->_strand, handle_sent));
158 });
159 }
160
161 void Secondary::Write(Buffer buffer) {
162 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
163 auto message = Secondary::MakeMessage(view_data);
164
165 DEBUG_ASSERT(message != nullptr);
166 DEBUG_ASSERT(!message->empty());
167 std::weak_ptr<Secondary> weak = shared_from_this();
168 boost::asio::post(_strand, [=]() {
169 auto self = weak.lock();
170 if (!self) return;
171 if (!self->_socket.is_open()) {
172 return;
173 }
174
175 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
176 auto self = weak.lock();
177 if (!self) return;
178 if (ec) {
179 log_error("error sending data: ", ec.message());
180 }
181 };
182
183 // _deadline.expires_from_now(_timeout);
184 boost::asio::async_write(
185 self->_socket,
186 message->GetBufferSequence(),
187 boost::asio::bind_executor(self->_strand, handle_sent));
188 });
189 }
190
191 void Secondary::Write(std::string text) {
192 std::weak_ptr<Secondary> weak = shared_from_this();
193 boost::asio::post(_strand, [=]() {
194 auto self = weak.lock();
195 if (!self) return;
196 if (!self->_socket.is_open()) {
197 return;
198 }
199
200 auto handle_sent = [weak](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
201 auto self = weak.lock();
202 if (!self) return;
203 if (ec) {
204 log_error("error sending data: ", ec.message());
205 }
206 };
207
208 // _deadline.expires_from_now(_timeout);
209 // sent first size buffer
210 int this_size = text.size();
211 boost::asio::async_write(
212 self->_socket,
213 boost::asio::buffer(&this_size, sizeof(this_size)),
214 boost::asio::bind_executor(self->_strand, handle_sent));
215
216 // send characters
217 boost::asio::async_write(
218 self->_socket,
219 boost::asio::buffer(text.c_str(), text.size()),
220 boost::asio::bind_executor(self->_strand, handle_sent));
221 });
222 }
223
224 void Secondary::ReadData() {
225 std::weak_ptr<Secondary> weak = shared_from_this();
226 boost::asio::post(_strand, [weak]() {
227 auto self = weak.lock();
228 if (!self) return;
229 if (self->_done) {
230 return;
231 }
232
233 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
234
235 auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
236 auto self = weak.lock();
237 if (!self) return;
238 if (!ec) {
239 DEBUG_ASSERT_EQ(bytes, message->size());
240 DEBUG_ASSERT_NE(bytes, 0u);
241 // Move the buffer to the callback function and start reading the next
242 // piece of data.
243 self->GetCommander().process_command(message->pop());
244 self->ReadData();
245 } else {
246 // As usual, if anything fails start over from the very top.
247 log_error("secondary server: failed to read data: ", ec.message());
248 // Connect();
249 }
250 };
251
252 auto handle_read_header = [weak, message, handle_read_data](
253 boost::system::error_code ec,
254 size_t DEBUG_ONLY(bytes)) {
255 auto self = weak.lock();
256 if (!self) return;
257 if (!ec && (message->size() > 0u)) {
259 if (self->_done) {
260 return;
261 }
262 // Now that we know the size of the coming buffer, we can allocate our
263 // buffer and start putting data into it.
264 boost::asio::async_read(
265 self->_socket,
266 message->buffer(),
267 boost::asio::bind_executor(self->_strand, handle_read_data));
268 } else if (!self->_done) {
269 log_error("secondary server: failed to read header: ", ec.message());
270 // DEBUG_ONLY(printf("size = ", message->size()));
271 // DEBUG_ONLY(printf("bytes = ", bytes));
272 // Connect();
273 }
274 };
275
276 // Read the size of the buffer that is coming.
277 boost::asio::async_read(
278 self->_socket,
279 message->size_as_buffer(),
280 boost::asio::bind_executor(self->_strand, handle_read_header));
281 });
282 }
283
284} // namespace multigpu
285} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:76
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:77
#define DEBUG_ONLY(code)
Definition Debug.h:55
A pool of Buffer.
Definition BufferPool.h:30
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:56
A piece of raw data.
void Stop()
Stop the ThreadPool and join all its threads.
Definition ThreadPool.h:76
std::function< void(MultiGPUCommand, carla::Buffer)> callback_type
void set_secondary(std::shared_ptr< Secondary > secondary)
void set_callback(callback_type callback)
boost::asio::ip::tcp::endpoint _endpoint
Definition secondary.h:77
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback)
Definition secondary.cpp:27
boost::asio::io_context::strand _strand
Definition secondary.h:78
SecondaryCommands _commander
Definition secondary.h:82
void AsyncRun(size_t worker_threads)
static time_duration seconds(size_t timeout)
Definition Time.h:22
uint32_t message_size_type
Definition Types.h:20
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
static void log_error(Args &&... args)
Definition Logging.h:110
static void log_info(Args &&... args)
Definition Logging.h:82