SNode.C
Loading...
Searching...
No Matches
core::socket::stream::SocketWriter Class Reference abstract

#include <SocketWriter.h>

Inheritance diagram for core::socket::stream::SocketWriter:
Collaboration diagram for core::socket::stream::SocketWriter:

Public Member Functions

 SocketWriter ()=delete
 
- Public Member Functions inherited from core::DescriptorEventReceiver
 DescriptorEventReceiver (const std::string &name, DescriptorEventPublisher &descriptorEventPublisher, const utils::Timeval &timeout=TIMEOUT::DISABLE)
 
int getRegisteredFd () const
 
bool isEnabled () const
 
bool isSuspended () const
 
void setTimeout (const utils::Timeval &timeout)
 
utils::Timeval getTimeout (const utils::Timeval &currentTime) const
 
void checkTimeout (const utils::Timeval &currentTime)
 
- Public Member Functions inherited from core::EventReceiver
 EventReceiver (const std::string &name)
 
 EventReceiver (EventReceiver &)=delete
 
 EventReceiver (EventReceiver &&)=delete
 
EventReceiver & operator= (EventReceiver &)=delete
 
EventReceiver & operator= (EventReceiver &&)=delete
 
virtual void destruct ()
 
void span ()
 
void relax ()
 
const std::string & getName () const
 

Protected Member Functions

 SocketWriter (const std::string &instanceName, const std::function< void(int)> &onStatus, const utils::Timeval &timeout, std::size_t blockSize, const utils::Timeval &terminateTimeout)
 
virtual ssize_t write (const char *chunk, std::size_t chunkLen)
 
void setBlockSize (std::size_t writeBlockSize)
 
void sendToPeer (const char *chunk, std::size_t chunkLen)
 
bool streamToPeer (core::pipe::Source *source)
 
void streamEof ()
 
void shutdownWrite (const std::function< void()> &onShutdown)
 
- Protected Member Functions inherited from core::eventreceiver::WriteEventReceiver
 WriteEventReceiver (const std::string &name, const utils::Timeval &timeout)
 
virtual void writeTimeout ()
 
- Protected Member Functions inherited from core::DescriptorEventReceiver
bool enable (int fd)
 
void disable ()
 
void suspend ()
 
void resume ()
 
- Protected Member Functions inherited from core::Observer
void observed ()
 
void unObserved ()
 
virtual void unobservedEvent ()=0
 
 Observer ()=default
 
 Observer (Observer &)=delete
 
 Observer (Observer &&)=delete
 
virtual ~Observer ()
 
- Protected Member Functions inherited from core::EventReceiver
virtual ~EventReceiver ()=default
 
 EventReceiver (const std::string &name)
 
 EventReceiver (EventReceiver &)=delete
 
 EventReceiver (EventReceiver &&)=delete
 
EventReceiver & operator= (EventReceiver &)=delete
 
EventReceiver & operator= (EventReceiver &&)=delete
 
virtual void destruct ()
 
void span ()
 
void relax ()
 
const std::string & getName () const
 

Protected Attributes

bool markShutdown = false
 
std::function< void()> onShutdown
 
std::vector< char > writePuffer
 
bool shutdownInProgress = false
 
utils::Timeval terminateTimeout
 

Private Member Functions

void writeEvent () final
 
void signalEvent (int sigNum) final
 
void doWrite ()
 
virtual bool onSignal (int sigNum)=0
 
virtual void doWriteShutdown (const std::function< void()> &onShutdown)=0
 

Private Attributes

std::function< void(int)> onStatus
 
core::pipe::Source * source = nullptr
 
std::size_t blockSize = 0
 

Additional Inherited Members

- Static Public Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)
 
- Static Protected Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)
 

Detailed Description

Definition at line 65 of file SocketWriter.h.

Constructor & Destructor Documentation

◆ SocketWriter() [1/2]

core::socket::stream::SocketWriter::SocketWriter ( )
delete

◆ SocketWriter() [2/2]

core::socket::stream::SocketWriter::SocketWriter ( const std::string & instanceName,
const std::function< void(int)> & onStatus,
const utils::Timeval & timeout,
std::size_t blockSize,
const utils::Timeval & terminateTimeout )
explicit protected

Member Function Documentation

◆ doWrite()

void core::socket::stream::SocketWriter::doWrite ( )
private

Definition at line 84 of file SocketWriter.cpp.

84 {
85 if (!writePuffer.empty()) {
86 const std::size_t writeLen = (writePuffer.size() < blockSize) ? writePuffer.size() : blockSize;
87 const ssize_t retWrite = write(writePuffer.data(), writeLen);
88
89 if (retWrite > 0) {
90 writePuffer.erase(writePuffer.begin(), writePuffer.begin() + retWrite);
91
92 if (writePuffer.capacity() > writePuffer.size() * 2) {
93 writePuffer.shrink_to_fit();
94 }
95
96 if (!isSuspended()) {
97 suspend();
98 }
99 span();
100 } else if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) && isSuspended()) {
101 resume();
102 } else {
103 onStatus(retWrite == 0 ? 0 : errno);
104 }
105 } else {
106 if (!isSuspended()) {
107 suspend();
108 }
109
110 if (markShutdown) {
111 LOG(TRACE) << getName() << ": Shutdown restart";
112 doWriteShutdown(onShutdown);
113 } else if (source != nullptr) {
114 source->resume();
115 }
116 }
117 }
const std::string & getName() const
virtual void resume()=0
virtual void doWriteShutdown(const std::function< void()> &onShutdown)=0
std::function< void()> onShutdown
virtual ssize_t write(const char *chunk, std::size_t chunkLen)

