SNode.C
Loading...
Searching...
No Matches
SocketWriter.cpp
Go to the documentation of this file.
1/*
2 * SNode.C - a slim toolkit for network communication
3 * Copyright (C) Volker Christian <me@vchrist.at>
4 * 2020, 2021, 2022, 2023, 2024, 2025
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include "core/socket/stream/SocketWriter.h"
21
22#include "core/pipe/Source.h"
23
24#ifndef DOXYGEN_SHOULD_SKIP_THIS
25
26#include "core/system/socket.h"
27#include "log/Logger.h"
28
29#include <cerrno>
30
31#endif // DOXYGEN_SHOULD_SKIP_THIS
32
33namespace core::socket::stream {
34
45
47 doWrite();
48 }
49
51 if (onSignal(sigNum)) {
52 shutdownWrite([this]() {
54 });
55 }
56 }
57
59 return core::system::send(this->getRegisteredFd(), chunk, chunkLen, MSG_NOSIGNAL);
60 }
61
63 if (!writePuffer.empty()) {
66
67 if (retWrite > 0) {
69
70 if (writePuffer.capacity() > writePuffer.size() * 2) {
72 }
73
74 if (!isSuspended()) {
75 suspend();
76 }
77 span();
78 } else if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) && isSuspended()) {
79 resume();
80 } else {
81 onStatus(retWrite == 0 ? 0 : errno);
82 }
83 } else {
84 if (!isSuspended()) {
85 suspend();
86 }
87
88 if (markShutdown) {
89 LOG(TRACE) << getName() << ": Shutdown restart";
91 } else if (source != nullptr) {
92 source->resume();
93 }
94 }
95 }
96
100
103 if (isEnabled()) {
104 if (writePuffer.empty()) {
105 resume();
106 }
107
109
110 if (source != nullptr && writePuffer.size() > 5 * blockSize) {
111 source->suspend();
112 }
113 } else {
114 LOG(WARNING) << getName() << ": Send while not enabled";
115 }
116 } else {
117 LOG(WARNING) << getName() << ": Send while shutdown in progress";
118 }
119 }
120
122 bool success = false;
123
125 if (isEnabled()) {
126 success = source != nullptr;
127
128 if (success) {
129 LOG(TRACE) << getName() << ": Stream started";
130 } else {
131 LOG(WARNING) << getName() << ": Stream source is nullptr";
132 }
133 } else {
134 LOG(WARNING) << getName() << ": Stream while not enabled";
135 }
136 } else {
137 LOG(WARNING) << getName() << ": Stream while shutdown in progress";
138 }
139
140 this->source = source;
141
142 return success;
143 }
144
146 LOG(TRACE) << getName() << ": Stream EOF";
147 this->source = nullptr;
148 }
149
151 if (!shutdownInProgress) {
152 shutdownInProgress = true;
153
155 if (writePuffer.empty()) {
156 LOG(TRACE) << getName() << ": Shutdown start";
158 } else {
159 markShutdown = true;
160 LOG(TRACE) << getName() << ": Shutdown delayed due to queued data";
161 }
162 }
163 }
164
165} // namespace core::socket::stream