【C++】Linux MySQL连接池

佚名 / 2024-08-08 / 原文

#ifndef MYSQLCONNECTION_H
#define MYSQLCONNECTION_H
#include <iostream>
#include <mysql.h>
#include <vector>

/// <summary>
/// Thin wrapper around one MySQL C API connection handle (MYSQL*) together
/// with the result set of the most recent query. Instances are move-only.
/// </summary>
class MySQLConnection
{
public:
	/// <summary>
	/// Allocates the underlying connection handle (does not connect yet).
	/// </summary>
	MySQLConnection();
	// Copying is disabled: two objects must never own the same MYSQL handle.
	// The parameters are const references so these are the canonical copy operations.
	MySQLConnection(const MySQLConnection& another) = delete;
	MySQLConnection& operator=(const MySQLConnection& another) = delete;
	// Moving transfers the handle and the current result set.
	MySQLConnection(MySQLConnection&& another);
	MySQLConnection& operator=(MySQLConnection&& another);
	/// <summary>
	/// Closes the connection and releases the current result set.
	/// </summary>
	~MySQLConnection();
	/// <summary>
	/// Connects to the server.
	/// </summary>
	/// <param name="host">Server host</param>
	/// <param name="user">User name</param>
	/// <param name="pwd">Password</param>
	/// <param name="dbName">Database name</param>
	/// <param name="port">TCP port (default=3306)</param>
	/// <returns>true when the connection was established</returns>
	bool connect(std::string host, std::string user, std::string pwd, std::string dbName, unsigned int port = 3306);

	/// <summary>
	/// Closes the connection and releases the current result set.
	/// </summary>
	/// <returns></returns>
	bool disconnect();
	/// <summary>
	/// Executes a data-modifying statement: insert, delete, update.
	/// </summary>
	/// <param name="sqlUpdate">The SQL statement to execute</param>
	/// <returns>true when the statement executed successfully</returns>
	bool update(std::string sqlUpdate);
	/// <summary>
	/// Executes a query and stores its result set on the client.
	/// </summary>
	/// <param name="sqlQuery">The SQL query to execute</param>
	/// <returns>true when the query executed successfully</returns>
	bool query(std::string sqlQuery);
	/// <summary>
	/// Returns the column names of the current result set.
	/// </summary>
	/// <returns>Column names in result-set order</returns>
	std::vector<std::string> fields();
	/// <summary>
	/// Advances to the next row of the current result set.
	/// </summary>
	/// <returns>false when there is no result set or no further row</returns>
	bool next();
	/// <summary>
	/// Returns a column value of the current row by column index.
	/// </summary>
	/// <param name="index">Zero-based column index</param>
	/// <returns>The value, or an empty string when unavailable</returns>
	std::string value(unsigned int index);
	/// <summary>
	/// Returns a column value of the current row by column name.
	/// </summary>
	/// <param name="fieldName">Column name</param>
	/// <returns>The value, or an empty string when unavailable</returns>
	std::string value(std::string fieldName);
	/// <summary>
	/// Starts a transaction by switching the session to manual commit.
	/// </summary>
	/// <returns>true on success</returns>
	bool startTransaction();
	/// <summary>
	/// Commits the current transaction.
	/// </summary>
	/// <returns>true on success</returns>
	bool commit();
	/// <summary>
	/// Rolls back the current transaction.
	/// </summary>
	/// <returns>true on success</returns>
	bool rollback();
private:
	/// <summary>
	/// Releases the result set of the previous query.
	/// </summary>
	void freeQueryResult();

	MYSQL* mysqlConn = nullptr;	// MySQL connection handle
	MYSQL_RES* queryResult = nullptr;	// result set of the last query
	std::vector<std::string> fieldNames;	// column names of the current result set
	MYSQL_ROW queryResultRow = nullptr;	// current row of the result set
};

#endif
#include "MySQLConnection.h"

/// Creates the object and allocates a fresh MySQL handle. Passing a null
/// pointer to mysql_init asks the library to allocate the handle itself.
MySQLConnection::MySQLConnection()
	: mysqlConn(mysql_init(nullptr))
{
}

/// Move constructor: takes over the connection handle, the stored result
/// set, the current row and the cached column names of `another`.
/// `another` is left holding nothing, so its destructor releases nothing.
MySQLConnection::MySQLConnection(MySQLConnection&& another)
	: mysqlConn(another.mysqlConn),
	  queryResult(another.queryResult),
	  queryResultRow(another.queryResultRow)
{
	// fieldNames starts empty here, so a swap moves the names across and
	// leaves the source with an empty vector.
	fieldNames.swap(another.fieldNames);

	// Detach the source so the handle and result set have exactly one owner.
	another.mysqlConn = nullptr;
	another.queryResult = nullptr;
	another.queryResultRow = nullptr;
}

