Locust 只支持限制测试的运行时长,而不支持限制请求次数。
现需要严格限定请求次数为 500 次。
http_session.py
import time
import requests
from locust import clients
from locust.clients import ResponseContextManager
from requests import RequestException
class HttpSession(clients.HttpSession):
    """Locust ``HttpSession`` whose :py:meth:`request` can be gated by a caller-supplied predicate."""

    def request(self, method, url, name=None, catch_response=False, context=None, exec_if=None, **kwargs):
        """
        Constructs and sends a :py:class:`requests.Request`.
        Returns :py:class:`requests.Response` object.
        :param exec_if: (optional) Zero-argument callable evaluated right before sending. The request is only
        sent when it returns a truthy value; otherwise an exception is raised and nothing is sent.
        ``None`` (the default) or ``False`` disables the check.
        :param method: method for the new :class:`Request` object.
        :param url: URL for the new :class:`Request` object.
        :param name: (optional) An argument that can be specified to use as label in Locust's statistics instead of the URL path.
        This can be used to group different URL's that are requested into a single entry in Locust's statistics.
        :param catch_response: (optional) Boolean argument that, if set, can be used to make a request return a context manager
        to work as argument to a with statement. This will allow the request to be marked as a fail based on the content of the
        response, even if the response code is ok (2xx). The opposite also works, one can use catch_response to catch a request
        and then mark it as successful even if the response code was not (i.e 500 or 404).
        :param context: (optional) Dictionary merged over the user's context and reported with the request event.
        :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
        :param data: (optional) Dictionary or bytes to send in the body of the :class:`Request`.
        :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`.
        :param cookies: (optional) Dict or CookieJar object to send with the :class:`Request`.
        :param files: (optional) Dictionary of ``'filename': file-like-objects`` for multipart encoding upload.
        :param auth: (optional) Auth tuple or callable to enable Basic/Digest/Custom HTTP Auth.
        :param timeout: (optional) How long in seconds to wait for the server to send data before giving up, as a float,
        or a (`connect timeout, read timeout <user/advanced.html#timeouts>`_) tuple.
        :type timeout: float or tuple
        :param allow_redirects: (optional) Set to True by default.
        :type allow_redirects: bool
        :param proxies: (optional) Dictionary mapping protocol to the URL of the proxy.
        :param stream: (optional) whether to immediately download the response content. Defaults to ``False``.
        :param verify: (optional) if ``True``, the SSL cert will be verified. A CA_BUNDLE path can also be provided.
        :param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.
        :raises Exception: with message ``'NOT IF'`` when ``exec_if`` is given and returns a falsy value.
        """
        # Resolve the default here instead of in the signature so calls never share one dict object.
        if context is None:
            context = {}

        # if group name has been set and no name parameter has been passed in; set the name parameter to group_name
        if self.request_name and not name:
            name = self.request_name

        # prepend url with hostname unless it's already an absolute URL
        url = self._build_url(url)

        # TODO: MOD BY DingJunyao
        # Only an explicitly supplied predicate may veto the request. Both ``None`` and ``False``
        # mean "no gate", so a call that omits ``exec_if`` is always sent.
        if exec_if is not None and exec_if is not False and not exec_if():
            raise Exception('NOT IF')

        start_time = time.time()
        start_perf_counter = time.perf_counter()
        response = self._send_request_safe_mode(method, url, **kwargs)
        response_time = (time.perf_counter() - start_perf_counter) * 1000

        # Report against the first request of a redirect chain, i.e. the one the caller issued.
        request_after_redirect = (response.history and response.history[0] or response).request
        url_after_redirect = request_after_redirect.path_url

        if self.user:
            context = {**self.user.context(), **context}

        # store meta data that is used when reporting the request to locust's statistics
        request_meta = {
            "request_type": method,
            "response_time": response_time,
            "name": name or url_after_redirect,
            "context": context,
            "response": response,
            "exception": None,
            "start_time": start_time,
            "url": request_after_redirect.url,
        }

        # get the length of the content, but if the argument stream is set to True, we take
        # the size from the content-length header, in order to not trigger fetching of the body
        if kwargs.get("stream", False):
            request_meta["response_length"] = int(response.headers.get("content-length") or 0)
        else:
            request_meta["response_length"] = len(response.content or b"")

        if catch_response:
            return ResponseContextManager(response, request_event=self.request_event, request_meta=request_meta)
        else:
            if name:
                # Since we use the Exception message when grouping failures, in order to not get
                # multiple failure entries for different URLs for the same name argument, we need
                # to temporarily override the response.url attribute
                orig_url = response.url
                response.url = name

            try:
                response.raise_for_status()
            except RequestException as e:
                # Walk down the exception chain to the underlying cause of a connection-level error.
                while (
                    isinstance(
                        e,
                        (
                            requests.exceptions.ConnectionError,
                            requests.packages.urllib3.exceptions.ProtocolError,
                            requests.packages.urllib3.exceptions.MaxRetryError,
                            requests.packages.urllib3.exceptions.NewConnectionError,
                        ),
                    )
                    and e.__context__  # Not sure if the above exceptions can ever be the lowest level, but it is good to be sure
                ):
                    e = e.__context__
                request_meta["exception"] = e

            self.request_event.fire(**request_meta)
            if name:
                response.url = orig_url
            return response