◆ doWriteShutdown()

virtual void core::socket::stream::SocketWriter::doWriteShutdown ( const std::function< void()> & onShutdown)
private pure virtual

◆ onSignal()

virtual bool core::socket::stream::SocketWriter::onSignal ( int sigNum)
private pure virtual

◆ sendToPeer()

void core::socket::stream::SocketWriter::sendToPeer ( const char * chunk,
std::size_t chunkLen )
protected

Definition at line 123 of file SocketWriter.cpp.

123 {
125 if (isEnabled()) {
126 if (writePuffer.empty()) {
127 resume();
128 }
129
130 writePuffer.insert(writePuffer.end(), chunk, chunk + chunkLen);
131
132 if (source != nullptr && writePuffer.size() > 5 * blockSize) {
133 source->suspend();
134 }
135 } else {
136 LOG(WARNING) << getName() << ": Send while not enabled";
137 }
138 } else {
139 LOG(WARNING) << getName() << ": Send while shutdown in progress";
140 }
141 }
virtual void suspend()=0

◆ setBlockSize()

void core::socket::stream::SocketWriter::setBlockSize ( std::size_t writeBlockSize)
protected

Definition at line 119 of file SocketWriter.cpp.

119 {
120 blockSize = writeBlockSize;
121 }

◆ shutdownWrite()

void core::socket::stream::SocketWriter::shutdownWrite ( const std::function< void()> & onShutdown)
protected

Definition at line 172 of file SocketWriter.cpp.

172 {
173 if (!shutdownInProgress) {
174 shutdownInProgress = true;
175
177 if (writePuffer.empty()) {
178 LOG(TRACE) << getName() << ": Shutdown start";
180 } else {
181 markShutdown = true;
182 LOG(TRACE) << getName() << ": Shutdown delayed due to queued data";
183 }
184 }
185 }

◆ signalEvent()

void core::socket::stream::SocketWriter::signalEvent ( int sigNum)
final private virtual

Implements core::DescriptorEventReceiver.

Definition at line 72 of file SocketWriter.cpp.

72 {
73 if (onSignal(sigNum)) {
74 shutdownWrite([this]() {
76 });
77 }
78 }
virtual bool onSignal(int sigNum)=0
void shutdownWrite(const std::function< void()> &onShutdown)

◆ streamEof()

void core::socket::stream::SocketWriter::streamEof ( )
protected

Definition at line 167 of file SocketWriter.cpp.

167 {
168 LOG(TRACE) << getName() << ": Stream EOF";
169 this->source = nullptr;
170 }

◆ streamToPeer()

bool core::socket::stream::SocketWriter::streamToPeer ( core::pipe::Source * source)
protected

Definition at line 143 of file SocketWriter.cpp.

143 {
144 bool success = false;
145
147 if (isEnabled()) {
148 success = source != nullptr;
149
150 if (success) {
151 LOG(TRACE) << getName() << ": Stream started";
152 } else {
153 LOG(WARNING) << getName() << ": Stream source is nullptr";
154 }
155 } else {
156 LOG(WARNING) << getName() << ": Stream while not enabled";
157 }
158 } else {
159 LOG(WARNING) << getName() << ": Stream while shutdown in progress";
160 }
161
162 this->source = source;
163
164 return success;
165 }

◆ write()

ssize_t core::socket::stream::SocketWriter::write ( const char * chunk,
std::size_t chunkLen )
protected virtual

Reimplemented in core::socket::stream::tls::SocketWriter.

Definition at line 80 of file SocketWriter.cpp.

80 {
81 return core::system::send(this->getRegisteredFd(), chunk, chunkLen, MSG_NOSIGNAL);
82 }
ssize_t send(int sockfd, const void *buf, std::size_t len, int flags)
Definition socket.cpp:96

◆ writeEvent()

void core::socket::stream::SocketWriter::writeEvent ( )
final private virtual

Implements core::eventreceiver::WriteEventReceiver.

Definition at line 68 of file SocketWriter.cpp.

Member Data Documentation

◆ blockSize

std::size_t core::socket::stream::SocketWriter::blockSize = 0
private

Definition at line 112 of file SocketWriter.h.

◆ markShutdown

bool core::socket::stream::SocketWriter::markShutdown = false
protected

Definition at line 97 of file SocketWriter.h.

◆ onShutdown

std::function<void()> core::socket::stream::SocketWriter::onShutdown
protected

Definition at line 103 of file SocketWriter.h.

◆ onStatus

std::function<void(int)> core::socket::stream::SocketWriter::onStatus
private

Definition at line 100 of file SocketWriter.h.

◆ shutdownInProgress

bool core::socket::stream::SocketWriter::shutdownInProgress = false
protected

Definition at line 107 of file SocketWriter.h.

◆ source

core::pipe::Source* core::socket::stream::SocketWriter::source = nullptr
private

Definition at line 110 of file SocketWriter.h.

◆ terminateTimeout

utils::Timeval core::socket::stream::SocketWriter::terminateTimeout
protected

Definition at line 115 of file SocketWriter.h.

◆ writePuffer

std::vector<char> core::socket::stream::SocketWriter::writePuffer
protected

Definition at line 105 of file SocketWriter.h.


The documentation for this class was generated from the following files: