0. 前言

随着国产化进程的加速推进,众多关键业务领域正逐步开启国产化转型。例如,操作系统和数据库系统等核心组件,都在向国产解决方案迁移。相较于国外产品,国产化方案可以有效缓解“卡脖子”带来的问题,还能有效保障数据安全,提供更加定制化的服务。

崖山数据库(YaShan DataBase)是深圳计算科学研究院自主研发的一款国产数据库, 自己搞了一个”有界计算“理论,高度兼容Oracle数据库。而在11.14号崖山数据库推出了YashanDB V23.3版本,宣称1:1平替Oracle,提供单机主备、共享集群、分布式等形态,且性能强劲,可用性高。

我通过了崖山数据库的管理员认证(YCA认证),正在备考更高级的认证(YCP),听说集群版本可以公开体验,准备测试一下。

本文包括安装部署和性能测试两部分。

[TOC]

1.环境搭建

高可用使用虚拟机进行测试,搭建一主一备的环境,模拟常见使用环境.

首先安装好系统、设置主机名和固定ip地址。这里不再赘述,

角色主机名CPU/内存/硬盘局域网IP地址系统
主机YashanDB14/4GB/50GB192.168.7.227Centos 8.1
备机1YashanDB24/4GB/50GB192.168.7.231Centos 8.1
存储服务器YashanDB34/4GB/50GB192.168.7.232Centos 8.1

1.1 安装前准备

官网推荐创建一个新用户安装YashanDB数据库:

创建安装用户

# 创建用户
useradd yashan 
# 创建用户组
groupadd YASDBA && usermod -a -G YASDBA yashan
# 设置免密登录(需要有sudo命令)
echo "yashan  ALL=(ALL)NOPASSWD:ALL" | EDITOR='tee -a' visudo
# 设置密码
passwd yashan

当然,集群一个一个地设置非常麻烦,于是我写了一个脚本,需要主机有sudo命令,使用的时候只需要传密码参数即可:

使用:wget -qO- http://dl.foxhank.top/yashandb_preinstall.sh | sh /dev/stdin <新用户密码>

如:wget -qO- http://dl.foxhank.top/yashandb_preinstall.sh | sh /dev/stdin 114514

如果主机没有sudo命令,需要首先下载:apt install sudo(deb)/yum install sudo(rpm)

#!/bin/bash

# Check if a password parameter has been provided
if [ -z "$1" ]; then
    echo "Please enter the password:"
    read -s password
else
    password=$1
fi

# Add user 'yashan' with home directory, bash shell, and set the password
useradd -m -d /home/yashan -s /bin/bash yashan
echo "yashan:$password" | chpasswd

# Create the YASDBA group and add user 'yashan' to this group
groupadd YASDBA
usermod -aG YASDBA yashan

# Check for the existence of the /etc/sudoers file
if [ ! -f "/etc/sudoers" ]; then
    # Check if the visudo command exists
    if command -v visudo > /dev/null 2>&1; then
        echo "visudo command detected but /etc/sudoers file not found. This might be a non-standard configuration, please check."
    else
        echo "sudo package not detected. Please try to install it using the following methods:"
        echo "Debian/Ubuntu/Deepin: apt-get install sudo"
        echo "Redhat/Centos: yum install sudo"
        exit 1
    fi
fi

# Add 'yashan ALL=(ALL)NOPASSWD:ALL' to the sudoers file
# Use visudo to avoid potential issues from directly editing /etc/sudoers
echo "yashan  ALL=(ALL)NOPASSWD:ALL" | EDITOR='tee -a' visudo

echo "Script execution completed."
[root@localhost ~]# wget -qO- http://dl.foxhank.top/yashandb_preinstall.sh | sh /dev/stdin  114514
yashan  ALL=(ALL)NOPASSWD:ALL
Script execution completed.

image-20241219160950527

1.2 主机安装

首先需要下载安装包:YashanDB 下载中心

mkdir -p /home/yashan/install && cd /home/yashan/install
wget https://linked.yashandb.com/upload1010/yashandb-23.2.4.100-linux-x86_64.tar.gz && tar -zxvf yashandb-*.gz

1.2.1 命令行安装

切换到安装目录后生成配置文件:需要把<上文设置的密码>更改为刚才设置的密码

./bin/yasboot package ce gen \
--cluster yashandb \
-u yashan \
-p <上文设置的密码> \
--ip 192.168.7.227,192.168.7.231 \
--port 22 \
--install-path /data/yashan/yasdb_home \
--data-path /data/yashan/yasdb_data \
--begin-port 1688 \
--node 2 \
--data /dev/yas/data \
--vote /dev/yas/vote \
--ycr /dev/yas/ycr

参数详解:yasboot package | YashanDB Doc

参数含义
--cluster集群安装,集群名称名称:yashandb
-u服务器ssh用户名yashan
-pssh密码刚才设置的密码,比如我设为114514
--ip集群内服务器ip地址集群内网ip地址
--portssh端口22
--install-path安装目录,保持默认即可/data/yashan/yasdb_home
--data-path数据存放目录,保持默认即可/data/yashan/yasdb_data
--begin-port数据库起始端口默认:1688
--node节点数量2
--data绑定的数据盘路径/dev/yas/data
--vote投票盘路径,协调集群内各个节点工作/dev/yas/vote
--ycr绑定的YCR盘路径/dev/yas/ycr

执行完毕后,当前目录下将生成yashandb.toml(数据库集群的配置文件)和hosts.toml(服务器的配置文件)两个配置文件,可对其进行手动修改,这里保持默认.

随后执行安装命令:

./bin/yasboot package install -t hosts.toml -i yashandb-23.2.4.100-linux-x86_64.tar.gz

进行数据库部署:

./bin/yasboot cluster deploy -t yashandb.toml --yfs-force-create

提示task completed, status: SUCCESS即为安装成功.image-20241227232408108

随后重启数据库执行更改:

./bin/yasboot cluster restart -c yashandb

1.2.2 可视化安装

共享集群部署仅支持使用命令行安装,可视化安装为分布式部署:

分布式部署可以将数据库系统分散到多个独立的计算节点上,每个节点都拥有自己的存储和计算资源,并且能够独立处理查询请求。单个节点发生故障,其他节点仍然可以继续工作。
而共享集群部署是在一个或多个机器上运行多个数据库实例,所有实例共享同一套硬件资源(如CPU、内存、存储等)。数据库的读写操作会被分布到不同的实例上执行,但底层的数据存储是共享的。这种部署方式的写操作必须通过单一的主节点进行;而且如果共享的存储出现故障,整个集群都会受到影响。

简单的说:分布式部署通过多个独立节点横向扩展,适合高可用性和大规模数据处理,而共享集群部署则是在多个实例间共享同一套硬件资源,更适合简化管理和读取扩展。

运行下面命令打开web页面(需要先在防火墙放开9001端口):

./bin/yasom --web --listen 0.0.0.0:9001

之后打开ip:9001,即可进入配置页面

image-20241219165249767image-20241219165325412

点击”尝试连接“,显示连接成功即完成。

image-20241219165410419

下一步是设置权限。但因为上文已经给了sudo权限,所以直接默认设置跳过即可:

image-20241219165513721

节点规模设置填写部分可以按照上文表格,选择节点数量,节点端口默认为1688

image-20241219190202813

之后一直下一步即可,提示安装成功即完成安装流程.

1.3 客户端安装

访问客户端,下载对应系统的安装包:这里以Windows系统安装amd64版本为例:
image-20241227212549499

下载后解压到一个文件夹后,在“我的电脑”点击右键,选择“属性”-”高级系统设置“

image-20241227212857071

随后选择“环境变量”,在Path变量增加刚才解压出来的两个文件夹:

image-20241227213004497

然后Win+R打开CMD,输入命令测试:

yasql <用户名>/<密码>@<主机ip地址>:1688

image-20241227213203776

出现SQL>则表示连接成功.

2.性能测试

为了直观地测试数据库性能,我写了一系列python脚本进行测试,python安装参考YashanDB Python驱动使用介绍 | YashanDB Doc

这里踩了一个小坑:安装python的路径最好不要有中文,否则会出现YAS-08045空指针错误。同时非常感谢群内的数据库大佬,非常耐心地为我一步一步排查问题!

由于代码过长,均放置于4.代码附录部分:

2.1 单次查询

我们可以通过对创建(create)、查询(select)、插入(insert)、和删除(delete)这四种基本操作的执行时间进行比较。

在同服务器部署的环境下,崖山数据库在这四个方面的操作执行时间上均展示出良好表现。特别是创建表的操作,MySQL在处理创建表请求时花费了大约两秒钟的时间才完成响应,而相比之下,崖山数据库仅用了0.11秒即完成了相同的操作。这一结果依靠于崖山数据库原创的“有界计算理论”,在处理DDL(数据定义语言)语句非常高效,在面对大规模或复杂的表结构创建时,能够提供更快捷的服务响应速度。

对于其他三种操作,即查询、插入和删除,崖山数据库同样展示了比MySQL更为优越的性能特点。在高并发访问环境下,这种优化就显得尤为重要了。

123db_io_performance_comparison

2.2 批量查询

为了测试大量数据插入的情景,随机生成一万个整数、和一千个浮点数,分别对崖山数据库、mysql数据库执行插入、修改、删除操作:

bulk_operations_performance_comparison

可以看到,崖山数据库在插入、更新、删除的表现均优于MySQL,这对于大量数据插入的场景时可以有显著的效果提升。甚至在部分场景,崖山数据库的操作时间是mysql的一半以上。

2.3 压力测试

为了评估崖山数据库在处理大量数据写入时的I/O性能,对其执行压力测试:持续三十秒内的插入、修改和删除操作,并记录每个操作的响应时间。

测试结果显示,崖山数据库的延迟表现极其出色,平均插入和删除操作的时间仅需0.几毫秒,同时在没有建立索引的情况下,查询操作的响应时间也稳定在1毫秒左右。

这表明崖山数据库在高负载条件下仍能保持高效的事务处理能力,非常适合用于应用后端数据库。

Pressure test completed over 30 seconds.
Total operations performed: 2001
Average time for update_data: 0.0005 seconds
Average time for insert_data: 0.0006 seconds
Average time for select_data: 0.0010 seconds
Average time for delete_data: 0.0005 seconds

stress_test_performance_single_thread

2.4 QPS测试

QPS压力测试是测试数据库不可不尝的一环,还是对数据库进行30s的查询操作,测试耗时和响应速度。

我的主机是一块西数4TB蓝盘,7200转,结果非常理想,几乎达到了磁盘I/O上限。平均查询只要0.3毫秒。QPS达到了3097,几乎是这块硬盘的上限了。

Total operations performed: 92914
Total time taken: 30.0004 seconds
Queries per second (QPS): 3097.0937
Average response time for select: 0.0003 seconds

qps_test_performance

可以说完全可以胜任大型业务的后端数据库了。

2.5 高可用测试

高可用的目的就是当一个服务器因为各种各样奇奇怪怪的原因死机之后仍能提供正常服务。那么就模拟“拔电源”操作,进行持续120s的读写,但是在50s时对集群某台幸运服务器进行断电操作!

使用python记录响应时间,在50s通过ssh连接到集群内备用机并执行init 0 命令。在50s时间段(标注小箭头处,由于线程原因断电时间比红线要略早一点点),响应时间有大幅度上升,但是在5s左右集群便进行了自我恢复,响应时间显著,影响业务时间极短,效果非常理想。

img

3.结论

经过测试,崖山数据库在数据增删改查操作上均表现了非常优异的性能,相较mysql有不错的性能提升。在国产化进程推进的今天,崖山数据库非常适合用于Oracle、mysql数据库之类的国产替代,同时相对于 Oracle 之类的大型数据库安装部署也更简单,特别适合个人初学者的数据库入门学习。


4.代码附录

4.1 单次查询速度测试

import yasdb
import mysql.connector
import matplotlib
matplotlib.use('Agg')  # 使用非交互式后端,适合无头环境
import matplotlib.pyplot as plt
import time

# 连接到崖山数据库
yasdb_connection = yasdb.connect(host="192.168.7.112", port=1688, user="sys", password="root")
yasdb_cursor = yasdb_connection.cursor()

