CARLA
 
载入中...
搜索中...
未找到
MultiStreamState.h
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
7#pragma once
8
10#include "carla/Logging.h"
13
14#include <mutex>
15#include <vector>
16#include <atomic>
17
18namespace carla {
19namespace streaming {
20namespace detail {
21
22 /// A stream state that can hold any number of sessions.
23 ///
24 /// @todo Lacking some optimization.
25 class MultiStreamState final : public StreamStateBase {
26 public:
27
29
34
35 template <typename... Buffers>
36 void Write(Buffers... buffers) {
37 // try write single stream
38 auto session = _session.load();
39 if (session != nullptr) {
40 auto message = Session::MakeMessage(buffers...);
41 session->Write(std::move(message));
42 log_debug("sensor ", session->get_stream_id()," data sent");
43 // Return here, _session is only valid if we have a
44 // single session.
45 return;
46 }
47
48 // try write multiple stream
49 std::lock_guard<std::mutex> lock(_mutex);
50 if (_sessions.size() > 0) {
51 auto message = Session::MakeMessage(buffers...);
52 for (auto &s : _sessions) {
53 if (s != nullptr) {
54 s->Write(message);
55 log_debug("sensor ", s->get_stream_id()," data sent ");
56 }
57 }
58 }
59 }
60
61 void ForceActive() {
62 _force_active = true;
63 }
64
65 void EnableForROS() {
66 _enabled_for_ros = true;
67 }
68
70 _enabled_for_ros = false;
71 }
72
74 return _enabled_for_ros;
75 }
76
78 return (_sessions.size() > 0 || _force_active || _enabled_for_ros);
79 }
80
81 void ConnectSession(std::shared_ptr<Session> session) final {
82 DEBUG_ASSERT(session != nullptr);
83 std::lock_guard<std::mutex> lock(_mutex);
84 _sessions.emplace_back(std::move(session));
85 log_debug("Connecting multistream sessions:", _sessions.size());
86 if (_sessions.size() == 1) {
87 _session.store(_sessions[0]);
88 }
89 else if (_sessions.size() > 1) {
90 _session.store(nullptr);
91 }
92 }
93
94 void DisconnectSession(std::shared_ptr<Session> session) final {
95 DEBUG_ASSERT(session != nullptr);
96 std::lock_guard<std::mutex> lock(_mutex);
97 log_debug("Calling DisconnectSession for ", session->get_stream_id());
98 if (_sessions.size() == 0) return;
99 if (_sessions.size() == 1) {
100 DEBUG_ASSERT(session == _session.load());
101 _session.store(nullptr);
102 _sessions.clear();
103 _force_active = false;
104 log_debug("Last session disconnected");
105 } else {
106 _sessions.erase(
107 std::remove(_sessions.begin(), _sessions.end(), session),
108 _sessions.end());
109
110 // set single session if only one
111 if (_sessions.size() == 1)
112 _session.store(_sessions[0]);
113 else
114 _session.store(nullptr);
115 }
116 log_debug("Disconnecting multistream sessions:", _sessions.size());
117 }
118
119 void ClearSessions() final {
120 std::lock_guard<std::mutex> lock(_mutex);
121 for (auto &s : _sessions) {
122 if (s != nullptr) {
123 s->Close();
124 }
125 }
126 _sessions.clear();
127 _force_active = false;
128 _session.store(nullptr);
129 log_debug("Disconnecting all multistream sessions");
130 }
131
132 private:
133
134 std::mutex _mutex;
135
136 // if there is only one session, then we use atomic
138 // if there is more than one session, we use a vector of sessions with a mutex
139 std::vector<std::shared_ptr<Session>> _sessions;
140 bool _force_active {false};
141 bool _enabled_for_ros {false};
142 };
143
144} // namespace detail
145} // namespace streaming
146} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
A very simple atomic shared ptr with release-acquire memory order.
A stream state that can hold any number of sessions.
void DisconnectSession(std::shared_ptr< Session > session) final
std::vector< std::shared_ptr< Session > > _sessions
void ConnectSession(std::shared_ptr< Session > session) final
Shared state among all the copies of a stream.
static auto MakeMessage(Buffers... buffers)
Serializes a stream endpoint.
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
static void log_debug(Args &&... args)
Definition Logging.h:68