SNode.C
Loading...
Searching...
No Matches
EventMultiplexer.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "core/EventMultiplexer.h"
21
22#include "core/DescriptorEventPublisher.h"
23#include "core/DescriptorEventReceiver.h"
24#include "core/DynamicLoader.h"
25#include "core/Event.h"
26#include "core/TimerEventPublisher.h"
27
28#ifndef DOXYGEN_SHOULD_SKIP_THIS
29
30#include "utils/Timeval.h"
31
32#include <algorithm>
33#include <cerrno>
34#include <numeric>
35#include <utility>
36
37#endif /* DOXYGEN_SHOULD_SKIP_THIS */
38
39namespace core {
40
47
49 for (DescriptorEventPublisher* descriptorEventPublisher : descriptorEventPublishers) {
50 delete descriptorEventPublisher;
51 }
53 }
54
56 return *descriptorEventPublishers[dispType];
57 }
58
62
63 void EventMultiplexer::span(Event* event) {
64 eventQueue.insert(event);
65 }
66
68 eventQueue.remove(event);
69 }
70
// Runs a single multiplexer iteration.
//
// Waits for events via waitForEvents() and, only when that reports
// TickStatus::SUCCESS, spans the active events, executes the event queue and
// checks for timed-out events.
//
// tickTimeOut: timeout forwarded to waitForEvents().
// sigMask:     signal mask forwarded to waitForEvents().
// Returns the TickStatus produced by waitForEvents(), unchanged.
71 TickStatus EventMultiplexer::tick(const utils::Timeval& tickTimeOut, const sigset_t& sigMask) {
// One time stamp is taken up front and handed to every phase of this tick.
72 const utils::Timeval currentTime = utils::Timeval::currentTime();
73
// Filled in by waitForEvents() through its reference parameter.
74 int activeDescriptorCount = 0;
75
76 const TickStatus tickStatus = waitForEvents(tickTimeOut, currentTime, sigMask, activeDescriptorCount);
77
// Any status other than SUCCESS skips all event processing for this tick.
78 if (tickStatus == TickStatus::SUCCESS) {
79 spanActiveEvents(currentTime, activeDescriptorCount);
80 executeEventQueue(currentTime);
81 checkTimedOutEvents(currentTime);
// NOTE(review): the embedded listing numbering jumps from 81 to 83, so one
// original statement is not shown here — confirm against the upstream file.
83 }
84
85 return tickStatus;
86 }
87
// Forwards the signal number sigNum to every descriptor event publisher by
// calling DescriptorEventPublisher::signal on each of them.
88 void EventMultiplexer::signal(int sigNum) {
89 for (DescriptorEventPublisher* const descriptorEventPublisher : descriptorEventPublishers) {
90 descriptorEventPublisher->signal(sigNum);
91 }
// NOTE(review): the embedded listing numbering skips lines 92 and 94, so the
// statements that follow the loop are not shown here — confirm against the
// upstream file.
93
95 }
96
98 for (DescriptorEventPublisher* const descriptorEventPublisher : descriptorEventPublishers) {
99 descriptorEventPublisher->disable();
100 }
102
104 }
105
107 eventQueue.clear();
108 }
109
111 const utils::Timeval& currentTime,
112 const sigset_t& sigMask,
113 int& activeDescriptorCount) {
114 TickStatus tickStatus = TickStatus::SUCCESS;
115
116 if (observedEventReceiverCount() > 0 || !timerEventPublisher->empty() || !eventQueue.empty()) {
117 utils::Timeval nextTimeout = std::min(getNextTimeout(currentTime), tickTimeOut);
118
119 activeDescriptorCount = monitorDescriptors(nextTimeout, sigMask);
120
121 if (activeDescriptorCount < 0) {
122 if (errno == EINTR) {
123 tickStatus = TickStatus::INTERRUPTED;
124 } else {
125 tickStatus = TickStatus::TRACE;
126 }
127 }
128 } else {
129 tickStatus = TickStatus::NOOBSERVER;
130 }
131
132 return tickStatus;
133 }
134
// Spans the events that became active during the current tick.
//
// The visible part delegates to the single-argument overload
// spanActiveEvents(int), passing activeDescriptorCount through.
// NOTE(review): the embedded listing numbering skips line 136, so the first
// statement of this body is not shown; currentTime is presumably consumed
// there — confirm against the upstream file.
135 void EventMultiplexer::spanActiveEvents(const utils::Timeval& currentTime, int activeDescriptorCount) {
137 spanActiveEvents(activeDescriptorCount);
138 }
139
140 void EventMultiplexer::executeEventQueue(const utils::Timeval& currentTime) {
141 eventQueue.execute(currentTime);
142 }
143
144 void EventMultiplexer::checkTimedOutEvents(const utils::Timeval& currentTime) {
145 for (DescriptorEventPublisher* const descriptorEventPublisher : descriptorEventPublishers) {
146 descriptorEventPublisher->checkTimedOutEvents(currentTime);
147 }
148 }
149
// Releases resources that are due for cleanup at currentTime.
//
// The visible part calls releaseDisabledEvents(currentTime) on every
// descriptor event publisher.
150 void EventMultiplexer::releaseExpiredResources(const utils::Timeval& currentTime) {
151 for (DescriptorEventPublisher* const descriptorEventPublisher : descriptorEventPublishers) {
152 descriptorEventPublisher->releaseDisabledEvents(currentTime);
153 }
// NOTE(review): the embedded listing numbering skips lines 154 and 155, so
// the statements that follow the loop are not shown here — confirm against
// the upstream file.
156 }
157
// Computes the timeout to use for the next wait.
//
// With an empty event queue the result is the smallest timeout reported by the
// descriptor event publishers and the timer event publisher (and the initial
// value of nextTimeout), clamped so that it is never below a default-constructed
// utils::Timeval. With a non-empty event queue the result is 0.
//
// NOTE(review): the embedded listing numbering skips line 159, which holds the
// declaration and initial value of nextTimeout — confirm against the upstream
// file.
158 utils::Timeval EventMultiplexer::getNextTimeout(const utils::Timeval& currentTime) {
160
161 if (eventQueue.empty()) {
// Fold the minimum over all descriptor event publishers ...
162 for (const DescriptorEventPublisher* const descriptorEventPublisher : descriptorEventPublishers) {
163 nextTimeout = std::min(descriptorEventPublisher->getNextTimeout(currentTime), nextTimeout);
164 }
// ... and then over the timer event publisher as well.
165 nextTimeout = std::min(timerEventPublisher->getNextTimeout(currentTime), nextTimeout);
166 nextTimeout = std::max(nextTimeout, utils::Timeval()); // In case nextTimeout is negative
167 } else {
// Events are already queued: use a zero timeout (presumably so the wait does
// not block before they are executed — confirm against monitorDescriptors).
168 nextTimeout = 0;
169 }
170
171 return nextTimeout;
172 }
173
175 return std::accumulate(descriptorEventPublishers.begin(),
176 descriptorEventPublishers.end(),
177 0,
178 [](int count, const DescriptorEventPublisher* descriptorEventPublisher) -> int {
179 return count + descriptorEventPublisher->getObservedEventReceiverCount();
180 });
181 }
182
184 return std::accumulate(descriptorEventPublishers.begin(),
185 descriptorEventPublishers.end(),
186 -1,
187 [](int count, const DescriptorEventPublisher* descriptorEventPublisher) -> int {
188 return std::max(descriptorEventPublisher->maxFd(), count);
189 });
190 }
191
196
198 delete executeQueue;
199 delete publishQueue;
200 }
201
203 publishQueue->push_back(event); // do not allow two or more same events in one tick
204 }
205
207 publishQueue->remove(event); // in case of erase remove the event from the published queue
208 }
209
210 void EventMultiplexer::EventQueue::execute(const utils::Timeval& currentTime) {
211 std::swap(executeQueue, publishQueue);
212
213 for (Event* event : *executeQueue) {
214 event->dispatch(currentTime);
215 }
216
217 executeQueue->clear();
218 }
219
221 return publishQueue->empty();
222 }
223
225 std::swap(executeQueue, publishQueue);
226
227 for (const Event* event : *executeQueue) {
228 event->getEventReceiver()->destruct();
229 }
230
231 executeQueue->clear();
232 }
233
234} // namespace core
static void execDlCloseDeleyed()
void execute(const utils::Timeval &currentTime)
virtual int monitorDescriptors(utils::Timeval &tickTimeOut, const sigset_t &sigMask)=0
void releaseExpiredResources(const utils::Timeval &currentTime)
utils::Timeval getNextTimeout(const utils::Timeval &currentTime)
void checkTimedOutEvents(const utils::Timeval &currentTime)
TickStatus waitForEvents(const utils::Timeval &tickTimeOut, const utils::Timeval &currentTime, const sigset_t &sigMask, int &activeDescriptorCount)
TimerEventPublisher & getTimerEventPublisher()
TickStatus tick(const utils::Timeval &tickTimeOut, const sigset_t &sigMask)
void spanActiveEvents(const utils::Timeval &currentTime, int activeDescriptorCount)
void span(core::Event *event)
virtual void spanActiveEvents(int activeDescriptorCount)=0
DescriptorEventPublisher & getDescriptorEventPublisher(DISP_TYPE dispType)
void relax(core::Event *event)
EventMultiplexer(DescriptorEventPublisher *readDescriptorEventPublisher, DescriptorEventPublisher *writeDescriptorEventPublisher, DescriptorEventPublisher *exceptionDescriptorEventPublisher)
void executeEventQueue(const utils::Timeval &currentTime)
core::TimerEventPublisher *const timerEventPublisher
utils::Timeval getNextTimeout(const utils::Timeval &currentTime)
void spanActiveEvents(const utils::Timeval &currentTime)
static Timeval currentTime()
Definition Timeval.cpp:54
Timeval() noexcept
Definition Timeval.cpp:33
Timeval & operator=(const Timeval &timeVal)
Definition Timeval.cpp:61
TickStatus
Definition TickStatus.h:29