# 连接到第二个数据库 (MySQL)
mysql_connection = mysql.connector.connect(
    host="127.0.0.1",
    port=3306,
    user="test",
    password="114514",
    database="data"  # 指定测试数据库名称
)
mysql_cursor = mysql_connection.cursor()

# 准备列表来存储各个操作的名称和它们对应的执行时间
operations = []
times_yasdb = []
times_mysql = []

def measure_time(operation_name, operation_func_yasdb, operation_func_mysql):
    start_time = time.time()
    operation_func_yasdb()
    end_time = time.time()
    elapsed_time_yasdb = end_time - start_time
    
    start_time = time.time()
    operation_func_mysql()
    end_time = time.time()
    elapsed_time_mysql = end_time - start_time
    
    operations.append(operation_name)
    times_yasdb.append(elapsed_time_yasdb)
    times_mysql.append(elapsed_time_mysql)

# 定义要测试的操作(为每个数据库定义)
def create_table_yasdb():
    yasdb_cursor.execute("drop table if exists bind_param_heap_1")
    yasdb_cursor.execute("create table bind_param_heap_1(a int, b double, c int)")

def create_table_mysql():
    mysql_cursor.execute("DROP TABLE IF EXISTS bind_param_heap_1")
    mysql_cursor.execute("CREATE TABLE bind_param_heap_1(a INT, b DOUBLE, c INT)")

def insert_data_yasdb():
    yasdb_cursor.executemany("insert into bind_param_heap_1 values(:1, :2, :3)", [
        (1, 10, 30), (2, 20, 40)])

def insert_data_mysql():
    mysql_cursor.executemany("INSERT INTO bind_param_heap_1 VALUES (%s, %s, %s)", [
        (1, 10, 30), (2, 20, 40)])

def delete_data_yasdb():
    yasdb_cursor.execute("delete from bind_param_heap_1 where a=:1", (1,))

def delete_data_mysql():
    mysql_cursor.execute("DELETE FROM bind_param_heap_1 WHERE a=%s", (1,))

def update_data_yasdb():
    yasdb_cursor.execute("update bind_param_heap_1 set c=:1", (50,))

def update_data_mysql():
    mysql_cursor.execute("UPDATE bind_param_heap_1 SET c=%s", (50,))

def select_data_yasdb():
    yasdb_cursor.execute("select * from bind_param_heap_1")
    yasdb_cursor.fetchall()

def select_data_mysql():
    mysql_cursor.execute("SELECT * FROM bind_param_heap_1")
    mysql_cursor.fetchall()

# 测量每个操作的执行时间
measure_time('Create Table', create_table_yasdb, create_table_mysql)
measure_time('Insert Data', insert_data_yasdb, insert_data_mysql)
yasdb_connection.commit()
mysql_connection.commit()
measure_time('Delete Data', delete_data_yasdb, delete_data_mysql)
yasdb_connection.commit()
mysql_connection.commit()
measure_time('Update Data', update_data_yasdb, update_data_mysql)
yasdb_connection.commit()
mysql_connection.commit()
measure_time('Select Data', select_data_yasdb, select_data_mysql)

# 提交事务并关闭连接
yasdb_connection.commit()
mysql_connection.commit()
yasdb_cursor.close()
mysql_cursor.close()
yasdb_connection.close()
mysql_connection.close()

# 绘制图表
fig, ax = plt.subplots(figsize=(10, 6))
index = range(len(operations))
bar_width = 0.35
opacity = 0.8

rects1 = plt.bar(index, times_yasdb, bar_width,
                 alpha=opacity,
                 color='b',
                 label='YASDB')

rects2 = plt.bar([i + bar_width for i in index], times_mysql, bar_width,
                 alpha=opacity,
                 color='g',
                 label='Mysql')

plt.xlabel('Operations')
plt.ylabel('Time in seconds')
plt.title('Database IO Performance Comparison')
plt.xticks([i + bar_width / 2 for i in index], operations, rotation=45)
plt.legend()

