SNode.C
Loading...
Searching...
No Matches
core::socket::stream::SocketWriter Class Referenceabstract

#include <SocketWriter.h>

Inheritance diagram for core::socket::stream::SocketWriter:
Collaboration diagram for core::socket::stream::SocketWriter:

Public Member Functions

 SocketWriter ()=delete
 
- Public Member Functions inherited from core::DescriptorEventReceiver
 DescriptorEventReceiver (const std::string &name, DescriptorEventPublisher &descriptorEventPublisher, const utils::Timeval &timeout=TIMEOUT::DISABLE)
 
int getRegisteredFd () const
 
bool isEnabled () const
 
bool isSuspended () const
 
void setTimeout (const utils::Timeval &timeout)
 
utils::Timeval getTimeout (const utils::Timeval &currentTime) const
 
void checkTimeout (const utils::Timeval &currentTime)
 
- Public Member Functions inherited from core::EventReceiver
 EventReceiver (const std::string &name)
 
 EventReceiver (EventReceiver &)=delete
 
 EventReceiver (EventReceiver &&)=delete
 
EventReceiveroperator= (EventReceiver &)=delete
 
EventReceiveroperator= (EventReceiver &&)=delete
 
virtual void destruct ()
 
void span ()
 
void relax ()
 
const std::string & getName () const
 

Protected Member Functions

 SocketWriter (const std::string &instanceName, const std::function< void(int)> &onStatus, const utils::Timeval &timeout, std::size_t blockSize, const utils::Timeval &terminateTimeout)
 
virtual ssize_t write (const char *chunk, std::size_t chunkLen)
 
void setBlockSize (std::size_t writeBlockSize)
 
void sendToPeer (const char *chunk, std::size_t chunkLen)
 
bool streamToPeer (core::pipe::Source *source)
 
void streamEof ()
 
void shutdownWrite (const std::function< void()> &onShutdown)
 
- Protected Member Functions inherited from core::eventreceiver::WriteEventReceiver
 WriteEventReceiver (const std::string &name, const utils::Timeval &timeout)
 
virtual void writeTimeout ()
 
- Protected Member Functions inherited from core::DescriptorEventReceiver
bool enable (int fd)
 
void disable ()
 
void suspend ()
 
void resume ()
 
- Protected Member Functions inherited from core::Observer
void observed ()
 
void unObserved ()
 
virtual void unobservedEvent ()=0
 
 Observer ()=default
 
 Observer (Observer &)=delete
 
 Observer (Observer &&)=delete
 
virtual ~Observer ()
 
- Protected Member Functions inherited from core::EventReceiver
virtual ~EventReceiver ()=default
 
 EventReceiver (const std::string &name)
 
 EventReceiver (EventReceiver &)=delete
 
 EventReceiver (EventReceiver &&)=delete
 
EventReceiveroperator= (EventReceiver &)=delete
 
EventReceiveroperator= (EventReceiver &&)=delete
 
virtual void destruct ()
 
void span ()
 
void relax ()
 
const std::string & getName () const
 

Protected Attributes

bool markShutdown = false
 
std::function< void()> onShutdown
 
std::vector< char > writePuffer
 
bool shutdownInProgress = false
 
utils::Timeval terminateTimeout
 

Private Member Functions

void writeEvent () final
 
void signalEvent (int sigNum) final
 
void doWrite ()
 
virtual bool onSignal (int sigNum)=0
 
virtual void doWriteShutdown (const std::function< void()> &onShutdown)=0
 

Private Attributes

std::function< void(int)> onStatus
 
core::pipe::Sourcesource = nullptr
 
std::size_t blockSize = 0
 

Additional Inherited Members

- Static Public Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)
 
- Static Protected Member Functions inherited from core::EventReceiver
static void atNextTick (const std::function< void(void)> &callBack)
 

Detailed Description

Definition at line 43 of file SocketWriter.h.

Constructor & Destructor Documentation

◆ SocketWriter() [1/2]

core::socket::stream::SocketWriter::SocketWriter ( )
delete

◆ SocketWriter() [2/2]

core::socket::stream::SocketWriter::SocketWriter ( const std::string & instanceName,
const std::function< void(int)> & onStatus,
const utils::Timeval & timeout,
std::size_t blockSize,
const utils::Timeval & terminateTimeout )
explicitprotected

Member Function Documentation

◆ doWrite()

void core::socket::stream::SocketWriter::doWrite ( )
private

Definition at line 62 of file SocketWriter.cpp.

62 {
63 if (!writePuffer.empty()) {
64 const std::size_t writeLen = (writePuffer.size() < blockSize) ? writePuffer.size() : blockSize;
65 const ssize_t retWrite = write(writePuffer.data(), writeLen);
66
67 if (retWrite > 0) {
68 writePuffer.erase(writePuffer.begin(), writePuffer.begin() + retWrite);
69
70 if (writePuffer.capacity() > writePuffer.size() * 2) {
71 writePuffer.shrink_to_fit();
72 }
73
74 if (!isSuspended()) {
75 suspend();
76 }
77 span();
78 } else if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) && isSuspended()) {
79 resume();
80 } else {
81 onStatus(retWrite == 0 ? 0 : errno);
82 }
83 } else {
84 if (!isSuspended()) {
85 suspend();
86 }
87
88 if (markShutdown) {
89 LOG(TRACE) << getName() << ": Shutdown restart";
91 } else if (source != nullptr) {
92 source->resume();
93 }
94 }
95 }
const std::string & getName() const
virtual void resume()=0
virtual void doWriteShutdown(const std::function< void()> &onShutdown)=0
std::function< void()> onShutdown
virtual ssize_t write(const char *chunk, std::size_t chunkLen)