http_user.py:
import locust
from locust.exception import LocustError
from http_session import HttpSession
class HttpUser(locust.HttpUser):
    """
    Represents an HTTP "user" which is to be spawned and attack the system that is to be load tested.
    The behaviour of this user is defined by its tasks. Tasks can be declared either directly on the
    class by using the :py:func:`@task decorator <locust.task>` on methods, or by setting
    the :py:attr:`tasks attribute <locust.User.tasks>`.
    This class creates a *client* attribute on instantiation which is an HTTP client with support
    for keeping a user session between requests.
    """

    # This class is a base to be subclassed, not a runnable user. Locust does not inherit the
    # ``abstract`` flag from the parent class, so it has to be declared again here; without it
    # a locustfile that imports this class would expose it as a user with no tasks.
    abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): the parent constructor presumably performs the same host check already;
        # it is kept so the failure mode does not depend on the installed Locust version.
        if self.host is None:
            raise LocustError(
                "You must specify the base host. Either in the host attribute in the User class, or on the command line using the --host option."
            )

        # Instance of the customised HttpSession, replacing the client set up by the parent class.
        # The client supports cookies, and therefore keeps the session between HTTP requests.
        self.client = HttpSession(
            base_url=self.host,
            request_event=self.environment.events.request,
            user=self,
            pool_manager=self.pool_manager,
        )
        self.client.trust_env = False
测试代码:
import json, datetime
import gevent
import pymssql
import requests
from locust import task
from locust.stats import stats_printer, stats_history
import locust_plugins
from http_user import HttpUser
# Module-level counters, all starting at zero. AddRecord bumps ALL_NUM before each request
# and REQ_SUCCESS_NUM / REQ_FAIL_NUM according to the response it gets back.
ALL_NUM = REQ_SUCCESS_NUM = REQ_FAIL_NUM = QUERY_NUM = 0
# Printed by the script entry point right after get_token() runs.
TOKEN = ''
# Timestamp taken once, when the module is loaded.
TIME = f'{datetime.datetime.now():%Y-%m-%d %H:%M:%S}'
def get_token():
    """Obtain the token used by the test (implementation elided in the article).

    NOTE(review): presumably stores its result in the module-level ``TOKEN``, since the
    script entry point prints ``TOKEN`` right after calling this -- confirm.
    """
    ...
class AddRecord(HttpUser):
    """Locust user that stops issuing its request once 500 attempts have been made in total."""

    @task
    def add_record(self):
        """Send one request while the shared attempt counter is below 500, then record the outcome.

        The counters are module-level globals shared by every simulated user, so the cap
        applies to the whole run rather than to each user.
        """
        global ALL_NUM, REQ_SUCCESS_NUM, REQ_FAIL_NUM
        print(f'>>> current ALL: {ALL_NUM}')
        # The payload and headers are elided in the article; fill in the real values.
        json_dict = json.loads(...)
        headers = {...}
        if ALL_NUM < 500:
            # Count the attempt before sending so concurrent users cannot overshoot the cap.
            ALL_NUM += 1
            with self.client.request(
                '...',
                "...",
                json=json_dict,
                headers=headers,
                catch_response=True,
                # Condition under which the request is actually executed.
                exec_if=lambda: ALL_NUM <= 500
            ) as response:
                # Parse the body once and reuse it for both the check and the failure report.
                body = response.json()
                if body['code'] == 200:
                    response.success()
                    REQ_SUCCESS_NUM += 1
                else:
                    response.failure(body)
                    REQ_FAIL_NUM += 1
if __name__ == "__main__":
    from locust.env import Environment

    get_token()
    print(TOKEN)

    # Build the environment with a local runner and a web UI on 127.0.0.1:8089.
    env = Environment(user_classes=[AddRecord], host='...')
    env.create_local_runner()
    runner = env.runner
    env.create_web_ui("127.0.0.1", 8089)
    web_ui = env.web_ui

    # Background greenlets: one prints the current stats periodically, one saves them to history.
    gevent.spawn(stats_printer(env.stats))
    gevent.spawn(stats_history, runner)

    # Start the test with 10 users, spawned one per second.
    runner.start(10, spawn_rate=1)

    # Quit the runner after 60 seconds; this still has to be a fairly large number.
    gevent.spawn_later(60, runner.quit)

    # Block until the runner's greenlet finishes, then stop the web server for good measure.
    runner.greenlet.join()
    web_ui.stop()
不推荐使用该方法,仅作救急使用。
修改 Locust 的源码。
更改 clients.py 的 HttpSession.request 方法:
# TODO: MOD BY DingJunyao
# Adds the exec_if parameter, a callable deciding whether the request is executed: if calling
# it returns a truthy value the request is sent; otherwise it is not sent and an error is raised.
def request(self, method, url, name=None, catch_response=False, context={}, exec_if=False, **kwargs):
    """
    Constructs and sends a :py:class:`requests.Request`.
    ...
    """
    ...
    # prepend url with hostname unless it's already an absolute URL
    url = self._build_url(url)
    # TODO: MOD BY DingJunyao
    # The default False means "no condition"; any other value is called as a predicate.
    if exec_if is not False:
        if not exec_if():
            raise Exception('NOT IF')
    # TODO: MOD END
    start_time = time.time()
    ...
import json, datetime
import gevent
import pymssql
import requests
from locust import HttpUser, task
from locust.stats import stats_printer, stats_history
import locust_plugins
# Module-level counters, all starting at zero. AddRecord bumps ALL_NUM before each request
# and REQ_SUCCESS_NUM / REQ_FAIL_NUM according to the response it gets back.
ALL_NUM = REQ_SUCCESS_NUM = REQ_FAIL_NUM = QUERY_NUM = 0
# Printed by the script entry point right after get_token() runs.
TOKEN = ''
# Timestamp taken once, when the module is loaded.
TIME = f'{datetime.datetime.now():%Y-%m-%d %H:%M:%S}'
def get_token():
    """Obtain the token used by the test (implementation elided in the article).

    NOTE(review): presumably stores its result in the module-level ``TOKEN``, since the
    script entry point prints ``TOKEN`` right after calling this -- confirm.
    """
    ...
class AddRecord(HttpUser):
    """Locust user that stops issuing its request once 500 attempts have been made in total."""

    @task
    def add_record(self):
        """Send one request while the shared attempt counter is below 500, then record the outcome.

        The counters are module-level globals shared by every simulated user, so the cap
        applies to the whole run rather than to each user.
        """
        global ALL_NUM, REQ_SUCCESS_NUM, REQ_FAIL_NUM
        print(f'>>> current ALL: {ALL_NUM}')
        # The payload and headers are elided in the article; fill in the real values.
        json_dict = json.loads(...)
        headers = {...}
        if ALL_NUM < 500:
            # Count the attempt before sending so concurrent users cannot overshoot the cap.
            ALL_NUM += 1
            with self.client.request(
                '...',
                "...",
                json=json_dict,
                headers=headers,
                catch_response=True,
                # Condition under which the request is actually executed.
                exec_if=lambda: ALL_NUM <= 500
            ) as response:
                # Parse the body once and reuse it for both the check and the failure report.
                body = response.json()
                if body['code'] == 200:
                    response.success()
                    REQ_SUCCESS_NUM += 1
                else:
                    response.failure(body)
                    REQ_FAIL_NUM += 1
if __name__ == "__main__":
    from locust.env import Environment

    get_token()
    print(TOKEN)

    # Build the environment with a local runner and a web UI on 127.0.0.1:8089.
    env = Environment(user_classes=[AddRecord], host='...')
    env.create_local_runner()
    runner = env.runner
    env.create_web_ui("127.0.0.1", 8089)
    web_ui = env.web_ui

    # Background greenlets: one prints the current stats periodically, one saves them to history.
    gevent.spawn(stats_printer(env.stats))
    gevent.spawn(stats_history, runner)

    # Start the test with 10 users, spawned one per second.
    runner.start(10, spawn_rate=1)

    # Quit the runner after 60 seconds; this still has to be a fairly large number.
    gevent.spawn_later(60, runner.quit)

    # Block until the runner's greenlet finishes, then stop the web server for good measure.
    runner.greenlet.join()
    web_ui.stop()