for rect in rects1 + rects2:
    height = rect.get_height()
    plt.text(rect.get_x() + rect.get_width()/2., height,
             f'{height:.4f}',
             ha='center', va='bottom')

plt.tight_layout()

# 保存图表为tiff文件
output_file = 'db_io_performance_comparison.png'
plt.savefig(output_file, format='png', dpi=300)

print(f"The result has been saved as {output_file}")

4.2 批量查询测试

import yasdb
import mysql.connector
import matplotlib
matplotlib.use('Agg')  # 使用非交互式后端,适合无头环境
import matplotlib.pyplot as plt
import time
import random

# 连接到第崖山数据库
yasdb_connection = yasdb.connect(host="192.168.7.112", port=1688, user="sys", password="root")
yasdb_cursor = yasdb_connection.cursor()

# 连接到MySQL
mysql_connection = mysql.connector.connect(
    host="127.0.0.1",
    port=3306,
    user="test",
    password="114514",
    database="data"  # 替换为您的数据库名称
)
mysql_cursor = mysql_connection.cursor()

# 定义要测试的操作
def create_table_yasdb():
    """创建表(YASDB)"""
    yasdb_cursor.execute("drop table if exists bind_param_heap_1")
    yasdb_cursor.execute("create table bind_param_heap_1(a int, b double, c int)")

def create_table_mysql():
    """创建表(MySQL)"""
    mysql_cursor.execute("DROP TABLE IF EXISTS bind_param_heap_1")
    mysql_cursor.execute("CREATE TABLE bind_param_heap_1(a INT, b DOUBLE, c INT)")

def generate_random_data(num_records):
    """生成num_records数量的随机数据"""
    data = []
    for _ in range(num_records):
        a = random.randint(1, 10000)  # 随机整数
        b = round(random.uniform(1, 1000), 2)  # 随机浮点数
        c = random.randint(1, 1000)  # 随机整数
        data.append((a, b, c))
    return data

random_data = generate_random_data(1000)

def bulk_insert_data_yasdb():
    """批量插入数据到YASDB"""
    yasdb_cursor.executemany("insert into bind_param_heap_1 values(:1, :2, :3)", random_data)

def bulk_insert_data_mysql():
    """批量插入数据到MySQL"""
    mysql_cursor.executemany("INSERT INTO bind_param_heap_1 VALUES (%s, %s, %s)", random_data)

def bulk_update_data_yasdb():
    """批量更新数据到YASDB"""
    update_data = [(random.randint(1, 1000), row[0]) for row in random_data]
    yasdb_cursor.executemany("update bind_param_heap_1 set c=:1 where a=:2", update_data)

def bulk_update_data_mysql():
    """批量更新数据到MySQL"""
    update_data = [(random.randint(1, 1000), row[0]) for row in random_data]
    mysql_cursor.executemany("UPDATE bind_param_heap_1 SET c=%s WHERE a=%s", update_data)

def bulk_delete_data_yasdb():
    """批量删除数据从YASDB"""
    delete_ids = [row[0] for row in random.sample(random_data, min(len(random_data), 500))]
    for id in delete_ids:
        yasdb_cursor.execute("delete from bind_param_heap_1 where a=:1", (id,))

def bulk_delete_data_mysql():
    """批量删除数据从MySQL"""
    delete_ids = [row[0] for row in random.sample(random_data, min(len(random_data), 500))]
    for id in delete_ids:
        mysql_cursor.execute("DELETE FROM bind_param_heap_1 WHERE a=%s", (id,))

def measure_time(operation_name, operation_func_yasdb, operation_func_mysql):
    start_time = time.time()
    operation_func_yasdb()
    end_time = time.time()
    elapsed_time_yasdb = end_time - start_time
    
    start_time = time.time()
    operation_func_mysql()
    end_time = time.time()
    elapsed_time_mysql = end_time - start_time
    
    operations.append(operation_name)
    times_yasdb.append(elapsed_time_yasdb)
    times_mysql.append(elapsed_time_mysql)

# 准备列表来存储各个操作的名称和它们对应的执行时间
operations = []
times_yasdb = []
times_mysql = []

# 创建表
create_table_yasdb()
create_table_mysql()

# 测量批量插入操作的执行时间
measure_time('Bulk Insert Data', bulk_insert_data_yasdb, bulk_insert_data_mysql)

# 提交事务
yasdb_connection.commit()
mysql_connection.commit()

# 测量执行时间
measure_time('Bulk Update Data', bulk_update_data_yasdb, bulk_update_data_mysql)

# 提交事务
yasdb_connection.commit()
mysql_connection.commit()

# 测量批量删除操作的执行时间
measure_time('Bulk Delete Data', bulk_delete_data_yasdb, bulk_delete_data_mysql)

# 提交事务
yasdb_connection.commit()
mysql_connection.commit()

# 关闭连接
yasdb_cursor.close()
mysql_cursor.close()
yasdb_connection.close()
mysql_connection.close()

# 绘制图表
fig, ax = plt.subplots(figsize=(10, 6))
index = range(len(operations))
bar_width = 0.35
opacity = 0.8

rects1 = plt.bar(index, times_yasdb, bar_width,
                 alpha=opacity,
                 color='b',
                 label='YASDB')

rects2 = plt.bar([i + bar_width for i in index], times_mysql, bar_width,
                 alpha=opacity,
                 color='g',
                 label='MySQL')

plt.xlabel('Operations')
plt.ylabel('Time in seconds')
plt.title('Database IO Performance Comparison - Bulk Operations')
plt.xticks([i + bar_width / 2 for i in index], operations, rotation=45)
plt.legend()

for rect in rects1 + rects2:
    height = rect.get_height()
    plt.text(rect.get_x() + rect.get_width()/2., height,
             f'{height:.4f}',
             ha='center', va='bottom')

plt.tight_layout()

# 保存图表为PNG文件
output_file = 'bulk_operations_performance_comparison.png'
plt.savefig(output_file, format='png', dpi=300)

print(f"The chart has been saved as {output_file}")

4.3 压力测试

import yasdb
import time
import random
import matplotlib.pyplot as plt

# 连接到YASDB数据库
yasdb_connection = yasdb.connect(host="192.168.7.112", port=1688, user="sys", password="root")
yasdb_cursor = yasdb_connection.cursor()

# 创建表(如果需要的话)
def create_table():
    yasdb_cursor.execute("drop table if exists stress_test_table")
    yasdb_cursor.execute("create table stress_test_table(id int, value double)")

create_table()
yasdb_connection.commit()

# 定义要执行的操作函数,并记录时间
operation_times = []

def generate_random_data():
    """生成随机数据"""
    id_val = random.randint(1, 10000)
    value = round(random.uniform(1, 1000), 2)
    return (id_val, value)

def perform_operation(operation_func, data):
    """执行操作并记录时间"""
    start_time = time.time()
    operation_func(data)
    end_time = time.time()
    elapsed_time = end_time - start_time
    operation_name = operation_func.__name__
    operation_times.append((operation_name, elapsed_time))

def insert_data(data):
    """插入数据"""
    yasdb_cursor.execute("insert into stress_test_table values(:1, :2)", data)

def update_data(data):
    """更新数据"""
    yasdb_cursor.execute("update stress_test_table set value=:2 where id=:1", data)

def delete_data(data):
    """删除数据"""
    yasdb_cursor.execute("delete from stress_test_table where id=:1", (data[0],))

def select_data(data):
    """查询数据"""
    yasdb_cursor.execute("select * from stress_test_table where id=:1", (data[0],))
    yasdb_cursor.fetchall()

# 设置压力测试参数
test_duration = 30  # 测试持续时间,单位为秒
operations_performed = 0

start_time = time.time()
while time.time() - start_time < test_duration:
    data = generate_random_data()
    operations = [insert_data, update_data, delete_data, select_data]
    operation = random.choice(operations)
    perform_operation(operation, data)
    yasdb_connection.commit()
    operations_performed += 1

# 关闭连接
yasdb_cursor.close()
yasdb_connection.close()

print(f"Pressure test completed with single thread over {test_duration} seconds.")
print(f"Total operations performed: {operations_performed}")

# 分析性能指标
from collections import defaultdict
operation_stats = defaultdict(list)

