SNode.C
Loading...
Searching...
No Matches
core::socket::stream::SocketWriter Class Reference abstract

#include <SocketWriter.h>

Inheritance diagram for core::socket::stream::SocketWriter:
Collaboration diagram for core::socket::stream::SocketWriter:

Public Member Functions

 SocketWriter ()=delete
Public Member Functions inherited from core::DescriptorEventReceiver
 DescriptorEventReceiver (const std::string &name, DescriptorEventPublisher &descriptorEventPublisher, const utils::Timeval &timeout=TIMEOUT::DISABLE)
int getRegisteredFd () const
bool isEnabled () const
bool isSuspended () const
void setTimeout (const utils::Timeval &timeout)
utils::Timeval getTimeout (const utils::Timeval &currentTime) const
void checkTimeout (const utils::Timeval &currentTime)
Public Member Functions inherited from core::EventReceiver
 EventReceiver (const std::string &name)
 EventReceiver (EventReceiver &)=delete
 EventReceiver (EventReceiver &&)=delete
EventReceiver & operator= (EventReceiver &)=delete
EventReceiver & operator= (EventReceiver &&)=delete
virtual void destruct ()
void span ()
void relax ()
const std::string & getName () const

Protected Member Functions

 SocketWriter (const std::string &instanceName, const std::function< void(int)> &onStatus, const utils::Timeval &timeout, std::size_t blockSize, const utils::Timeval &terminateTimeout)
std::size_t getTotalSent () const
std::size_t getTotalQueued () const
virtual ssize_t write (const char *chunk, std::size_t chunkLen)
void setBlockSize (std::size_t writeBlockSize)
void sendToPeer (const char *chunk, std::size_t chunkLen)
bool streamToPeer (core::pipe::Source *source)
void streamEof ()
void shutdownWrite (const std::function< void()> &onShutdown)
Protected Member Functions inherited from core::eventreceiver::WriteEventReceiver
 WriteEventReceiver (const std::string &name, const utils::Timeval &timeout)
virtual void writeTimeout ()
Protected Member Functions inherited from core::DescriptorEventReceiver
bool enable (int fd)
void disable ()
void suspend ()
void resume ()
Protected Member Functions inherited from core::Observer
void observed ()
void unObserved ()
virtual void unobservedEvent ()=0
 Observer ()=default
 Observer (Observer &)=delete
 Observer (Observer &&)=delete
virtual ~Observer ()
Protected Member Functions inherited from core::EventReceiver
virtual ~EventReceiver ()=default
 EventReceiver (const std::string &name)
 EventReceiver (EventReceiver &)=delete
 EventReceiver (EventReceiver &&)=delete
EventReceiver & operator= (EventReceiver &)=delete
EventReceiver & operator= (EventReceiver &&)=delete
virtual void destruct ()
void span ()
void relax ()
const std::string & getName () const

Protected Attributes

bool markShutdown = false
std::function< void()> onShutdown
std::vector< char > writePuffer
bool shutdownInProgress = false
utils::Timeval terminateTimeout

Private Member Functions

void writeEvent () final
void signalEvent (int sigNum) final
void doWrite ()
virtual bool onSignal (int sigNum)=0
virtual void doWriteShutdown (const std::function< void()> &onShutdown)=0

Private Attributes

std::function< void(int)> onStatus
core::pipe::Source * source = nullptr
std::size_t blockSize = 0
std::size_t totalQueued = 0
std::size_t totalSent = 0

Additional Inherited Members

Static Public Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)
Static Protected Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)

Detailed Description

Definition at line 65 of file SocketWriter.h.

Constructor & Destructor Documentation

◆ SocketWriter() [1/2]

core::socket::stream::SocketWriter::SocketWriter ( )
delete

◆ SocketWriter() [2/2]

core::socket::stream::SocketWriter::SocketWriter ( const std::string & instanceName,
const std::function< void(int)> & onStatus,
const utils::Timeval & timeout,
std::size_t blockSize,
const utils::Timeval & terminateTimeout )
explicit protected

Definition at line 56 of file SocketWriter.cpp.

61 : core::eventreceiver::WriteEventReceiver(instanceName, timeout)
65 }
std::function< void(int)> onStatus

References blockSize, onStatus, terminateTimeout, utils::Timeval::Timeval(), and core::eventreceiver::WriteEventReceiver::WriteEventReceiver().

Here is the call graph for this function:

Member Function Documentation

◆ doWrite()

void core::socket::stream::SocketWriter::doWrite ( )
private

Definition at line 91 of file SocketWriter.cpp.