/// Move assignment: releases whatever this object currently owns, then takes
/// over the handle, result set, current row and column names of `another`.
/// `another` is left holding nothing. Self-assignment is a no-op.
MySQLConnection& MySQLConnection::operator=(MySQLConnection&& another)
{
	if (this == &another)
	{
		return *this;
	}

	// Release our own resources first; the result set goes before the connection.
	if (queryResult != nullptr)
	{
		mysql_free_result(queryResult);
	}
	if (mysqlConn != nullptr)
	{
		mysql_close(mysqlConn);	// handles come from mysql_init, so they are closed, never deleted
	}

	// Take ownership unconditionally, even if we held nothing before.
	mysqlConn = another.mysqlConn;
	queryResult = another.queryResult;
	queryResultRow = another.queryResultRow;
	std::vector<std::string>().swap(fieldNames);
	fieldNames.swap(another.fieldNames);

	// Detach the source so nothing is released twice.
	another.mysqlConn = nullptr;
	another.queryResult = nullptr;
	another.queryResultRow = nullptr;

	return *this;
}

/// Releases the stored result set (if any) and then closes the connection
/// handle (if any). The result set is released regardless of whether a
/// connection handle is still held, so it cannot leak.
MySQLConnection::~MySQLConnection()
{
	freeQueryResult();	// release the result set and the cached column names

	if (mysqlConn != nullptr)
	{
		mysql_close(mysqlConn);	// close the connection and free the handle
		mysqlConn = nullptr;
	}
}

bool MySQLConnection::connect(std::string host, std::string user, std::string pwd, std::string dbName, unsigned int port)
{
	mysqlConn = mysql_real_connect(mysqlConn, host.c_str(), user.c_str(), pwd.c_str(), dbName.c_str(), 
		port, nullptr, 0);	//失败返回NULL
	return mysqlConn != nullptr;	
}

bool MySQLConnection::disconnect()
{
    if (mysqlConn != nullptr)
	{
		mysql_close(mysqlConn);	//关闭连接
		freeQueryResult();	//清空结果集queryResult和结果集中字段fieldNames
	}
}

/// Executes a data-modifying statement (insert / delete / update).
/// Returns true when mysql_query reports success.
bool MySQLConnection::update(std::string sqlUpdate)
{
	// mysql_query yields 0 on success and a non-zero code on failure.
	const int status = mysql_query(mysqlConn, sqlUpdate.c_str());
	return status == 0;
}

bool MySQLConnection::query(std::string sqlQuery)
{
	if (mysql_query(mysqlConn, sqlQuery.c_str()) != 0)	//查询成功返回非0
	{
		return false;
	}

	freeQueryResult();	//清空上次的查询结果和字段名
	queryResult = mysql_store_result(mysqlConn);	//从MySQL服务器把查询结果拉到客户端
	return true;
}

std::vector<std::string> MySQLConnection::fields()
{
	if (queryResult == nullptr)
	{
		return fieldNames;
	}

	std::vector<std::string>(0).swap(fieldNames);	//清空上次查询记录的字段
	int colCount = mysql_num_fields(queryResult);	//获取字段的个数
	MYSQL_FIELD* fields = mysql_fetch_fields(queryResult);	//获取字段名数组

	for (int i = 0; i < colCount; i++)
	{
		fieldNames.emplace_back(fields[i].name);	//遍历存储字段名
	}
	return fieldNames;
}

/// Moves to the next row of the current result set.
/// Returns false when there is no result set or all rows have been consumed.
bool MySQLConnection::next()
{
	if (queryResult != nullptr)
	{
		// mysql_fetch_row yields NULL once the result set is exhausted.
		queryResultRow = mysql_fetch_row(queryResult);
		return queryResultRow != nullptr;
	}
	return false;
}

/// Returns the value of column `index` in the current row.
/// An empty string is returned when there is no current row, the index is
/// out of range, or the column holds SQL NULL.
std::string MySQLConnection::value(unsigned int index)
{
	if (queryResult == nullptr || queryResultRow == nullptr)
	{
		return std::string();
	}

	// index is unsigned, so only the upper bound needs checking.
	if (index >= mysql_num_fields(queryResult))
	{
		return std::string();
	}

	// A SQL NULL is represented by a null pointer in the row array.
	const char* cell = queryResultRow[index];
	// Actual byte length of every column in the current row.
	const unsigned long* colRealLengths = mysql_fetch_lengths(queryResult);
	if (cell == nullptr || colRealLengths == nullptr)
	{
		return std::string();
	}

	// Build the string from pointer + length so that values containing '\0'
	// are not truncated, which std::string(const char*) would do.
	return std::string(cell, colRealLengths[index]);
}

std::string MySQLConnection::value(std::string fieldName)
{
	if (queryResult == nullptr || queryResultRow == nullptr)
	{
		return std::string();
	}

	if (fieldName == "")
	{
		return std::string();
	}

	for (unsigned int i = 0; i < fieldNames.size(); i++)
	{
		if (fieldName == fieldNames[i])
		{
			return value(i);
		}
	}
	return std::string();
}

