CARLA
 
载入中...
搜索中...
未找到
Dispatcher.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
9#include "carla/Exception.h"
10#include "carla/Logging.h"
12
13#include <exception>
14
15namespace carla {
16namespace streaming {
17namespace detail {
18
20 // Disconnect all the sessions from their streams, this should kill any
21 // session remaining since at this point the io_context should be already
22 // stopped.
23 for (auto &pair : _stream_map) {
24#ifndef LIBCARLA_NO_EXCEPTIONS
25 try {
26#endif // LIBCARLA_NO_EXCEPTIONS
27 auto stream_state = pair.second;
28 stream_state->ClearSessions();
29#ifndef LIBCARLA_NO_EXCEPTIONS
30 } catch (const std::exception &e) {
31 log_error("failed to clear sessions:", e.what());
32 }
33#endif // LIBCARLA_NO_EXCEPTIONS
34 }
35 }
36
38 std::lock_guard<std::mutex> lock(_mutex);
39 ++_cached_token._token.stream_id; // id zero only happens in overflow.
41 std::shared_ptr<MultiStreamState> ptr;
42 auto search = _stream_map.find(_cached_token.get_stream_id());
43 if (search == _stream_map.end()) {
44 // creating new stream
45 ptr = std::make_shared<MultiStreamState>(_cached_token);
46 auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
47 if (!result.second) {
48 throw_exception(std::runtime_error("failed to create stream!"));
49 }
50 log_debug("Stream created");
51 return carla::streaming::Stream(ptr);
52 } else {
53 // reusing existing stream
54 log_debug("Stream reused");
55 ptr = search->second;
56 return carla::streaming::Stream(ptr);
57 }
58 }
59
61 std::lock_guard<std::mutex> lock(_mutex);
62 log_debug("Calling CloseStream for ", id);
63 auto search = _stream_map.find(id);
64 if (search != _stream_map.end()) {
65 auto stream_state = search->second;
66 if (stream_state) {
67 log_debug("Disconnecting all sessions (stream ", id, ")");
68 stream_state->ClearSessions();
69 }
70 _stream_map.erase(search);
71 }
72 }
73
74 bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
75 DEBUG_ASSERT(session != nullptr);
76 std::lock_guard<std::mutex> lock(_mutex);
77 auto search = _stream_map.find(session->get_stream_id());
78 if (search != _stream_map.end()) {
79 auto stream_state = search->second;
80 if (stream_state) {
81 log_debug("Connecting session (stream ", session->get_stream_id(), ")");
82 stream_state->ConnectSession(std::move(session));
83 log_debug("Current streams: ", _stream_map.size());
84 return true;
85 }
86 }
87 log_error("Invalid session: no stream available with id", session->get_stream_id());
88 return false;
89 }
90
91 void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
92 DEBUG_ASSERT(session != nullptr);
93 std::lock_guard<std::mutex> lock(_mutex);
94 log_debug("Calling DeregisterSession for ", session->get_stream_id());
95 auto search = _stream_map.find(session->get_stream_id());
96 if (search != _stream_map.end()) {
97 auto stream_state = search->second;
98 if (stream_state) {
99 log_debug("Disconnecting session (stream ", session->get_stream_id(), ")");
100 stream_state->DisconnectSession(session);
101 log_debug("Current streams: ", _stream_map.size());
102 }
103 }
104 }
105
107 std::lock_guard<std::mutex> lock(_mutex);
108 log_debug("Searching sensor id: ", sensor_id);
109 auto search = _stream_map.find(sensor_id);
110 if (search != _stream_map.end()) {
111 log_debug("Found sensor id: ", sensor_id);
112 auto stream_state = search->second;
113 stream_state->ForceActive();
114 log_debug("Getting token from stream ", sensor_id, " on port ", stream_state->token().get_port());
115 return stream_state->token();
116 } else {
117 log_debug("Not Found sensor id, creating sensor stream: ", sensor_id);
118 token_type temp_token(_cached_token);
119 temp_token.set_stream_id(sensor_id);
120 auto ptr = std::make_shared<MultiStreamState>(temp_token);
121 auto result = _stream_map.emplace(std::make_pair(temp_token.get_stream_id(), ptr));
122 ptr->ForceActive();
123 if (!result.second) {
124 log_debug("Failed to create multistream for stream ", sensor_id, " on port ", temp_token.get_port());
125 }
126 log_debug("Created token from stream ", sensor_id, " on port ", temp_token.get_port());
127 return temp_token;
128 }
129 return token_type();
130 }
131
132} // namespace detail
133} // namespace streaming
134} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
void DeregisterSession(std::shared_ptr< Session > session)
carla::streaming::Stream MakeStream()
bool RegisterSession(std::shared_ptr< Session > session)
void CloseStream(carla::streaming::detail::stream_id_type id)
token_type GetToken(stream_id_type sensor_id)
Serializes a stream endpoint.
void set_stream_id(stream_id_type id)
uint32_t stream_id_type
Definition Types.h:18
detail::Stream< detail::MultiStreamState > Stream
A stream represents a unidirectional channel for sending data from server to client.
Definition Stream.h:19
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
void throw_exception(const std::exception &e)
Definition Carla.cpp:135
static void log_error(Args &&... args)
Definition Logging.h:110
static void log_debug(Args &&... args)
Definition Logging.h:68