// Constructs a connection for the given client.
//
// The initializer list sets up the three descriptor event receivers (read,
// write, exceptional condition) with their timeouts disabled, stores the
// client pointer, creates the MYSQL handle with mysql_init(nullptr) and binds
// the command-start event to this connection.
//
// The body sets MYSQL_OPT_NONBLOCK on the handle and then queues a
// MariaDBConnectCommand through execute_async().
//
// NOTE(review): this listing elides source lines (the embedded line numbers
// skip), so some arguments, lambda headers, else-branches and closing braces
// are not visible here.
40 MariaDBConnection::MariaDBConnection(MariaDBClient* mariaDBClient,
const MariaDBConnectionDetails& connectionDetails)
41 : ReadEventReceiver(
"MariaDBConnectionRead", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
42 , WriteEventReceiver(
"MariaDBConnectionWrite", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
43 , ExceptionalConditionEventReceiver(
"MariaDBConnectionExceptional", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
44 , mariaDBClient(mariaDBClient)
45 , mysql(mysql_init(
nullptr))
46 , commandStartEvent(
"MariaDBCommandStartEvent",
this) {
// NOTE(review): the result of mysql_init() is used without a visible null check.
47 mysql_options(mysql, MYSQL_OPT_NONBLOCK,
nullptr);
// Wrap the connect command in a fresh command sequence and enqueue it.
49 execute_async(std::move(MariaDBCommandSequence().execute_async(
new database::mariadb::commands::async::MariaDBConnectCommand(
// NOTE(review): presumably the body of the connect command's completion
// callback; its header is elided in this listing.
// No error reported on the handle: fetch the socket descriptor and log it.
52 if (mysql_errno(mysql) == 0) {
53 const int fd = mysql_get_socket(mysql);
55 LOG(DEBUG) <<
"MariaDB: Got valid descriptor: " << fd;
// Enable all three receivers on the descriptor; when every enable() call
// succeeds, suspend them all. checkStatus() resumes or suspends them later
// according to the wait flags of the running command.
57 if (ReadEventReceiver::enable(fd) && WriteEventReceiver::enable(fd) && ExceptionalConditionEventReceiver::enable(fd)) {
58 ReadEventReceiver::suspend();
59 WriteEventReceiver::suspend();
60 ExceptionalConditionEventReceiver::suspend();
// Disable whichever receivers are currently enabled.
// NOTE(review): the branch header is elided; presumably this is the failure
// path of the enable() check above - confirm.
64 if (ReadEventReceiver::isEnabled()) {
65 ReadEventReceiver::disable();
67 if (WriteEventReceiver::isEnabled()) {
68 WriteEventReceiver::disable();
70 if (ExceptionalConditionEventReceiver::isEnabled()) {
71 ExceptionalConditionEventReceiver::disable();
// Logs the handle's error text and error number.
// NOTE(review): presumably the else-branch of the mysql_errno() check - confirm.
75 LOG(WARNING) <<
"MariaDB: Got no valid descriptor: " << mysql_error(mysql) <<
", " << mysql_errno(mysql);
79 LOG(DEBUG) <<
"MariaDB: Connect success";
// Callback taking an error string and an error number; it only logs a warning.
81 [](
const std::string& errorString,
unsigned int errorNumber) {
82 LOG(WARNING) <<
"MariaDB: Connect error: " << errorString <<
" : " << errorNumber;
// Destructor. Walks every queued command sequence and every command in it,
// deletes each command, and finally tells the client (if one is set) that the
// connection has vanished.
//
// NOTE(review): the remaining lines of the destructor are elided in this
// listing, so any further cleanup it performs is not visible here.
86 MariaDBConnection::~MariaDBConnection() {
87 for (MariaDBCommandSequence& mariaDBCommandSequence : commandSequenceQueue) {
88 for (MariaDBCommand* mariaDBCommand : mariaDBCommandSequence.sequence()) {
// Report the handle's current error to the pending command only while the
// SNodeC state is RUNNING and `connected` is set.
89 if (core::SNodeC::state() == core::State::RUNNING && connected) {
90 mariaDBCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
// The queued command pointers are deleted here.
93 delete mariaDBCommand;
97 if (mariaDBClient !=
nullptr) {
98 mariaDBClient->connectionVanished();
// Appends a command sequence to the back of the queue and returns a reference
// to the queued element.
//
// @param commandSequence  sequence to enqueue; it is moved into the queue.
// @return reference to the element now stored at the back of the queue.
105 MariaDBCommandSequence& MariaDBConnection::execute_async(MariaDBCommandSequence&& commandSequence) {
// Only when no command is current and nothing is queued is the command-start
// event triggered.
// NOTE(review): presumably span() schedules commandStart() - confirm; when
// work is already pending, checkStatus() triggers the same event.
106 if (currentCommand ==
nullptr && commandSequenceQueue.empty()) {
107 commandStartEvent.span();
110 commandSequenceQueue.emplace_back(std::move(commandSequence));
112 return commandSequenceQueue.back();
// Runs a single command directly on the handle, bypassing the queue, and
// deletes the command afterwards.
//
// @param mariaDBCommand  command to run; it is deleted by this function.
//
// NOTE(review): lines are elided in this listing; the body of the
// commandCompleted() branch and the header of the branch containing
// commandError() are not visible.
115 void MariaDBConnection::execute_sync(MariaDBCommand* mariaDBCommand) {
// Start the command with the current time as its start timestamp.
116 mariaDBCommand->commandStart(mysql, utils::Timeval::currentTime());
118 if (mysql_errno(mysql) == 0) {
119 if (mariaDBCommand->commandCompleted()) {
// NOTE(review): presumably the else-branch of the mysql_errno() check - confirm.
122 mariaDBCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
125 delete mariaDBCommand;
// Starts the next queued command, if there is one.
//
// @param currentTime  timestamp forwarded to the command's commandStart().
//
// NOTE(review): lines are elided in this listing; the header of the branch
// holding the three disable() calls is not visible.
128 void MariaDBConnection::commandStart(
const utils::Timeval& currentTime) {
// Work is queued: take the next command from the front sequence, attach this
// connection to it, start it, and feed the returned status to checkStatus().
129 if (!commandSequenceQueue.empty()) {
130 currentCommand = commandSequenceQueue.front().nextCommand();
132 LOG(DEBUG) <<
"MariaDB: Start: " << currentCommand->commandInfo();
134 currentCommand->setMariaDBConnection(
this);
135 checkStatus(currentCommand->commandStart(mysql, currentTime));
136 }
else if (mariaDBClient !=
nullptr) {
// Queue is empty but a client is still attached: resume the read receiver
// if it is enabled and currently suspended.
137 if (ReadEventReceiver::isSuspended() && ReadEventReceiver::isEnabled()) {
138 ReadEventReceiver::resume();
// NOTE(review): presumably the final else-branch (empty queue and no client),
// which disables all three receivers - confirm.
141 ReadEventReceiver::disable();
142 WriteEventReceiver::disable();
143 ExceptionalConditionEventReceiver::disable();
// Continues the current command with the given status flags.
//
// @param status  MYSQL_WAIT_* flag set passed on to the command.
147 void MariaDBConnection::commandContinue(
int status) {
// A command is in flight: let it continue and evaluate the status it returns.
148 if (currentCommand !=
nullptr) {
149 checkStatus(currentCommand->commandContinue(status));
150 }
// No current command, nothing queued, and the read flag is set: disable all
// three receivers.
// NOTE(review): presumably readable data with no pending command means the
// server closed the connection - confirm.
else if ((status & MYSQL_WAIT_READ) != 0 && commandSequenceQueue.empty()) {
151 ReadEventReceiver::disable();
152 WriteEventReceiver::disable();
153 ExceptionalConditionEventReceiver::disable();
// Finishes the current command: logs it, notifies the front command sequence,
// removes that sequence from the queue once it is empty, then deletes the
// command and clears currentCommand.
//
// currentCommand is dereferenced without a check here, so callers must only
// invoke this while a command is current.
157 void MariaDBConnection::commandCompleted() {
158 LOG(DEBUG) <<
"MariaDB: Completed: " << currentCommand->commandInfo();
159 commandSequenceQueue.front().commandCompleted();
161 if (commandSequenceQueue.front().empty()) {
162 commandSequenceQueue.pop_front();
165 delete currentCommand;
// Reset the pointer; execute_async() and commandContinue() test it against nullptr.
166 currentCommand =
nullptr;
// Applies the MYSQL_WAIT_* flags in `status` to the three event receivers and
// the read timeout, then handles completion or error of the current command.
//
// @param status  flag set returned by a command's commandStart()/commandContinue().
//
// NOTE(review): lines are elided in this listing; the guards around the
// completion and error handling at the end are only partly visible.
173 void MariaDBConnection::checkStatus(
int status) {
// Read: resume the receiver when the flag is set and it is enabled but
// suspended; otherwise suspend it if it is not already suspended.
175 if ((status & MYSQL_WAIT_READ) != 0) {
176 if (ReadEventReceiver::isSuspended() && ReadEventReceiver::isEnabled()) {
177 ReadEventReceiver::resume();
179 }
else if (!ReadEventReceiver::isSuspended()) {
180 ReadEventReceiver::suspend();
// Write: same pattern as for the read receiver.
183 if ((status & MYSQL_WAIT_WRITE) != 0) {
184 if (WriteEventReceiver::isSuspended() && WriteEventReceiver::isEnabled()) {
185 WriteEventReceiver::resume();
187 }
else if (!WriteEventReceiver::isSuspended()) {
188 WriteEventReceiver::suspend();
// Exceptional condition: same pattern again.
191 if ((status & MYSQL_WAIT_EXCEPT) != 0) {
192 if (ExceptionalConditionEventReceiver::isSuspended() && ExceptionalConditionEventReceiver::isEnabled()) {
193 ExceptionalConditionEventReceiver::resume();
195 }
else if (!ExceptionalConditionEventReceiver::isSuspended()) {
196 ExceptionalConditionEventReceiver::suspend();
// Timeout: when the flag is set, the read receiver's timeout is taken from
// the handle.
199 if ((status & MYSQL_WAIT_TIMEOUT) != 0) {
200 ReadEventReceiver::setTimeout(mysql_get_timeout_value(mysql));
// NOTE(review): presumably the else-branch, restoring the default timeout - confirm.
204 ReadEventReceiver::setTimeout(core::DescriptorEventReceiver::TIMEOUT::DEFAULT);
// NOTE(review): the condition guarding this section is elided; presumably it
// runs once the operation no longer waits on anything - confirm.
210 if (mysql_errno(mysql) == 0) {
211 if (currentCommand->commandCompleted()) {
// Report the handle's error text and number to the current command.
215 currentCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
// Trigger the command-start event.
// NOTE(review): presumably so the next queued command gets started - confirm.
218 commandStartEvent.span();
// NOTE(review): second error report; its guarding branch is elided - confirm
// which condition leads here.
221 currentCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
255 MariaDBCommandStartEvent::MariaDBCommandStartEvent(
const std::string& name, MariaDBConnection* mariaDBConnection)
256 : core::EventReceiver(name)
257 , mariaDBConnection(mariaDBConnection) {