CARLA
 
载入中...
搜索中...
未找到
primary.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
9#include "carla/Debug.h"
10#include "carla/Logging.h"
13
14#include <boost/asio/read.hpp>
15#include <boost/asio/write.hpp>
16#include <boost/asio/bind_executor.hpp>
17#include <boost/asio/post.hpp>
18
19#include <atomic>
20#include <thread>
21
22namespace carla {
23namespace multigpu {
24
25 static std::atomic_size_t SESSION_COUNTER{0u};
26
28 boost::asio::io_context &io_context,
29 const time_duration timeout,
30 Listener &server)
32 std::string("tcp multigpu server session ") + std::to_string(SESSION_COUNTER)),
33 _server(server),
34 _session_id(SESSION_COUNTER++),
35 _socket(io_context),
36 _timeout(timeout),
37 _deadline(io_context),
38 _strand(io_context),
39 _buffer_pool(std::make_shared<BufferPool>()) {}
40
42 if (_socket.is_open()) {
43 boost::system::error_code ec;
44 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
45 _socket.close();
46 }
47 }
48
53 DEBUG_ASSERT(on_opened && on_closed);
54
55 // This forces not using Nagle's algorithm.
56 // Improves the sync mode velocity on Linux by a factor of ~3.
57 const boost::asio::ip::tcp::no_delay option(true);
58 _socket.set_option(option);
59
60 // callbacks
61 _on_closed = std::move(on_closed);
62 _on_response = std::move(on_response);
63 on_opened(shared_from_this());
64
65 ReadData();
66 }
67
68 void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
69 DEBUG_ASSERT(message != nullptr);
70 DEBUG_ASSERT(!message->empty());
71 std::weak_ptr<Primary> weak = shared_from_this();
72 boost::asio::post(_strand, [=]() {
73 auto self = weak.lock();
74 if (!self) return;
75 if (!self->_socket.is_open()) {
76 return;
77 }
78
79 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
80 auto self = weak.lock();
81 if (!self) return;
82 if (ec) {
83 log_error("session ", self->_session_id, ": error sending data: ", ec.message());
84 self->CloseNow(ec);
85 } else {
86 // DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
87 }
88 };
89
90 self->_deadline.expires_from_now(self->_timeout);
91 boost::asio::async_write(
92 self->_socket,
93 message->GetBufferSequence(),
94 boost::asio::bind_executor(self->_strand, handle_sent));
95 });
96 }
97
98 void Primary::Write(std::string text) {
99 std::weak_ptr<Primary> weak = shared_from_this();
100 boost::asio::post(_strand, [=]() {
101 auto self = weak.lock();
102 if (!self) return;
103 if (!self->_socket.is_open()) {
104 return;
105 }
106
107 // sent first size buffer
108 self->_deadline.expires_from_now(self->_timeout);
109 int this_size = text.size();
110 boost::asio::async_write(
111 self->_socket,
112 boost::asio::buffer(&this_size, sizeof(this_size)),
113 boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
114 // send characters
115 boost::asio::async_write(
116 self->_socket,
117 boost::asio::buffer(text.c_str(), text.size()),
118 boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
119 });
120 }
121
123 std::weak_ptr<Primary> weak = shared_from_this();
124 boost::asio::post(_strand, [weak]() {
125 auto self = weak.lock();
126 if (!self) return;
127
128 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
129
130 auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
131 auto self = weak.lock();
132 if (!self) return;
133 if (!ec) {
134 DEBUG_ASSERT_EQ(bytes, message->size());
135 DEBUG_ASSERT_NE(bytes, 0u);
136 // Move the buffer to the callback function and start reading the next
137 // piece of data.
138 self->_on_response(self, message->pop());
139 std::cout << "Getting data on listener\n";
140 self->ReadData();
141 } else {
142 // As usual, if anything fails start over from the very top.
143 log_error("primary server: failed to read data: ", ec.message());
144 }
145 };
146
147 auto handle_read_header = [weak, message, handle_read_data](
148 boost::system::error_code ec,
149 size_t DEBUG_ONLY(bytes)) {
150 auto self = weak.lock();
151 if (!self) return;
152 if (!ec && (message->size() > 0u)) {
153 // Now that we know the size of the coming buffer, we can allocate our
154 // buffer and start putting data into it.
155 boost::asio::async_read(
156 self->_socket,
157 message->buffer(),
158 boost::asio::bind_executor(self->_strand, handle_read_data));
159 } else {
160 if (ec) {
161 log_error("Primary server: failed to read header: ", ec.message());
162 }
163 // Connect();
164 self->Close();
165 }
166 };
167
168 // Read the size of the buffer that is coming.
169 boost::asio::async_read(
170 self->_socket,
171 message->size_as_buffer(),
172 boost::asio::bind_executor(self->_strand, handle_read_header));
173 });
174 }
175
177 std::weak_ptr<Primary> weak = shared_from_this();
178 boost::asio::post(_strand, [weak]() {
179 auto self = weak.lock();
180 if (!self) return;
181 self->CloseNow();
182 });
183 }
184
186 if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
187 log_debug("session ", _session_id, " time out");
188 Close();
189 } else {
190 std::weak_ptr<Primary> weak = shared_from_this();
191 _deadline.async_wait([weak](boost::system::error_code ec) {
192 auto self = weak.lock();
193 if (!self) return;
194 if (!ec) {
195 self->StartTimer();
196 } else {
197 log_error("session ", self->_session_id, " timed out error: ", ec.message());
198 }
199 });
200 }
201 }
202
203 void Primary::CloseNow(boost::system::error_code ec) {
204 _deadline.cancel();
205 if (!ec)
206 {
207 if (_socket.is_open()) {
208 boost::system::error_code ec2;
209 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
210 _socket.close();
211 }
212 }
213 _on_closed(shared_from_this());
214 log_debug("session", _session_id, "closed");
215 }
216
217} // namespace multigpu
218} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:76
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:77
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
A pool of Buffer.
Definition BufferPool.h:30
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition listener.h:32
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition listener.h:33
const size_t _session_id
Definition primary.h:89
boost::asio::deadline_timer _deadline
Definition primary.h:95
socket_type _socket
Definition primary.h:91
void Open(Listener::callback_function_type on_opened, Listener::callback_function_type on_closed, Listener::callback_function_type_response on_response)
Starts the session and calls on_opened after successfully reading the stream id, and on_closed once t...
Definition primary.cpp:49
Listener::callback_function_type _on_closed
Definition primary.h:99
Primary(boost::asio::io_context &io_context, time_duration timeout, Listener &server)
Definition primary.cpp:27
boost::asio::io_context::strand _strand
Definition primary.h:97
Listener::callback_function_type_response _on_response
Definition primary.h:101
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
Writes some data to the socket.
Definition primary.cpp:68
void CloseNow(boost::system::error_code ec=boost::system::error_code())
Definition primary.cpp:203
void Close()
Post a job to close the session.
Definition primary.cpp:176
void ReadData()
read data
Definition primary.cpp:122
Positive time duration up to milliseconds resolution.
Definition Time.h:19
static std::atomic_size_t SESSION_COUNTER
Definition primary.cpp:25
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
static void log_error(Args &&... args)
Definition Logging.h:110
static void log_debug(Args &&... args)
Definition Logging.h:68