目录
  1. 1. task_manager.py
  2. 2. task_worker.py
  3. 3. locustTcp.py
分布式高并发tcp压力测试

task_manager.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue
from locustTcp import locust

# 任务个数
task_number = 3

# 收发队列
task_que = Queue(task_number)
result_queue = Queue(task_number)


def get_task():
return task_que


def get_result():
return result_queue


# 创建类似的queueManager
class QueueManager(BaseManager):
pass


def win_run():

delay_time_average, delay_time_average_total = 0, 0
delay_time_max, delay_time_min, error_times, success_times = 0, 0, 0, 0
time_max = []
time_min = []
# 注册在网络上,callable 关联了Queue 对象
# 将Queue对象在网络中暴露
# window下绑定调用接口不能直接使用lambda,所以只能先定义函数再绑定
QueueManager.register('get_task_queue', callable=get_task)
QueueManager.register('get_result_queue', callable=get_result)
# 绑定端口和设置验证口令
manager = QueueManager(address=('192.168.43.140', 8001), authkey=b'locust')

# 启动管理,监听信息通道
manager.start()

try:
# 通过网络获取任务队列和结果队列
task = manager.get_task_queue()
result = manager.get_result_queue()

# 添加任务
for i in range(task_number):
n = locust
task.put(n)

print('try get result...')
for i in range(task_number):
data = result.get(timeout=10000)
print('result%d is %s' % (i+1, data))
data = eval(data)
print("平均往返时延(ms): %s 最大时延: %s 最小时延: %s 拒绝连接次数: %s 成功连接数: %s\n" %
(data[0], data[2], data[3], data[4], data[5]))
delay_time_average_total += data[0]
time_max.append(data[2])
time_min.append(data[3])
error_times += data[4]
success_times += data[5]

delay_time_average = delay_time_average_total/task_number
delay_time_max = max(time_max)
delay_time_min = min(time_min)
print("最终统计 平均往返时延(ms): %s 最大时延: %s 最小时延: %s 拒绝连接次数: %s 成功连接数: %s\n" %
(delay_time_average, delay_time_max, delay_time_min, error_times, success_times))

except Exception as e:
print('Manager error:', e)
finally:
manager.shutdown()
print("Work is done")


if __name__ == '__main__':
# window下多进程可能有问题,添加这句话缓解
freeze_support()
win_run()

task_worker.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/python3
# -*-coding:utf-8 -*-
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass


# 实现第一步:使用QueueManager注册获取Queue的方法名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 实现第二步:连接到服务器:
server_addr = '192.168.43.140'
print('Connect to server %s...' % server_addr)
# 端口和验证口令注意保持与服务进程设置的完全一致:
m = QueueManager(address=(server_addr, 8001), authkey=b'locust')
# 从网络连接:
m.connect()
# 实现第三步:获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 实现第四步:从task队列取任务,并把结果写入result队列:
task_num = 0
while not task.empty():
locust = task.get(True, timeout=5) # 获取任务
task_num += 1
a = locust() # 执行任务
# result.put(str(a[-1][1])) # 传输数据到task_manager

result.put(str(a[-1][0])) # 传输数据到task_manager

# 处理结束:
print("任务完成个数: %d" % task_num)
print('worker exit.')

locustTcp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/python3
# -*-coding:utf-8 -*-
from socket import *
import threading
import time

HOST = "122.51.252.101" # tcp压力测试ip 122.51.252.101
PORT = 7474
thread_number = 1000
BUF_SIZE = 1024
Address = (HOST, PORT)
message = "hello"
tcp_connect_data = []
threadLock = threading.Lock() # 进程锁
# AF_INET用于不同机器之间的通信 AF_UNIX只能用于本机内进程之间的通信。
# SOCK_STREAM是基于TCP的,数据传输比较有保障。SOCK_DGRAM是基于UDP的,专门用于局域网


# 获取毫秒级时间
def get_time_ms():
ct = time.time() # 时间戳
local_time = time.localtime(ct) # 本地化时间
cart_time_strftime = time.strftime("%Y-%m-%d %H:%M:%S", local_time) # 格式化时间
cart_time_strftime_ms = (ct - int(ct)) * 1000
ms = "%s.%03d" % (cart_time_strftime, cart_time_strftime_ms) # 拼接,获取毫秒级时间
return ms, ct


# 建立tcp连接
def tcp_connect():
error_flag = 0
tcp_socket = socket(AF_INET, SOCK_STREAM) # 创建socket对象 tcp连接
tcp_socket.settimeout(20) # 设置最大连接时间20s
time1 = get_time_ms()

try:
tcp_socket.connect(Address)
"""
receive_data = tcp_socket.recv(BUF_SIZE)
# print(receive_data.decode('utf-8'))
tcp_socket.send(message.encode('utf-8'))
"""
except Exception as e:
error_flag = 1
print("tcpConnError", e)
else:
pass
time2 = get_time_ms()
delay_time = (time2[1] - time1[1])*1000
data = delay_time, error_flag
tcp_connect_data.append(data)
return data


class MyThread(threading.Thread):

def __init__(self, thread_id):
threading.Thread.__init__(self)
self.threadId = thread_id

def run(self):
threadLock.acquire() # 获取锁,用于线程同步
tcp_connect()
threadLock.release() # 释放锁,开启下一个线程

def info(self):
print(self.threadId)


def locust():
test_thread = MyThread(0)
test_thread.start()
test_thread.join()
delay_time_max = tcp_connect_data[test_thread.threadId][0] # 初始化最大时延和最小时延
delay_time_min = delay_time_max
error_times, success_times, total_time = 0, 0, 0 # 初始化连接数
threads = []
data = []
for i in range(1, thread_number+1):
t = MyThread(i)
threads.append(t)
for t in threads:
# t.setDaemon(True) # 把多线程设置为守护线程
t.start() # 开始执行多线程
# t.info()
t.join() # 等待线程执行完成
delay_time = tcp_connect_data[t.threadId][0]
error_flag = tcp_connect_data[t.threadId][1]
if delay_time_max < delay_time: # 更新最大时延和最小时延
delay_time_max = delay_time
if delay_time_min > delay_time:
delay_time_min = delay_time
total_time += delay_time
delay_time_average = total_time/t.threadId
error_times += error_flag
success_times = t.threadId - error_times
print("%s 执行时间为 %s\n" % (t, get_time_ms()[0])) # 输出执行时间
print("平均往返时延(ms): %s 本次时延: %s 最大时延: %s 最小时延: %s 拒绝连接次数: %s 成功连接数: %s\n" %
(delay_time_average, delay_time, delay_time_max, delay_time_min, error_times, success_times))
thread_data = (delay_time_average, delay_time, delay_time_max, delay_time_min, error_times, success_times), t
data.append(thread_data)
return data


if __name__ == '__main__':
locust()
文章作者: nocbtm
文章链接: https://nocbtm.github.io/2019/12/10/分布式高并发tcp压力测试/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 nocbtm's Blog
打赏
  • 微信
  • 支付宝