2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
43#include "database/mariadb/MariaDBConnection.h"
45#include "database/mariadb/MariaDBClient.h"
46#include "database/mariadb/commands/async/MariaDBConnectCommand.h"
48#ifndef DOXYGEN_SHOULD_SKIP_THIS
50#include "core/SNodeC.h"
51#include "log/Logger.h"
52#include "utils/Timeval.h"
60namespace database::mariadb {
62 MariaDBConnection::MariaDBConnection(MariaDBClient* mariaDBClient,
const MariaDBConnectionDetails& connectionDetails)
63 : ReadEventReceiver(
"MariaDBConnectionRead", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
64 , WriteEventReceiver(
"MariaDBConnectionWrite", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
65 , ExceptionalConditionEventReceiver(
"MariaDBConnectionExceptional", core::DescriptorEventReceiver::TIMEOUT::DISABLE)
66 , mariaDBClient(mariaDBClient)
67 , mysql(mysql_init(
nullptr))
68 , commandStartEvent(
"MariaDBCommandStartEvent",
this) {
69 mysql_options(mysql, MYSQL_OPT_NONBLOCK,
nullptr);
71 execute_async(std::move(MariaDBCommandSequence().execute_async(
new database::mariadb::commands::async::MariaDBConnectCommand(
74 if (mysql_errno(mysql) == 0) {
75 const int fd = mysql_get_socket(mysql);
77 LOG(DEBUG) <<
"MariaDB: Got valid descriptor: " << fd;
79 if (ReadEventReceiver::enable(fd) && WriteEventReceiver::enable(fd) && ExceptionalConditionEventReceiver::enable(fd)) {
80 ReadEventReceiver::suspend();
81 WriteEventReceiver::suspend();
82 ExceptionalConditionEventReceiver::suspend();
86 if (ReadEventReceiver::isEnabled()) {
87 ReadEventReceiver::disable();
89 if (WriteEventReceiver::isEnabled()) {
90 WriteEventReceiver::disable();
92 if (ExceptionalConditionEventReceiver::isEnabled()) {
93 ExceptionalConditionEventReceiver::disable();
97 LOG(WARNING) <<
"MariaDB: Got no valid descriptor: " << mysql_error(mysql) <<
", " << mysql_errno(mysql);
101 LOG(DEBUG) <<
"MariaDB: Connect success";
103 [](
const std::string& errorString,
unsigned int errorNumber) {
104 LOG(WARNING) <<
"MariaDB: Connect error: " << errorString <<
" : " << errorNumber;
108 MariaDBConnection::~MariaDBConnection() {
109 for (MariaDBCommandSequence& mariaDBCommandSequence : commandSequenceQueue) {
110 for (MariaDBCommand* mariaDBCommand : mariaDBCommandSequence.sequence()) {
111 if (core::SNodeC::state() == core::State::RUNNING && connected) {
112 mariaDBCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
115 delete mariaDBCommand;
119 if (mariaDBClient !=
nullptr) {
120 mariaDBClient->connectionVanished();
127 MariaDBCommandSequence& MariaDBConnection::execute_async(MariaDBCommandSequence&& commandSequence) {
128 if (currentCommand ==
nullptr && commandSequenceQueue.empty()) {
129 commandStartEvent.span();
132 commandSequenceQueue.emplace_back(std::move(commandSequence));
134 return commandSequenceQueue.back();
137 void MariaDBConnection::execute_sync(MariaDBCommand* mariaDBCommand) {
138 mariaDBCommand->commandStart(mysql, utils::Timeval::currentTime());
140 if (mysql_errno(mysql) == 0) {
141 if (mariaDBCommand->commandCompleted()) {
144 mariaDBCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
147 delete mariaDBCommand;
150 void MariaDBConnection::commandStart(
const utils::Timeval& currentTime) {
151 if (!commandSequenceQueue.empty()) {
152 currentCommand = commandSequenceQueue.front().nextCommand();
154 LOG(DEBUG) <<
"MariaDB: Start: " << currentCommand->commandInfo();
156 currentCommand->setMariaDBConnection(
this);
157 checkStatus(currentCommand->commandStart(mysql, currentTime));
158 }
else if (mariaDBClient !=
nullptr) {
159 if (ReadEventReceiver::isSuspended() && ReadEventReceiver::isEnabled()) {
160 ReadEventReceiver::resume();
163 ReadEventReceiver::disable();
164 WriteEventReceiver::disable();
165 ExceptionalConditionEventReceiver::disable();
169 void MariaDBConnection::commandContinue(
int status) {
170 if (currentCommand !=
nullptr) {
171 checkStatus(currentCommand->commandContinue(status));
172 }
else if ((status & MYSQL_WAIT_READ) != 0 && commandSequenceQueue.empty()) {
173 ReadEventReceiver::disable();
174 WriteEventReceiver::disable();
175 ExceptionalConditionEventReceiver::disable();
179 void MariaDBConnection::commandCompleted() {
180 LOG(DEBUG) <<
"MariaDB: Completed: " << currentCommand->commandInfo();
181 commandSequenceQueue.front().commandCompleted();
183 if (commandSequenceQueue.front().empty()) {
184 commandSequenceQueue.pop_front();
187 delete currentCommand;
188 currentCommand =
nullptr;
191 void MariaDBConnection::unmanaged() {
192 mariaDBClient =
nullptr;
195 void MariaDBConnection::checkStatus(
int status) {
197 if ((status & MYSQL_WAIT_READ) != 0) {
198 if (ReadEventReceiver::isSuspended() && ReadEventReceiver::isEnabled()) {
199 ReadEventReceiver::resume();
201 }
else if (!ReadEventReceiver::isSuspended()) {
202 ReadEventReceiver::suspend();
205 if ((status & MYSQL_WAIT_WRITE) != 0) {
206 if (WriteEventReceiver::isSuspended() && WriteEventReceiver::isEnabled()) {
207 WriteEventReceiver::resume();
209 }
else if (!WriteEventReceiver::isSuspended()) {
210 WriteEventReceiver::suspend();
213 if ((status & MYSQL_WAIT_EXCEPT) != 0) {
214 if (ExceptionalConditionEventReceiver::isSuspended() && ExceptionalConditionEventReceiver::isEnabled()) {
215 ExceptionalConditionEventReceiver::resume();
217 }
else if (!ExceptionalConditionEventReceiver::isSuspended()) {
218 ExceptionalConditionEventReceiver::suspend();
221 if ((status & MYSQL_WAIT_TIMEOUT) != 0) {
222 ReadEventReceiver::setTimeout(mysql_get_timeout_value(mysql));
226 ReadEventReceiver::setTimeout(core::DescriptorEventReceiver::TIMEOUT::DEFAULT);
232 if (mysql_errno(mysql) == 0) {
233 if (currentCommand->commandCompleted()) {
237 currentCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
240 commandStartEvent.span();
243 currentCommand->commandError(mysql_error(mysql), mysql_errno(mysql));
249 void MariaDBConnection::readEvent() {
250 commandContinue(MYSQL_WAIT_READ);
253 void MariaDBConnection::writeEvent() {
254 commandContinue(MYSQL_WAIT_WRITE);
257 void MariaDBConnection::outOfBandEvent() {
258 commandContinue(MYSQL_WAIT_EXCEPT);
261 void MariaDBConnection::readTimeout() {
262 commandContinue(MYSQL_WAIT_TIMEOUT);
265 void MariaDBConnection::writeTimeout() {
266 commandContinue(MYSQL_WAIT_TIMEOUT);
269 void MariaDBConnection::outOfBandTimeout() {
270 commandContinue(MYSQL_WAIT_TIMEOUT);
273 void MariaDBConnection::unobservedEvent() {
277 MariaDBCommandStartEvent::MariaDBCommandStartEvent(
const std::string& name, MariaDBConnection* mariaDBConnection)
278 : core::EventReceiver(name)
279 , mariaDBConnection(mariaDBConnection) {
282 void MariaDBCommandStartEvent::onEvent(
const utils::Timeval& currentTime) {
283 mariaDBConnection->commandStart(currentTime);
286 void MariaDBCommandStartEvent::destruct() {
287 delete mariaDBConnection;