◆ doWriteShutdown()

virtual void core::socket::stream::SocketWriter::doWriteShutdown ( const std::function< void()> & onShutdown)
privatepure virtual

◆ onSignal()

virtual bool core::socket::stream::SocketWriter::onSignal ( int sigNum)
privatepure virtual

◆ sendToPeer()

void core::socket::stream::SocketWriter::sendToPeer ( const char * chunk,
std::size_t chunkLen )
protected

Definition at line 101 of file SocketWriter.cpp.

101 {
103 if (isEnabled()) {
104 if (writePuffer.empty()) {
105 resume();
106 }
107
108 writePuffer.insert(writePuffer.end(), chunk, chunk + chunkLen);
109
110 if (source != nullptr && writePuffer.size() > 5 * blockSize) {
111 source->suspend();
112 }
113 } else {
114 LOG(WARNING) << getName() << ": Send while not enabled";
115 }
116 } else {
117 LOG(WARNING) << getName() << ": Send while shutdown in progress";
118 }
119 }
virtual void suspend()=0

◆ setBlockSize()

void core::socket::stream::SocketWriter::setBlockSize ( std::size_t writeBlockSize)
protected

Definition at line 97 of file SocketWriter.cpp.

97 {
98 blockSize = writeBlockSize;
99 }

◆ shutdownWrite()

void core::socket::stream::SocketWriter::shutdownWrite ( const std::function< void()> & onShutdown)
protected

Definition at line 150 of file SocketWriter.cpp.

150 {
151 if (!shutdownInProgress) {
152 shutdownInProgress = true;
153
155 if (writePuffer.empty()) {
156 LOG(TRACE) << getName() << ": Shutdown start";
158 } else {
159 markShutdown = true;
160 LOG(TRACE) << getName() << ": Shutdown delayed due to queued data";
161 }
162 }
163 }

◆ signalEvent()

void core::socket::stream::SocketWriter::signalEvent ( int sigNum)
finalprivatevirtual

Implements core::DescriptorEventReceiver.

Definition at line 50 of file SocketWriter.cpp.

50 {
51 if (onSignal(sigNum)) {
52 shutdownWrite([this]() {
54 });
55 }
56 }
virtual bool onSignal(int sigNum)=0
void shutdownWrite(const std::function< void()> &onShutdown)

◆ streamEof()

void core::socket::stream::SocketWriter::streamEof ( )
protected

Definition at line 145 of file SocketWriter.cpp.

145 {
146 LOG(TRACE) << getName() << ": Stream EOF";
147 this->source = nullptr;
148 }

◆ streamToPeer()

bool core::socket::stream::SocketWriter::streamToPeer ( core::pipe::Source * source)
protected

Definition at line 121 of file SocketWriter.cpp.

121 {
122 bool success = false;
123
125 if (isEnabled()) {
126 success = source != nullptr;
127
128 if (success) {
129 LOG(TRACE) << getName() << ": Stream started";
130 } else {
131 LOG(WARNING) << getName() << ": Stream source is nullptr";
132 }
133 } else {
134 LOG(WARNING) << getName() << ": Stream while not enabled";
135 }
136 } else {
137 LOG(WARNING) << getName() << ": Stream while shutdown in progress";
138 }
139
140 this->source = source;
141
142 return success;
143 }

◆ write()

ssize_t core::socket::stream::SocketWriter::write ( const char * chunk,
std::size_t chunkLen )
protectedvirtual

Reimplemented in core::socket::stream::tls::SocketWriter.

Definition at line 58 of file SocketWriter.cpp.

58 {
59 return core::system::send(this->getRegisteredFd(), chunk, chunkLen, MSG_NOSIGNAL);
60 }
ssize_t send(int sockfd, const void *buf, std::size_t len, int flags)
Definition socket.cpp:74

◆ writeEvent()

void core::socket::stream::SocketWriter::writeEvent ( )
finalprivatevirtual

Implements core::eventreceiver::WriteEventReceiver.

Definition at line 46 of file SocketWriter.cpp.

Member Data Documentation

◆ blockSize

std::size_t core::socket::stream::SocketWriter::blockSize = 0
private

Definition at line 90 of file SocketWriter.h.

◆ markShutdown

bool core::socket::stream::SocketWriter::markShutdown = false
protected

Definition at line 75 of file SocketWriter.h.

◆ onShutdown

std::function<void()> core::socket::stream::SocketWriter::onShutdown
protected

Definition at line 81 of file SocketWriter.h.

◆ onStatus

std::function<void(int)> core::socket::stream::SocketWriter::onStatus
private

Definition at line 78 of file SocketWriter.h.

◆ shutdownInProgress

bool core::socket::stream::SocketWriter::shutdownInProgress = false
protected

Definition at line 85 of file SocketWriter.h.

◆ source

core::pipe::Source* core::socket::stream::SocketWriter::source = nullptr
private

Definition at line 88 of file SocketWriter.h.

◆ terminateTimeout

utils::Timeval core::socket::stream::SocketWriter::terminateTimeout
protected

Definition at line 93 of file SocketWriter.h.

◆ writePuffer

std::vector<char> core::socket::stream::SocketWriter::writePuffer
protected

Definition at line 83 of file SocketWriter.h.


The documentation for this class was generated from the following files: