C++ 异步请求 MySQL 数据库
在现代高性能 C++ 应用中,异步数据库操作是提升性能和资源利用率的关键技术。本文将详细介绍 C++ 中异步操作 MySQL 数据库的多种优化方案,涵盖从基础实现到高级架构设计。
一、技术选型与架构设计
1.1 主流异步技术栈
C++ 异步 MySQL 操作主要有以下几种技术路线:
Boost.Asio + MySQL Connector/C++:基于事件驱动的异步 I/O 模型,适合构建高性能网络服务
C++20 协程:使用协程简化异步代码编写,提供同步风格的异步编程体验
REACT-CPP:基于回调的异步库,适合事件驱动型应用
线程池 +Future/Promise:传统但可靠的异步方案,兼容性最好
1.2 分层架构设计
推荐采用分层架构设计,各层职责明确:
+---------------------+
| HTTP/API接口层 |
| (如Boost.Beast) |
+---------------------+
| 异步任务调度层 |
| (协程/线程池) |
+---------------------+
| 数据库连接池层 |
| (MySQL连接管理) |
+---------------------+
| 业务逻辑层 |
+---------------------+这种架构下,HTTP 请求处理层负责接收请求,异步任务层负责调度数据库操作,连接池层管理物理连接,业务层处理核心逻辑。
二、环境搭建与依赖配置
2.1 基础依赖安装
对于 Boost.Asio 方案:
# Ubuntu/Debian系统
sudo apt-get install libboost-all-dev libmysqlcppconn-dev
# macOS系统
brew install boost mysql-connector-c++对于 C++20 协程方案,需要支持 C++20 的编译器(g++ ≥ 13 或 clang++ ≥ 16)。
2.2 CMake配置示例
cmake_minimum_required(VERSION 3.10)
project(AsyncMySQL)
set(CMAKE_CXX_STANDARD 20)
# 查找Boost库
find_package(Boost 1.82 REQUIRED COMPONENTS system thread asio)
include_directories(${Boost_INCLUDE_DIRS})
# 查找MySQL Connector
find_package(MySQLConnectorCpp REQUIRED)
include_directories(${MYSQLCONNECTORCPP_INCLUDE_DIRS})
# 添加可执行文件
add_executable(server main.cpp database.cpp)
# 链接库
target_link_libraries(server
${Boost_LIBRARIES}
${MYSQLCONNECTORCPP_LIBRARIES}
pthread
)三、核心组件实现
3.1 异步连接池设计
数据库连接池是提高性能的关键组件,以下是线程安全的异步连接池实现:
class AsyncMySQLPool {
public:
using Callback = std::function<void(std::shared_ptr<sql::ResultSet>)>;
struct Task {
std::string query;
Callback callback;
};
AsyncMySQLPool(const std::string& host, const std::string& user,
const std::string& password, const std::string& database,
size_t pool_size = 10);
~AsyncMySQLPool();
void execute(const std::string& query, Callback callback);
private:
class Worker {
public:
void add_task(Task task);
bool check_connection();
private:
void run();
void execute_task(const Task& task);
void connect();
std::queue<Task> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
std::unique_ptr<sql::Connection> m_conn;
bool m_running = true;
};
std::vector<std::unique_ptr<Worker>> m_workers;
std::atomic<size_t> m_next_worker = 0;
};该连接池为每个工作线程维护独立的连接和任务队列,减少锁竞争。
3.2 基于协程的异步查询
对于支持 C++20 的项目,可以使用协程简化异步代码:
struct DBQueryAwaiter {
std::string sql;
MySQLPool& pool;
std::exception_ptr error;
std::string json_result;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([this, h] {
try {
auto conn = pool.get();
MYSQL_RES* res = nullptr;
mysql_query(conn, sql.c_str());
res = mysql_use_result(conn);
std::ostringstream oss;
// 将结果转为JSON格式
oss << "[";
bool first_row = true;
while (auto row = mysql_fetch_row(res)) {
if (!first_row) oss << ",";
oss << "{";
// 处理每行数据
for (int i = 0; i < mysql_num_fields(res); ++i) {
if (i > 0) oss << ",";
oss << "\"" << mysql_fetch_field_direct(res, i)->name
<< "\":\"" << (row[i] ? row[i] : "") << "\"";
}
oss << "}";
first_row = false;
}
oss << "]";
mysql_free_result(res);
pool.release(conn);
json_result = oss.str();
} catch (...) {
error = std::current_exception();
}
h.resume();
}).detach();
}
std::string await_resume() {
if (error) std::rethrow_exception(error);
return json_result;
}
};
inline DBQueryAwaiter query(MySQLPool& pool, const std::string& sql) {
return DBQueryAwaiter{sql, pool};
}使用方式:
Task<> handle_users(MySQLPool& pool) {
try {
std::string json = co_await query(pool, "SELECT id, name, age FROM users LIMIT 10");
std::cout << "[Result] " << json << "\n";
} catch (const std::exception& e) {
std::cerr << "[DB Error] " << e.what() << "\n";
}
co_return;
}这种协程方式使异步代码看起来像同步代码一样直观。
3.3 Future/Promise模式实现
对于不支持协程的环境,可以使用 Future/Promise 模式:
auto AsyncQuery(MySQLPool& pool, const std::string& sql) {
auto promise = std::make_shared<std::promise<QueryResult>>();
std::thread([=, &pool] {
try {
auto conn = pool.getConnection();
auto stmt = conn->createStatement();
auto res = stmt->executeQuery(sql);
QueryResult result;
while (res->next()) {
// 处理结果集
}
promise->set_value(result);
pool.releaseConnection(conn);
} catch (...) {
promise->set_exception(std::current_exception());
}
}).detach();
return promise->get_future();
}使用方式:
auto future = AsyncQuery(pool, "SELECT * FROM users");
auto result = future.get(); // 阻塞等待结果四、性能优化策略
4.1 连接池优化
动态扩容:根据负载自动调整连接池大小
健康检查:定期检查连接有效性,自动重建失效连接
空闲超时:释放长时间未使用的连接
void ConnectionPool::health_check() {
while (m_health_check_running) {
std::this_thread::sleep_for(15s);
size_t healthy_count = 0;
for (auto& worker : m_workers) {
if (worker->check_connection()) {
healthy_count++;
}
}
std::cout << "Health check: " << healthy_count << "/"
<< m_workers.size() << " workers healthy\n";
}
}4.2 查询优化
预编译语句:减少 SQL 解析开销
批量操作:合并多个 INSERT/UPDATE
结果集处理:使用高效数据结构
vector<User> fetchUsers(int startId, int endId) {
vector<User> users;
users.reserve(100); // 预分配内存
auto conn = pool.getConnection();
auto pstmt = conn->prepareStatement(
"SELECT id, name, email FROM users WHERE id BETWEEN ? AND ?");
pstmt->setInt(1, startId);
pstmt->setInt(2, endId);
auto res = pstmt->executeQuery();
while(res->next()) {
users.emplace_back(User{
res->getInt("id"),
res->getString("name"),
res->getString("email")
});
}
pool.releaseConnection(conn);
return users;
}4.3 异步模式选择
五、错误处理与监控
5.1 统一错误处理
std::string wrap_error(const std::string& msg) {
return "{\"error\": \"" + msg + "\"}";
}
try {
auto result = co_await query(pool, sql);
// 处理结果
} catch (const sql::SQLException& e) {
std::cerr << "SQL Error: " << e.what()
<< " (code: " << e.getErrorCode() << ")\n";
co_return wrap_error("Database error");
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << "\n";
co_return wrap_error("System error");
}5.2 监控指标
连接池使用率
查询平均耗时
错误率统计
队列等待时间
六、扩展功能
6.1 事务支持
Task<> transfer_funds(MySQLPool& pool, int from, int to, double amount) {
try {
co_await query(pool, "BEGIN");
co_await query(pool, fmt::format(
"UPDATE accounts SET balance = balance - {} WHERE id = {}", amount, from));
co_await query(pool, fmt::format(
"UPDATE accounts SET balance = balance + {} WHERE id = {}", amount, to));
co_await query(pool, "COMMIT");
} catch (...) {
co_await query(pool, "ROLLBACK");
throw;
}
}6.2 参数化查询防注入
auto pstmt = conn->prepareStatement(
"SELECT * FROM users WHERE username = ? AND password = ?");
pstmt->setString(1, username);
pstmt->setString(2, password);
auto res = pstmt->executeQuery();七、完整示例:异步HTTP服务器
结合 Boost.Beast 和异步 MySQL 的完整示例:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <mysql_connection.h>
#include <mysql_driver.h>
namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;
class AsyncMySQLHandler {
public:
AsyncMySQLHandler(asio::io_context& ioc, sql::mysql::MySQL_Driver* driver)
: m_ioc(ioc), m_driver(driver) {}
void handle_request(
beast::tcp_stream& stream,
beast::http::request<beast::http::string_body> req) {
if (req.target() == "/users") {
m_driver->async_query(
"SELECT id, name FROM users LIMIT 10",
[this, &stream, req = std::move(req)](auto ec, auto result) {
if (ec) {
send_error(stream, req, ec.message());
return;
}
beast::http::response<beast::http::string_body> res{
beast::http::status::ok, req.version()};
res.set(beast::http::field::content_type, "application/json");
res.body() = to_json(result);
res.prepare_payload();
stream.async_write(
res,
[&stream](auto ec, auto) { stream.close(); });
});
}
}
private:
asio::io_context& m_ioc;
sql::mysql::MySQL_Driver* m_driver;
void send_error(beast::tcp_stream& stream,
const beast::http::request<beast::http::string_body>& req,
const std::string& msg) {
beast::http::response<beast::http::string_body> res{
beast::http::status::internal_server_error, req.version()};
res.body() = "Error: " + msg;
res.prepare_payload();
stream.async_write(
res,
[&stream](auto ec, auto) { stream.close(); });
}
std::string to_json(const sql::ResultSet& res) {
// 转换结果集为JSON
}
};
int main() {
asio::io_context ioc;
tcp::acceptor acceptor(ioc, {tcp::v4(), 8080});
auto driver = sql::mysql::get_mysql_driver_instance();
AsyncMySQLHandler handler(ioc, driver);
beast::tcp_stream stream(ioc);
acceptor.async_accept(stream.socket(), [&](auto ec) {
if (!ec) {
beast::http::request<beast::http::string_body> req;
beast::http::async_read(stream, buffer, req,
[&](auto ec, auto bytes) {
if (!ec) handler.handle_request(stream, std::move(req));
});
}
});
ioc.run();
return 0;
}这个示例展示了如何将异步 MySQL 操作集成到异步 HTTP 服务器中,实现高性能的 Web 服务。
八、总结与最佳实践
连接池是基础:无论采用哪种异步方案,连接池都是提高性能的关键组件
根据场景选择技术:高并发选 Boost.Asio,代码简洁选协程,兼容性选线程池
监控不可少:实现连接健康检查和性能监控
错误处理要全面:统一处理数据库异常和网络错误
逐步优化:从简单实现开始,根据性能分析结果针对性优化
通过合理选择和组合这些技术,可以构建出高性能、可扩展的 C++ MySQL 异步数据库访问层,满足从简单应用到高并发服务的各种需求。