void MySQLConnection::freeQueryResult()
{
	if (queryResult != nullptr)
	{
		mysql_free_result(queryResult);
		std::vector<std::string>(0).swap(fieldNames);
	}
}

/// Begins a transaction by turning autocommit off on this connection.
/// Returns true when mysql_autocommit reports success (0).
bool MySQLConnection::startTransaction()
{
	const bool autocommitDisabled = (mysql_autocommit(mysqlConn, 0) == 0);
	return autocommitDisabled;
}

/// Commits the current transaction.
/// Returns true when mysql_commit reports success (0).
bool MySQLConnection::commit()
{
	const bool committed = (mysql_commit(mysqlConn) == 0);
	return committed;
}

/// Rolls back the current transaction.
/// Returns true when mysql_rollback reports success (0).
bool MySQLConnection::rollback()
{
	const bool rolledBack = (mysql_rollback(mysqlConn) == 0);
	return rolledBack;
}
#ifndef MYSQLCONNPOOL_H
#define MYSQLCONNPOOL_H
#include "MySQLConnection.h"
#include <assert.h>
#include <pthread.h>
#include <time.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>

class MysqlConnPool
{
private:
    int m_minSize;
    int m_maxSize;
    int m_size;
    int m_countLocked;
    vector<MySQLConntion> m_conns;
    vector<pthread_mutex_t> m_mutexs;
    pthread_t pthManage;
    
private:
    void manage(void* arg)	//连接池size调整
    {
        while(true)
        {
            if(m_countLocked>0.8*m_size && 1.2*m_size<=m_maxSize)
       		{
                //增加连接
                int i=m_size;
                m_size=1.2*m_size;
           	   	for(;i<m_size;++i)
                {
                    MySQLConntion con;
                    m_conns.push_back(con);
                    pthread_mutex_t mtx;
                    m_mutexs.push_back(mtx);
                }
       		}
       		else if(m_countLocked<0.4*m_size && 0.8*m_size>=m_minSize && 0.8*m_size<=m_maxSize)
       		{
           		//减少连接
                int i=m_size;
                m_size=0.8*m_size;
                while(i>m_size)
                {
                    if(pthread_mutex_trylock(&m_mutexs[i])==0)
                    {
                        MySQLConntion con;
                        m_conns.push_back(con);
                        pthread_mutex_t mtx;
                        m_mutexs.push_back(mtx);    
                        --i;
                    }
                }
       		}
            
            sleep(5);	//每5s执行一次
        }
    }
	
public:
	MysqlConnPool(int minSize, int maxSize, int size):m_minSize(minSize),m_maxSize(maxSize),m_size(size)
	{
        assert(size>=m_minSize&&size<=m_maxSize);
        m_countLocked=0;
        for(int i=0;i<m_size;++i)
        {
          	MySQLConntion con;
            m_conns.push_back(con);
        }
        for(int i=0;i<m_size;++i)
        {
          	pthread_mutex_t mtx;
            m_mutexs.push_back(mtx);
        }
        
        pthread_create(&pthManage,NULL,manage,NULL);
	}

	~MysqlConnPoll()
	{
		freeconns();
         pthread_join(&pthManage,NULL);
	}

	// 初始化数据库连接池。
	bool initconns()
	{
         bool ret=true;
		//创建m_size个连接到db的连接
		for (int ii=0;ii<m_size;ii++)
		{
			ret=ret&&m_conns[ii].connect("127.0.0.1","root","root","dbtest");
		}
		//初始化m_size个互斥锁
		for(int i=0;i<m_size;i++)
		{
			if(pthread_mutex_init(&m_mutexs[i],NULL)!=0)
	  			return false;
		}
	
		return ret;
	}

	MySQLConnection* getconn()
	{
		//遍历连接队列,返回第一个可以上锁的连接(没有被其他线程使用)
		for(int i=0;i<m_size;i++)
		{
			if(pthread_mutex_trylock(&m_mutexs[i])==0)
             {
             	m_countLocked++;
                return &m_conns[i];
             }
		}
	
		return NULL;	//队列没有空闲连接
	}

	void freeconn(MySQLConnection* in_conn)
	{
		if(in_conn!=NULL)
		{
			for(int i=0;i<m_size;i++)
			{
	   			if(in_conn==&m_conns[i])
	   			{
	        		pthread_mutex_unlock(&m_mutexs[i]);
                     m_countLocked--;
	   			}
			}
		}
	}

	void freeconns()
	{
		for(int i=0;i<m_size;i++)
		{
             pthread_mutex_destroy(&m_mutexs[i]);	//互斥锁销毁
			m_conns[i].disconnect();	//连接断开
		}
         m_conns.clear();
         m_mutexs.clear();
         m_countLocked=0;
	}
};

#endif