91 {
92 if (!writePuffer.empty()) {
93 const std::size_t writeLen = (writePuffer.size() < blockSize) ? writePuffer.size() : blockSize;
94 const ssize_t retWrite = write(writePuffer.data(), writeLen);
95
96 if (retWrite > 0) {
97 totalSent += static_cast<std::size_t>(retWrite);
98 writePuffer.erase(writePuffer.begin(), writePuffer.begin() + retWrite);
99
100 if (writePuffer.capacity() > writePuffer.size() * 2) {
101 writePuffer.shrink_to_fit();
102 }
103
104 if (!isSuspended()) {
105 suspend();
106 }
107 span();
108 } else if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) && isSuspended()) {
109 resume();
110 } else {
111 onStatus(retWrite == 0 ? 0 : errno);
112 }
113 } else {
114 if (!isSuspended()) {
115 suspend();
116 }
117
118 if (markShutdown) {
119 LOG(TRACE) << getName() << ": Shutdown restart";
121 } else if (source != nullptr) {
122 source->resume();
123 }
124 }
125 }
const std::string & getName() const
virtual void doWriteShutdown(const std::function< void()> &onShutdown)=0
std::function< void()> onShutdown
virtual ssize_t write(const char *chunk, std::size_t chunkLen)

References blockSize, doWriteShutdown(), core::EventReceiver::getName(), core::DescriptorEventReceiver::isSuspended(), markShutdown, onShutdown, onStatus, core::DescriptorEventReceiver::resume(), core::pipe::Source::resume(), source, core::EventReceiver::span(), core::DescriptorEventReceiver::suspend(), totalSent, write(), and writePuffer.

Referenced by writeEvent().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ doWriteShutdown()

virtual void core::socket::stream::SocketWriter::doWriteShutdown ( const std::function< void()> & onShutdown)
private pure virtual

◆ getTotalQueued()

std::size_t core::socket::stream::SocketWriter::getTotalQueued ( ) const
protected

Definition at line 71 of file SocketWriter.cpp.

71 {
72 return totalQueued;
73 }

References totalQueued.

◆ getTotalSent()

std::size_t core::socket::stream::SocketWriter::getTotalSent ( ) const
protected

Definition at line 67 of file SocketWriter.cpp.

67 {
68 return totalSent;
69 }

References totalSent.

◆ onSignal()

virtual bool core::socket::stream::SocketWriter::onSignal ( int sigNum)
private pure virtual

◆ sendToPeer()

void core::socket::stream::SocketWriter::sendToPeer ( const char * chunk,
std::size_t chunkLen )
protected

Definition at line 131 of file SocketWriter.cpp.

131 {
133 if (isEnabled()) {
134 if (writePuffer.empty()) {
135 resume();
136 }
137
138 writePuffer.insert(writePuffer.end(), chunk, chunk + chunkLen);
139 totalQueued += chunkLen;
140
141 if (source != nullptr && writePuffer.size() > 5 * blockSize) {
142 source->suspend();
143 }
144 } else {
145 LOG(WARNING) << getName() << ": Send while not enabled";
146 }
147 } else {
148 LOG(WARNING) << getName() << ": Send while shutdown in progress: ignoring";
149 }
150 }

References blockSize, core::EventReceiver::getName(), core::DescriptorEventReceiver::isEnabled(), markShutdown, core::DescriptorEventReceiver::resume(), shutdownInProgress, source, core::pipe::Source::suspend(), totalQueued, and writePuffer.

Here is the call graph for this function:

◆ setBlockSize()

void core::socket::stream::SocketWriter::setBlockSize ( std::size_t writeBlockSize)
protected

Definition at line 127 of file SocketWriter.cpp.

127 {
128 blockSize = writeBlockSize;
129 }

References blockSize.

◆ shutdownWrite()

void core::socket::stream::SocketWriter::shutdownWrite ( const std::function< void()> & onShutdown)
protected

Definition at line 181 of file SocketWriter.cpp.

181 {
182 if (!shutdownInProgress) {
183 shutdownInProgress = true;
184
186 if (writePuffer.empty()) {
187 LOG(TRACE) << getName() << ": Shutdown start";
189 } else {
190 markShutdown = true;
191 LOG(TRACE) << getName() << ": Shutdown delayed due to queued data";
192 }
193 }
194 }

References doWriteShutdown(), core::EventReceiver::getName(), markShutdown, onShutdown, shutdownInProgress, and writePuffer.

Referenced by signalEvent().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ signalEvent()

void core::socket::stream::SocketWriter::signalEvent ( int sigNum)
final private virtual

Implements core::DescriptorEventReceiver.

Definition at line 79 of file SocketWriter.cpp.

79 {
80 if (onSignal(sigNum)) {
81 shutdownWrite([this]() {
83 });
84 }
85 }
virtual bool onSignal(int sigNum)=0
void shutdownWrite(const std::function< void()> &onShutdown)

References core::DescriptorEventReceiver::disable(), onSignal(), and shutdownWrite().

Here is the call graph for this function:

◆ streamEof()

void core::socket::stream::SocketWriter::streamEof ( )
protected

Definition at line 176 of file SocketWriter.cpp.

176 {
177 LOG(TRACE) << getName() << ": Stream EOF";
178 this->source = nullptr;
179 }

References core::EventReceiver::getName(), and source.

Here is the call graph for this function:

◆ streamToPeer()

bool core::socket::stream::SocketWriter::streamToPeer ( core::pipe::Source * source)
protected

Definition at line 152 of file SocketWriter.cpp.

152 {
153 bool success = false;
154
156 if (isEnabled()) {
157 success = source != nullptr;
158
159 if (success) {
160 LOG(TRACE) << getName() << ": Stream started";
161 } else {
162 LOG(WARNING) << getName() << ": Stream source is nullptr";
163 }
164 } else {
165 LOG(WARNING) << getName() << ": Stream while not enabled";
166 }
167 } else {
168 LOG(WARNING) << getName() << ": Stream while shutdown in progress";
169 }
170
171 this->source = source;
172
173 return success;
174 }

References core::EventReceiver::getName(), core::DescriptorEventReceiver::isEnabled(), markShutdown, shutdownInProgress, and source.

Here is the call graph for this function:

◆ write()

ssize_t core::socket::stream::SocketWriter::write ( const char * chunk,
std::size_t chunkLen )
protected virtual

Reimplemented in core::socket::stream::tls::SocketWriter.

Definition at line 87 of file SocketWriter.cpp.

87 {
88 return core::system::send(this->getRegisteredFd(), chunk, chunkLen, MSG_NOSIGNAL);
89 }
ssize_t send(int sockfd, const void *buf, std::size_t len, int flags)
Definition socket.cpp:96

References core::DescriptorEventReceiver::getRegisteredFd(), and core::system::send().

Referenced by doWrite(), and core::socket::stream::tls::SocketWriter::write().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ writeEvent()

void core::socket::stream::SocketWriter::writeEvent ( )
final private virtual

Implements core::eventreceiver::WriteEventReceiver.

Definition at line 75 of file SocketWriter.cpp.

References doWrite().

Here is the call graph for this function:

Member Data Documentation

◆ blockSize

std::size_t core::socket::stream::SocketWriter::blockSize = 0
private

Definition at line 114 of file SocketWriter.h.

Referenced by doWrite(), sendToPeer(), setBlockSize(), and SocketWriter().

◆ markShutdown

bool core::socket::stream::SocketWriter::markShutdown = false
protected

Definition at line 99 of file SocketWriter.h.

Referenced by doWrite(), sendToPeer(), shutdownWrite(), and streamToPeer().

◆ onShutdown

std::function<void()> core::socket::stream::SocketWriter::onShutdown
protected

Definition at line 105 of file SocketWriter.h.

Referenced by doWrite(), and shutdownWrite().

◆ onStatus

std::function<void(int)> core::socket::stream::SocketWriter::onStatus
private

Definition at line 102 of file SocketWriter.h.

Referenced by doWrite(), and SocketWriter().

◆ shutdownInProgress

bool core::socket::stream::SocketWriter::shutdownInProgress = false
protected

Definition at line 109 of file SocketWriter.h.

Referenced by sendToPeer(), shutdownWrite(), and streamToPeer().

◆ source

core::pipe::Source* core::socket::stream::SocketWriter::source = nullptr
private

Definition at line 112 of file SocketWriter.h.

Referenced by doWrite(), sendToPeer(), streamEof(), and streamToPeer().

◆ terminateTimeout

utils::Timeval core::socket::stream::SocketWriter::terminateTimeout
protected

Definition at line 120 of file SocketWriter.h.

Referenced by SocketWriter().

◆ totalQueued

std::size_t core::socket::stream::SocketWriter::totalQueued = 0
private

Definition at line 116 of file SocketWriter.h.

Referenced by getTotalQueued(), and sendToPeer().

◆ totalSent

std::size_t core::socket::stream::SocketWriter::totalSent = 0
private

Definition at line 117 of file SocketWriter.h.

Referenced by doWrite(), and getTotalSent().

◆ writePuffer

std::vector<char> core::socket::stream::SocketWriter::writePuffer
protected

Definition at line 107 of file SocketWriter.h.

Referenced by doWrite(), sendToPeer(), and shutdownWrite().


The documentation for this class was generated from the following files: