import os
import sys
import time
import subprocess
from datetime import datetime
file_name = "scp.log"
def append_to_file(file_path, content):
try:
with open(file_path, 'a', encoding='utf-8') as file:
file.write(content + '\n')
print(f"成功将内容追加到文件: {file_path}")
except Exception as e:
print(f"追加内容时出错: {e}")
def create_test_file(size_mb, filename="scp_test.tmp"):
"""创建指定大小的测试文件"""
try:
with open(filename, 'wb') as f:
f.write(os.urandom(int(size_mb * 1024 * 1024))) # 生成随机二进制数据
return filename
except Exception as e:
append_to_file(file_name,f"创建测试文件失败: {str(e)}")
return None
def scp_transfer_test(host, username, password, local_file, port=22):
"""
调用系统scp命令测试传输
返回: 传输结果字典(含速度信息)
"""
if not os.path.exists(local_file):
append_to_file(file_name,"测试文件不存在")
return None
# 构建包含密码的命令(使用sshpass自动输入密码,需系统安装sshpass)
# 若系统无sshpass,可手动输入密码,但无法自动化测试
cmd = [
"sshpass",
"-p", password,
"scp",
"-o StrictHostKeyChecking=no",
"scp_test.tmp",
f"{username}@{host}:/tmp"
]
try:
file_size = os.path.getsize(local_file)
start_time = time.time()
# 执行scp命令
result = subprocess.run(
cmd
)
end_time = time.time()
elapsed = end_time - start_time
# 检查执行结果
if result.returncode != 0:
return {
"success": False,
"error": result.stderr.strip()
}
# 计算传输速度(MB/s)
speed_mb_s = (file_size / (1024 * 1024)) / elapsed if elapsed > 0 else 0
return {
"success": True,
"size_mb": round(file_size / (1024 * 1024), 2),
"time_sec": round(elapsed, 2),
"speed_mb_s": round(speed_mb_s, 2),
"local_file": local_file
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def main():
# 配置测试参数
scp_HOST = "10.51.6.60"
scp_USER = "sibajie"
scp_PASS = "sibajie25"
scp_PORT = 22
TEST_SIZE_MB = 1000 # 测试文件大小(MB)
append_to_file(file_name,"开始时间:"+datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
append_to_file(file_name,f"=== scp传输测试 ===")
append_to_file(file_name,f"目标服务器: {scp_HOST}:{scp_PORT}")
append_to_file(file_name,f"测试文件大小: {TEST_SIZE_MB}MB")
# 1. 创建测试文件
local_file = create_test_file(TEST_SIZE_MB)
if not local_file:
sys.exit(1)
# 2. 执行SCP上传测试
append_to_file(file_name,"开始上传测试...")
result = scp_transfer_test(
host=scp_HOST,
username=scp_USER,
password=scp_PASS,
local_file=local_file,
port=scp_PORT
)
# 3. 输出结果
if result and result["success"]:
append_to_file(file_name,f"传输成功!")
append_to_file(file_name,f"文件大小: {result['size_mb']}MB")
append_to_file(file_name,f"耗时: {result['time_sec']}秒")
append_to_file(file_name,f"传输速度: {result['speed_mb_s']} MB/s")
else:
append_to_file(file_name,f"传输失败: {result['error'] if result else '未知错误'}")
# 4. 清理本地测试文件
if os.path.exists(local_file):
os.remove(local_file)
if __name__ == "__main__":
main()
Python3测试通过。