for operation_name, elapsed_time in operation_times:
    operation_stats[operation_name].append(elapsed_time)

# 计算平均响应时间和其他统计信息
averages = {op: sum(times)/len(times) for op, times in operation_stats.items()}

# 打印统计信息
for op, avg_time in averages.items():
    print(f"Average time for {op}: {avg_time:.4f} seconds")

# 绘制折线图
fig, ax = plt.subplots(figsize=(10, 6))

# 将操作名称映射到整数索引以便绘图
operation_indices = {op: idx for idx, op in enumerate(operation_stats.keys())}
time_series = {idx: [] for idx in operation_indices.values()}

for op, times in operation_stats.items():
    idx = operation_indices[op]
    for t in times:
        time_series[idx].append(t)

# 绘制每种操作的折线图
for idx, series in time_series.items():
    ax.plot(series, label=list(operation_indices.keys())[idx])

plt.xlabel('Operation Index')
plt.ylabel('Time in seconds')
plt.title('Database Operation Performance Over Time')
plt.legend(title='Operations')

plt.tight_layout()

# 保存图表为tiff文件
output_file = 'stress_test_performance_single_thread.png'
plt.savefig(output_file, format='png', dpi=300)

print(f"The performance chart has been saved as {output_file}")

4.4 QPS测试

import yasdb
import time
import random
import matplotlib.pyplot as plt

# 连接到YASDB数据库
yasdb_connection = yasdb.connect(host="192.168.7.112", port=1688, user="sys", password="root")
yasdb_cursor = yasdb_connection.cursor()

# 创建表
def create_table():
    yasdb_cursor.execute("drop table if exists qps_test_table")
    yasdb_cursor.execute("create table qps_test_table(id int, value double)")

create_table()
yasdb_connection.commit()

# 定义要执行的操作函数,并记录时间
operation_times = []

def generate_random_data():
    """生成随机数据"""
    id_val = random.randint(1, 10000)
    value = round(random.uniform(1, 1000), 2)
    return (id_val, value)

def insert_data(data):
    """插入数据"""
    yasdb_cursor.execute("insert into qps_test_table values(:1, :2)", data)

def select_data(data_id):
    """查询数据"""
    yasdb_cursor.execute("select * from qps_test_table where id=:1", (data_id,))
    yasdb_cursor.fetchall()

# 设置压力测试参数
test_duration = 30  # 测试持续时间,单位为秒
operations_performed = 0

# 准备一些随机数据供查询使用
random_ids = [random.randint(1, 10000) for _ in range(1000)]

# 开始QPS测试
start_time = time.time()
end_time = start_time + test_duration
while time.time() < end_time:
    data_id = random.choice(random_ids)
    start_query_time = time.time()
    select_data(data_id)
    end_query_time = time.time()
    elapsed_time = end_query_time - start_query_time
    operation_times.append(elapsed_time)
    operations_performed += 1
    yasdb_connection.commit()

# 关闭连接
yasdb_cursor.close()
yasdb_connection.close()

# 计算QPS和其他统计信息
total_time = time.time() - start_time
qps = operations_performed / total_time
print(f"Total operations performed: {operations_performed}")
print(f"Total time taken: {total_time:.4f} seconds")
print(f"Queries per second (QPS): {qps:.4f}")

# 分析性能指标
from collections import defaultdict
operation_stats = defaultdict(list)

for elapsed_time in operation_times:
    operation_stats['select'].append(elapsed_time)

# 计算平均响应时间和其他统计信息
averages = {op: sum(times)/len(times) for op, times in operation_stats.items()}
average_response_time = averages.get('select', 0)

# 打印统计信息
print(f"Average response time for select: {average_response_time:.4f} seconds")

# 绘制折线图
fig, ax = plt.subplots(figsize=(10, 6))

# 绘制每种操作的折线图
ax.plot(operation_times, label='Select Operation')

plt.xlabel('Query Index')
plt.ylabel('Time in seconds')
plt.title('Database Query Performance Over Time')
plt.legend(title='Operations')

plt.tight_layout()

# 保存图表为tiff文件
output_file = 'qps_test_performance.png'
plt.savefig(output_file, format='png', dpi=300)

