C++ 异步请求 MySQL 数据库

在现代高性能 C++ 应用中,异步数据库操作是提升性能和资源利用率的关键技术。本文将详细介绍 C++ 中异步操作 MySQL 数据库的多种优化方案,涵盖从基础实现到高级架构设计。

一、技术选型与架构设计

1.1 主流异步技术栈

C++ 异步 MySQL 操作主要有以下几种技术路线:

  1. Boost.Asio + MySQL Connector/C++:基于事件驱动的异步 I/O 模型,适合构建高性能网络服务

  2. C++20 协程:使用协程简化异步代码编写,提供同步风格的异步编程体验

  3. REACT-CPP:基于回调的异步库,适合事件驱动型应用

  4. 线程池 +Future/Promise:传统但可靠的异步方案,兼容性最好

1.2 分层架构设计

推荐采用分层架构设计,各层职责明确:

+---------------------+
| HTTP/API接口层      |
| (如Boost.Beast)     |
+---------------------+
| 异步任务调度层      |
| (协程/线程池)       |
+---------------------+
| 数据库连接池层      |
| (MySQL连接管理)     |
+---------------------+
| 业务逻辑层          |
+---------------------+

这种架构下,HTTP 请求处理层负责接收请求,异步任务层负责调度数据库操作,连接池层管理物理连接,业务层处理核心逻辑。

二、环境搭建与依赖配置

2.1 基础依赖安装

对于 Boost.Asio 方案:

# Ubuntu/Debian系统
sudo apt-get install libboost-all-dev libmysqlcppconn-dev

# macOS系统
brew install boost mysql-connector-c++

对于 C++20 协程方案,需要支持 C++20 的编译器(g++ ≥ 13 或 clang++ ≥ 16)。

2.2 CMake配置示例

cmake_minimum_required(VERSION 3.10)
project(AsyncMySQL)

set(CMAKE_CXX_STANDARD 20)

# 查找Boost库
find_package(Boost 1.82 REQUIRED COMPONENTS system thread asio)
include_directories(${Boost_INCLUDE_DIRS})

# 查找MySQL Connector
find_package(MySQLConnectorCpp REQUIRED)
include_directories(${MYSQLCONNECTORCPP_INCLUDE_DIRS})

# 添加可执行文件
add_executable(server main.cpp database.cpp)

# 链接库
target_link_libraries(server
    ${Boost_LIBRARIES}
    ${MYSQLCONNECTORCPP_LIBRARIES}
    pthread
)

三、核心组件实现

3.1 异步连接池设计

数据库连接池是提高性能的关键组件,以下是线程安全的异步连接池实现:

class AsyncMySQLPool {
public:
    using Callback = std::function<void(std::shared_ptr<sql::ResultSet>)>;
    
    struct Task {
        std::string query;
        Callback callback;
    };

    AsyncMySQLPool(const std::string& host, const std::string& user, 
                  const std::string& password, const std::string& database,
                  size_t pool_size = 10);
    
    ~AsyncMySQLPool();
    
    void execute(const std::string& query, Callback callback);
    
private:
    class Worker {
    public:
        void add_task(Task task);
        bool check_connection();
        
    private:
        void run();
        void execute_task(const Task& task);
        void connect();
        
        std::queue<Task> m_queue;
        std::mutex m_mutex;
        std::condition_variable m_cv;
        std::unique_ptr<sql::Connection> m_conn;
        bool m_running = true;
    };
    
    std::vector<std::unique_ptr<Worker>> m_workers;
    std::atomic<size_t> m_next_worker = 0;
};

该连接池为每个工作线程维护独立的连接和任务队列,减少锁竞争。

3.2 基于协程的异步查询

对于支持 C++20 的项目,可以使用协程简化异步代码:

struct DBQueryAwaiter {
    std::string sql;
    MySQLPool& pool;
    std::exception_ptr error;
    std::string json_result;
    
    bool await_ready() { return false; }
    
    void await_suspend(std::coroutine_handle<> h) {
        std::thread([this, h] {
            try {
                auto conn = pool.get();
                MYSQL_RES* res = nullptr;
                mysql_query(conn, sql.c_str());
                res = mysql_use_result(conn);
                
                std::ostringstream oss;
                // 将结果转为JSON格式
                oss << "[";
                bool first_row = true;
                while (auto row = mysql_fetch_row(res)) {
                    if (!first_row) oss << ",";
                    oss << "{";
                    // 处理每行数据
                    for (int i = 0; i < mysql_num_fields(res); ++i) {
                        if (i > 0) oss << ",";
                        oss << "\"" << mysql_fetch_field_direct(res, i)->name 
                            << "\":\"" << (row[i] ? row[i] : "") << "\"";
                    }
                    oss << "}";
                    first_row = false;
                }
                oss << "]";
                mysql_free_result(res);
                pool.release(conn);
                json_result = oss.str();
            } catch (...) {
                error = std::current_exception();
            }
            h.resume();
        }).detach();
    }
    
    std::string await_resume() {
        if (error) std::rethrow_exception(error);
        return json_result;
    }
};

inline DBQueryAwaiter query(MySQLPool& pool, const std::string& sql) {
    return DBQueryAwaiter{sql, pool};
}

使用方式:

Task<> handle_users(MySQLPool& pool) {
    try {
        std::string json = co_await query(pool, "SELECT id, name, age FROM users LIMIT 10");
        std::cout << "[Result] " << json << "\n";
    } catch (const std::exception& e) {
        std::cerr << "[DB Error] " << e.what() << "\n";
    }
    co_return;
}

这种协程方式使异步代码看起来像同步代码一样直观。

3.3 Future/Promise模式实现

对于不支持协程的环境,可以使用 Future/Promise 模式:

auto AsyncQuery(MySQLPool& pool, const std::string& sql) {
    auto promise = std::make_shared<std::promise<QueryResult>>();
    
    std::thread([=, &pool] {
        try {
            auto conn = pool.getConnection();
            auto stmt = conn->createStatement();
            auto res = stmt->executeQuery(sql);
            
            QueryResult result;
            while (res->next()) {
                // 处理结果集
            }
            
            promise->set_value(result);
            pool.releaseConnection(conn);
        } catch (...) {
            promise->set_exception(std::current_exception());
        }
    }).detach();
    
    return promise->get_future();
}

使用方式:

auto future = AsyncQuery(pool, "SELECT * FROM users");
auto result = future.get(); // 阻塞等待结果

四、性能优化策略

4.1 连接池优化

  1. 动态扩容:根据负载自动调整连接池大小

  2. 健康检查:定期检查连接有效性,自动重建失效连接

  3. 空闲超时:释放长时间未使用的连接

void ConnectionPool::health_check() {
    while (m_health_check_running) {
        std::this_thread::sleep_for(15s);
        
        size_t healthy_count = 0;
        for (auto& worker : m_workers) {
            if (worker->check_connection()) {
                healthy_count++;
            }
        }
        
        std::cout << "Health check: " << healthy_count << "/" 
                 << m_workers.size() << " workers healthy\n";
    }
}

4.2 查询优化

  1. 预编译语句:减少 SQL 解析开销

  2. 批量操作:合并多个 INSERT/UPDATE

  3. 结果集处理:使用高效数据结构

vector<User> fetchUsers(int startId, int endId) {
    vector<User> users;
    users.reserve(100); // 预分配内存
    
    auto conn = pool.getConnection();
    auto pstmt = conn->prepareStatement(
        "SELECT id, name, email FROM users WHERE id BETWEEN ? AND ?");
    
    pstmt->setInt(1, startId);
    pstmt->setInt(2, endId);
    
    auto res = pstmt->executeQuery();
    while(res->next()) {
        users.emplace_back(User{
            res->getInt("id"),
            res->getString("name"),
            res->getString("email")
        });
    }
    
    pool.releaseConnection(conn);
    return users;
}

4.3 异步模式选择

场景

推荐方案

特点

高并发 HTTP 服务

Boost.Asio + 连接池

高吞吐量,低延迟

后台批处理任务

线程池 + Future

资源控制精细

现代 C++ 项目

C++20 协程

代码简洁,维护性好

简单应用

同步连接池

实现简单,适合低并发

五、错误处理与监控

5.1 统一错误处理

std::string wrap_error(const std::string& msg) {
    return "{\"error\": \"" + msg + "\"}";
}

try {
    auto result = co_await query(pool, sql);
    // 处理结果
} catch (const sql::SQLException& e) {
    std::cerr << "SQL Error: " << e.what() 
              << " (code: " << e.getErrorCode() << ")\n";
    co_return wrap_error("Database error");
} catch (const std::exception& e) {
    std::cerr << "Error: " << e.what() << "\n";
    co_return wrap_error("System error");
}

