bench.py 12 KB


  1. #!/usr/bin/env python3
  2. #
  3. # This script bootstraps an NSQ cluster in EC2 and runs benchmarks.
  4. #
  5. # Requires python3 and the following packages:
  6. # - boto3
  7. # - paramiko
  8. # - tornado
  9. #
  10. # AWS authentication is delegated entirely to the boto3 environment, see:
  11. #
  12. # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
  13. #
  14. # EC2 instances are launched into EC2 Classic, expecting a 'default' security group
  15. # that allows SSH (port 22) from 0.0.0.0/0 and an EC2 key pair created
  16. # (named 'default', but configurable via --ssh-key-name).
  17. #
  18. import sys
  19. import logging
  20. import time
  21. import datetime
  22. import socket
  23. import warnings
  24. import hashlib
  25. import boto3
  26. import paramiko.client
  27. import paramiko.ssh_exception
  28. import tornado.options
  29. def ssh_connect_with_retries(host, retries=3, timeout=30):
  30. for i in range(retries):
  31. try:
  32. ssh_client = paramiko.client.SSHClient()
  33. ssh_client.set_missing_host_key_policy(paramiko.client.WarningPolicy())
  34. ssh_client.connect(host, username='ubuntu', timeout=timeout)
  35. return ssh_client
  36. except (socket.error, paramiko.ssh_exception.SSHException):
  37. if i == retries - 1:
  38. raise
  39. logging.warning('... re-trying to connect to %s:%d in 15s', host, 22)
  40. time.sleep(15)
  41. def ssh_cmd_async(ssh_client, cmd):
  42. transport = ssh_client.get_transport()
  43. chan = transport.open_session()
  44. chan.exec_command(cmd)
  45. return chan
  46. def ssh_cmd(ssh_client, cmd, timeout=2):
  47. transport = ssh_client.get_transport()
  48. chan = transport.open_session()
  49. chan.settimeout(timeout)
  50. chan.exec_command(cmd)
  51. stdout = b''
  52. stderr = b''
  53. while True:
  54. if chan.recv_ready():
  55. stdout += chan.recv(4096)
  56. continue
  57. if chan.recv_stderr_ready():
  58. stderr += chan.recv_stderr(4096)
  59. continue
  60. if chan.exit_status_ready():
  61. exit_status = chan.recv_exit_status()
  62. break
  63. time.sleep(0.1)
  64. if exit_status != 0:
  65. raise Exception('%r' % stderr)
  66. return stdout, stderr
  67. def get_session():
  68. return boto3.session.Session(region_name=tornado.options.options.region)
  69. def _bootstrap(addr):
  70. commit = tornado.options.options.commit
  71. golang_version = tornado.options.options.golang_version
  72. ssh_client = ssh_connect_with_retries(addr)
  73. for cmd in [
  74. 'wget https://storage.googleapis.com/golang/go%s.linux-amd64.tar.gz' % golang_version,
  75. 'sudo -S tar -C /usr/local -xzf go%s.linux-amd64.tar.gz' % golang_version,
  76. 'sudo -S apt-get update',
  77. 'sudo -S apt-get -y install git mercurial',
  78. 'mkdir -p go/src/github.com/nsqio',
  79. 'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/nsq',
  80. 'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit,
  81. 'cd go/src/github.com/nsqio/nsq/apps/nsqd && GO111MODULE=on /usr/local/go/bin/go build',
  82. 'cd go/src/github.com/nsqio/nsq/bench/bench_writer && GO111MODULE=on /usr/local/go/bin/go build',
  83. 'cd go/src/github.com/nsqio/nsq/bench/bench_reader && GO111MODULE=on /usr/local/go/bin/go build',
  84. 'sudo -S mkdir -p /mnt/nsq',
  85. 'sudo -S chmod 777 /mnt/nsq']:
  86. ssh_cmd(ssh_client, cmd, timeout=10)
  87. def bootstrap():
  88. session = get_session()
  89. ec2 = session.resource('ec2')
  90. total_count = tornado.options.options.nsqd_count + tornado.options.options.worker_count
  91. logging.info('launching %d instances', total_count)
  92. instances = ec2.create_instances(
  93. ImageId=tornado.options.options.ami,
  94. MinCount=total_count,
  95. MaxCount=total_count,
  96. KeyName=tornado.options.options.ssh_key_name,
  97. InstanceType=tornado.options.options.instance_type,
  98. SecurityGroups=['default'])
  99. logging.info('waiting for instances to launch...')
  100. while any(i.state['Name'] != 'running' for i in instances):
  101. waiting_for = [i.id for i in instances if i.state['Name'] != 'running']
  102. logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_for))
  103. time.sleep(5)
  104. for instance in instances:
  105. instance.load()
  106. for instance in instances:
  107. if not instance.tags:
  108. instance.create_tags(Tags=[{'Key': 'nsq_bench', 'Value': '1'}])
  109. try:
  110. c = 0
  111. for i in instances:
  112. c += 1
  113. logging.info('(%d) bootstrapping %s (%s)', c, i.public_dns_name, i.id)
  114. _bootstrap(i.public_dns_name)
  115. except Exception:
  116. logging.exception('bootstrap failed')
  117. decomm()
  118. def run():
  119. instances = _find_instances()
  120. logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_count)
  121. nsqd_chans = []
  122. nsqd_hosts = instances[:tornado.options.options.nsqd_count]
  123. for instance in nsqd_hosts:
  124. try:
  125. ssh_client = ssh_connect_with_retries(instance.public_dns_name)
  126. for cmd in [
  127. 'sudo -S pkill -f nsqd',
  128. 'sudo -S rm -f /mnt/nsq/*.dat',
  129. 'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \
  130. --data-path=/mnt/nsq \
  131. --mem-queue-size=10000000 \
  132. --max-rdy-count=%s' % (tornado.options.options.rdy)]:
  133. nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd)))
  134. except Exception:
  135. logging.exception('failed')
  136. nsqd_tcp_addrs = [i.public_dns_name for i in nsqd_hosts]
  137. dt = datetime.datetime.utcnow()
  138. deadline = dt + datetime.timedelta(seconds=30)
  139. logging.info('launching %d producer(s) on %d host(s)',
  140. tornado.options.options.nsqd_count * tornado.options.options.worker_count,
  141. tornado.options.options.worker_count)
  142. worker_chans = []
  143. producer_hosts = instances[tornado.options.options.nsqd_count:]
  144. for instance in producer_hosts:
  145. for nsqd_tcp_addr in nsqd_tcp_addrs:
  146. topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdigest()
  147. try:
  148. ssh_client = ssh_connect_with_retries(instance.public_dns_name)
  149. for cmd in [
  150. 'GOMAXPROCS=2 \
  151. ./go/src/github.com/nsqio/nsq/bench/bench_writer/bench_writer \
  152. --topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'%s\' --size=%d' % (
  153. topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H:%M:%S'),
  154. tornado.options.options.msg_size)]:
  155. worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd)))
  156. except Exception:
  157. logging.exception('failed')
  158. if tornado.options.options.mode == 'pubsub':
  159. logging.info('launching %d consumer(s) on %d host(s)',
  160. tornado.options.options.nsqd_count * tornado.options.options.worker_count,
  161. tornado.options.options.worker_count)
  162. consumer_hosts = instances[tornado.options.options.nsqd_count:]
  163. for instance in consumer_hosts:
  164. for nsqd_tcp_addr in nsqd_tcp_addrs:
  165. topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdigest()
  166. try:
  167. ssh_client = ssh_connect_with_retries(instance.public_dns_name)
  168. for cmd in [
  169. 'GOMAXPROCS=8 \
  170. ./go/src/github.com/nsqio/nsq/bench/bench_reader/bench_reader \
  171. --topic=%s --nsqd-tcp-address=%s:4150 --deadline=\'%s\' --size=%d \
  172. --rdy=%d' % (
  173. topic, nsqd_tcp_addr, deadline.strftime('%Y-%m-%d %H:%M:%S'),
  174. tornado.options.options.msg_size, tornado.options.options.rdy)]:
  175. worker_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd)))
  176. except Exception:
  177. logging.exception('failed')
  178. stats = {
  179. 'bench_reader': {
  180. 'durations': [],
  181. 'mbytes': [],
  182. 'ops': []
  183. },
  184. 'bench_writer': {
  185. 'durations': [],
  186. 'mbytes': [],
  187. 'ops': []
  188. }
  189. }
  190. while worker_chans:
  191. for ssh_client, chan in worker_chans[:]:
  192. if chan.recv_ready():
  193. sys.stdout.write(chan.recv(4096))
  194. sys.stdout.flush()
  195. continue
  196. if chan.recv_stderr_ready():
  197. line = chan.recv_stderr(4096).decode('utf-8')
  198. if 'duration:' in line:
  199. kind = line.split(' ')[0][1:-1]
  200. parts = line.rsplit('duration:')[1].split('-')
  201. stats[kind]['durations'].append(float(parts[0].strip()[:-1]))
  202. stats[kind]['mbytes'].append(float(parts[1].strip()[:-4]))
  203. stats[kind]['ops'].append(float(parts[2].strip()[:-5]))
  204. sys.stdout.write(line)
  205. sys.stdout.flush()
  206. continue
  207. if chan.exit_status_ready():
  208. worker_chans.remove((ssh_client, chan))
  209. time.sleep(0.1)
  210. for kind, data in stats.items():
  211. if not data['durations']:
  212. continue
  213. max_duration = max(data['durations'])
  214. total_mb = sum(data['mbytes'])
  215. total_ops = sum(data['ops'])
  216. logging.info('[%s] %fs - %fmb/s - %fops/s - %fus/op',
  217. kind, max_duration, total_mb, total_ops,
  218. max_duration / total_ops * 1000 * 1000)
  219. for ssh_client, chan in nsqd_chans:
  220. chan.close()
  221. def _find_instances():
  222. session = get_session()
  223. ec2 = session.resource('ec2')
  224. return [i for i in ec2.instances.all() if
  225. i.state['Name'] == 'running' and any(t['Key'] == 'nsq_bench' for t in i.tags)]
  226. def decomm():
  227. instances = _find_instances()
  228. logging.info('terminating instances %s' % ','.join(i.id for i in instances))
  229. for instance in instances:
  230. instance.terminate()
  231. if __name__ == '__main__':
  232. tornado.options.define('region', type=str, default='us-east-1',
  233. help='EC2 region to launch instances')
  234. tornado.options.define('nsqd_count', type=int, default=3,
  235. help='how many nsqd instances to launch')
  236. tornado.options.define('worker_count', type=int, default=3,
  237. help='how many worker instances to launch')
  238. # ubuntu 18.04 HVM instance store us-east-1
  239. tornado.options.define('ami', type=str, default='ami-0938f2289b3fa3f5b',
  240. help='AMI ID')
  241. tornado.options.define('ssh_key_name', type=str, default='default',
  242. help='SSH key name')
  243. tornado.options.define('instance_type', type=str, default='c3.2xlarge',
  244. help='EC2 instance type')
  245. tornado.options.define('msg_size', type=int, default=200,
  246. help='size of message')
  247. tornado.options.define('rdy', type=int, default=10000,
  248. help='RDY count to use for bench_reader')
  249. tornado.options.define('mode', type=str, default='pubsub',
  250. help='the benchmark mode (pub, pubsub)')
  251. tornado.options.define('commit', type=str, default='master',
  252. help='the git commit')
  253. tornado.options.define('golang_version', type=str, default='1.14.3',
  254. help='the go version')
  255. tornado.options.parse_command_line()
  256. logging.getLogger('paramiko').setLevel(logging.WARNING)
  257. warnings.simplefilter('ignore')
  258. cmd_name = sys.argv[-1]
  259. cmd_map = {
  260. 'bootstrap': bootstrap,
  261. 'run': run,
  262. 'decomm': decomm
  263. }
  264. cmd = cmd_map.get(cmd_name, bootstrap)
  265. sys.exit(cmd())