print(f"The performance chart has been saved as {output_file}")

4.5 高可用测试

import yasdb
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor
import matplotlib
matplotlib.use('Agg')  # 使用非交互式后端,适合无头环境
import matplotlib.pyplot as plt

# 连接到崖山数据库
yasdb_connection = yasdb.connect(host="192.168.7.112", port=1688, user="sys", password="root")
yasdb_cursor = yasdb_connection.cursor()

# 准备列表来存储各个查询的执行时间和它们对应的索引
query_times = []
timestamps = []

def select_data():
    """执行一次select查询"""
    start_time = time.time()
    yasdb_cursor.execute("select * from bind_param_heap_1 where a=:1", (random.randint(1, 10000),))
    yasdb_cursor.fetchall()
    end_time = time.time()
    elapsed_time = end_time - start_time
    query_times.append(elapsed_time)
    timestamps.append(start_time)

def perform_selects(duration):
    """持续执行select查询直到达到指定的时间长度"""
    start_time = time.time()
    while time.time() - start_time < duration:
        select_data()
        yasdb_connection.commit()

def shutdown_server():
    """通过SSH连接到服务器并执行关机命令"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect('192.168.7.231', username='root', password='your_password')  # 替换为实际密码
        stdin, stdout, stderr = ssh.exec_command('sudo init 0')
        print(stdout.read().decode())
        print(stderr.read().decode())
    finally:
        ssh.close()

def plot_results():
    """绘制查询响应时间图表"""
    fig, ax = plt.subplots(figsize=(12, 6))
    
    # 绘制查询响应时间
    ax.plot(timestamps, query_times, label='Query Response Time', color='blue', alpha=0.7)
    
    # 标记出执行关机命令的时间点
    if timestamps: 
        shutdown_timestamp = min(timestamps) + 40
        ax.axvline(x=shutdown_timestamp, color='red', linestyle='--', label='Shutdown Command Executed')
        
        # 添加注释说明关机开始时间点
        ax.annotate('Downtime Start', xy=(shutdown_timestamp, max(query_times)), 
                    xytext=(shutdown_timestamp - 5, max(query_times) + 0.1),
                    arrowprops=dict(facecolor='black', shrink=0.05))

    # 设置图表属性
    ax.set_xlabel('Time (seconds)')
    ax.set_ylabel('Response Time (seconds)')
    ax.set_title('Query Response Time Over Time with Server Shutdown')
    ax.legend()
    ax.grid(True)
    
    # 保存图表为PNG文件
    output_file = 'query_response_time_with_shutdown.png'
    plt.savefig(output_file, format='png', dpi=300)
    print(f"The chart has been saved as {output_file}")

if __name__ == "__main__":
    # 创建表
    def create_table_yasdb():
        yasdb_cursor.execute("drop table if exists bind_param_heap_1")
        yasdb_cursor.execute("create table bind_param_heap_1(a int, b double, c int)")

    def bulk_insert_data_yasdb():
        data = [(i, round(random.uniform(1, 1000), 2), i) for i in range(1, 10001)]
        yasdb_cursor.executemany("insert into bind_param_heap_1 values(:1, :2, :3)", data)

    create_table_yasdb()
    bulk_insert_data_yasdb()  # 确保有数据可以查询
    yasdb_connection.commit()

    # 设置测试参数
    test_duration = 180  # 测试持续时间,单位为秒
    shutdown_time = 40   # 执行shutdown命令的时间点

    # 使用线程池执行两个任务:一个是持续执行查询,另一个是在指定时间点执行关机命令
    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(perform_selects, test_duration)
        if time.time() + shutdown_time <= time.time() + test_duration:
            future2 = executor.submit(shutdown_server)
            time.sleep(shutdown_time)  # 等待到指定时间点再执行关机命令

    # 关闭连接
    yasdb_cursor.close()
    yasdb_connection.close()

    # 输出结果
    print(f"Total queries performed: {len(query_times)}")
    if query_times:
        print(f"Average response time: {sum(query_times)/len(query_times):.4f} seconds")

    # 绘制图表
    plot_results()