5.2 监控指标

  1. 连接池使用率

  2. 查询平均耗时

  3. 错误率统计

  4. 队列等待时间

六、扩展功能

6.1 事务支持

Task<> transfer_funds(MySQLPool& pool, int from, int to, double amount) {
    try {
        co_await query(pool, "BEGIN");
        co_await query(pool, fmt::format(
            "UPDATE accounts SET balance = balance - {} WHERE id = {}", amount, from));
        co_await query(pool, fmt::format(
            "UPDATE accounts SET balance = balance + {} WHERE id = {}", amount, to));
        co_await query(pool, "COMMIT");
    } catch (...) {
        co_await query(pool, "ROLLBACK");
        throw;
    }
}

6.2 参数化查询防注入

auto pstmt = conn->prepareStatement(
    "SELECT * FROM users WHERE username = ? AND password = ?");
pstmt->setString(1, username);
pstmt->setString(2, password);
auto res = pstmt->executeQuery();

七、完整示例:异步HTTP服务器

结合 Boost.Beast 和异步 MySQL 的完整示例:

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <mysql_connection.h>
#include <mysql_driver.h>

namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;

class AsyncMySQLHandler {
public:
    AsyncMySQLHandler(asio::io_context& ioc, sql::mysql::MySQL_Driver* driver)
        : m_ioc(ioc), m_driver(driver) {}
    
    void handle_request(
        beast::tcp_stream& stream,
        beast::http::request<beast::http::string_body> req) {
        
        if (req.target() == "/users") {
            m_driver->async_query(
                "SELECT id, name FROM users LIMIT 10",
                [this, &stream, req = std::move(req)](auto ec, auto result) {
                    if (ec) {
                        send_error(stream, req, ec.message());
                        return;
                    }
                    
                    beast::http::response<beast::http::string_body> res{
                        beast::http::status::ok, req.version()};
                    res.set(beast::http::field::content_type, "application/json");
                    res.body() = to_json(result);
                    res.prepare_payload();
                    
                    stream.async_write(
                        res,
                        [&stream](auto ec, auto) { stream.close(); });
                });
        }
    }
    
private:
    asio::io_context& m_ioc;
    sql::mysql::MySQL_Driver* m_driver;
    
    void send_error(beast::tcp_stream& stream, 
                   const beast::http::request<beast::http::string_body>& req,
                   const std::string& msg) {
        beast::http::response<beast::http::string_body> res{
            beast::http::status::internal_server_error, req.version()};
        res.body() = "Error: " + msg;
        res.prepare_payload();
        stream.async_write(
            res,
            [&stream](auto ec, auto) { stream.close(); });
    }
    
    std::string to_json(const sql::ResultSet& res) {
        // 转换结果集为JSON
    }
};

int main() {
    asio::io_context ioc;
    tcp::acceptor acceptor(ioc, {tcp::v4(), 8080});
    
    auto driver = sql::mysql::get_mysql_driver_instance();
    AsyncMySQLHandler handler(ioc, driver);
    
    beast::tcp_stream stream(ioc);
    acceptor.async_accept(stream.socket(), [&](auto ec) {
        if (!ec) {
            beast::http::request<beast::http::string_body> req;
            beast::http::async_read(stream, buffer, req,
                [&](auto ec, auto bytes) {
                    if (!ec) handler.handle_request(stream, std::move(req));
                });
        }
    });
    
    ioc.run();
    return 0;
}

这个示例展示了如何将异步 MySQL 操作集成到异步 HTTP 服务器中,实现高性能的 Web 服务。

八、总结与最佳实践

  1. 连接池是基础:无论采用哪种异步方案,连接池都是提高性能的关键组件

  2. 根据场景选择技术:高并发选 Boost.Asio,代码简洁选协程,兼容性选线程池

  3. 监控不可少:实现连接健康检查和性能监控

  4. 错误处理要全面:统一处理数据库异常和网络错误

  5. 逐步优化:从简单实现开始,根据性能分析结果针对性优化

通过合理选择和组合这些技术,可以构建出高性能、可扩展的 C++ MySQL 异步数据库访问层,满足从简单应用到高并发服务的各种需求。


C++ 异步请求 MySQL 数据库
https://uniomo.com/archives/c-yi-bu-qing-qiu-mysql-shu-ju-ku
作者
雨落秋垣
发布于
2025年09月20日
许可协议