跳转至

强制限定 Locust 的请求次数

约 124 个字 315 行代码 预计阅读时间 4 分钟

Warning

救急方法,不建议使用

背景

Locust 只提供了限定请求时长的限制,而没有请求次数的限制。

现需要严格限定请求次数为 500 次。

方法 1 - 重写类方法

http_session.py

import time

import requests
from locust import clients
from locust.clients import ResponseContextManager
from requests import RequestException


class HttpSession(clients.HttpSession):
    def request(self, method, url, name=None, catch_response=False, context={}, exec_if=lambda: False, **kwargs):
        """
        Constructs and sends a :py:class:`requests.Request`.
        Returns :py:class:`requests.Response` object.

        :param exec_if: 给定条件为真时才执行
        :param method: method for the new :class:`Request` object.
        :param url: URL for the new :class:`Request` object.
        :param name: (optional) An argument that can be specified to use as label in Locust's statistics instead of the URL path.
          This can be used to group different URL's that are requested into a single entry in Locust's statistics.
        :param catch_response: (optional) Boolean argument that, if set, can be used to make a request return a context manager
          to work as argument to a with statement. This will allow the request to be marked as a fail based on the content of the
          response, even if the response code is ok (2xx). The opposite also works, one can use catch_response to catch a request
          and then mark it as successful even if the response code was not (i.e 500 or 404).
        :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`.
        :param data: (optional) Dictionary or bytes to send in the body of the :class:`Request`.
        :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`.
        :param cookies: (optional) Dict or CookieJar object to send with the :class:`Request`.
        :param files: (optional) Dictionary of ``'filename': file-like-objects`` for multipart encoding upload.
        :param auth: (optional) Auth tuple or callable to enable Basic/Digest/Custom HTTP Auth.
        :param timeout: (optional) How long in seconds to wait for the server to send data before giving up, as a float,
            or a (`connect timeout, read timeout <user/advanced.html#timeouts>`_) tuple.
        :type timeout: float or tuple
        :param allow_redirects: (optional) Set to True by default.
        :type allow_redirects: bool
        :param proxies: (optional) Dictionary mapping protocol to the URL of the proxy.
        :param stream: (optional) whether to immediately download the response content. Defaults to ``False``.
        :param verify: (optional) if ``True``, the SSL cert will be verified. A CA_BUNDLE path can also be provided.
        :param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.
        """

        # if group name has been set and no name parameter has been passed in; set the name parameter to group_name
        if self.request_name and not name:
            name = self.request_name

        # prepend url with hostname unless it's already an absolute URL
        url = self._build_url(url)

        # TODO: MOD BY DingJunyao
        if exec_if is not False:
            if not exec_if():
                raise Exception('NOT IF')
        start_time = time.time()
        start_perf_counter = time.perf_counter()
        response = self._send_request_safe_mode(method, url, **kwargs)
        response_time = (time.perf_counter() - start_perf_counter) * 1000

        request_after_redirect = (response.history and response.history[0] or response).request
        url_after_redirect = request_after_redirect.path_url

        if self.user:
            context = {**self.user.context(), **context}

        # store meta data that is used when reporting the request to locust's statistics
        request_meta = {
            "request_type": method,
            "response_time": response_time,
            "name": name or url_after_redirect,
            "context": context,
            "response": response,
            "exception": None,
            "start_time": start_time,
            "url": request_after_redirect.url,
        }

        # get the length of the content, but if the argument stream is set to True, we take
        # the size from the content-length header, in order to not trigger fetching of the body
        if kwargs.get("stream", False):
            request_meta["response_length"] = int(response.headers.get("content-length") or 0)
        else:
            request_meta["response_length"] = len(response.content or b"")

        if catch_response:
            return ResponseContextManager(response, request_event=self.request_event, request_meta=request_meta)
        else:
            if name:
                # Since we use the Exception message when grouping failures, in order to not get
                # multiple failure entries for different URLs for the same name argument, we need
                # to temporarily override the response.url attribute
                orig_url = response.url
                response.url = name

            try:
                response.raise_for_status()
            except RequestException as e:
                while (
                    isinstance(
                        e,
                        (
                            requests.exceptions.ConnectionError,
                            requests.packages.urllib3.exceptions.ProtocolError,
                            requests.packages.urllib3.exceptions.MaxRetryError,
                            requests.packages.urllib3.exceptions.NewConnectionError,
                        ),
                    )
                    and e.__context__  # Not sure if the above exceptions can ever be the lowest level, but it is good to be sure
                ):
                    e = e.__context__
                request_meta["exception"] = e

            self.request_event.fire(**request_meta)
            if name:
                response.url = orig_url
            return response

http_user.py

import locust
from locust.exception import LocustError

from http_session import HttpSession


class HttpUser(locust.HttpUser):
    """
    Represents an HTTP "user" which is to be spawned and attack the system that is to be load tested.

    The behaviour of this user is defined by its tasks. Tasks can be declared either directly on the
    class by using the :py:func:`@task decorator <locust.task>` on methods, or by setting
    the :py:attr:`tasks attribute <locust.User.tasks>`.

    This class creates a *client* attribute on instantiation which is an HTTP client with support
    for keeping a user session between requests.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.host is None:
            raise LocustError(
                "You must specify the base host. Either in the host attribute in the User class, or on the command line using the --host option."
            )

        self.client = HttpSession(
            base_url=self.host,
            request_event=self.environment.events.request,
            user=self,
            pool_manager=self.pool_manager,
        )
        """
        Instance of HttpSession that is created upon instantiation of Locust.
        The client supports cookies, and therefore keeps the session between HTTP requests.
        """
        self.client.trust_env = False

测试代码:

import json, datetime

import gevent
import pymssql
import requests
from locust import task
from locust.stats import stats_printer, stats_history
import locust_plugins

from http_user import HttpUser

ALL_NUM = 0
REQ_SUCCESS_NUM = 0
REQ_FAIL_NUM = 0
QUERY_NUM = 0
TOKEN = ''
TIME = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def get_token():
    ...

class AddRecord(HttpUser):
    global ALL_NUM, REQ_SUCCESS_NUM, REQ_FAIL_NUM
        print(f'>>> current ALL: {ALL_NUM}')
        json_dict = json.loads(...)
        headers = {...}
        if ALL_NUM < 500:
            ALL_NUM += 1
            with self.client.request(
                    '...',
                    "...",
                    json=json_dict,
                    headers=headers,
                    catch_response=True,
                    # 这里添加当什么情况下执行
                    exec_if=lambda: ALL_NUM <= 500
            ) as response:
                if response.json()['code'] == 200:
                    response.success()
                    REQ_SUCCESS_NUM += 1
                else:
                    response.failure(response.json())
                    REQ_FAIL_NUM += 1


if __name__ == "__main__":
    get_token()
    print(TOKEN)
    from locust.env import Environment

    env = Environment(user_classes=[AddRecord], host='...')
    env.create_local_runner()

    # start a WebUI instance
    env.create_web_ui("127.0.0.1", 8089)

    # start a greenlet that periodically outputs the current stats
    gevent.spawn(stats_printer(env.stats))

    # start a greenlet that save current stats to history
    gevent.spawn(stats_history, env.runner)

    # start the test
    env.runner.start(10, spawn_rate=1)

    # in 60 seconds stop the runner
    # 这里还是要给一个比较大的数
    gevent.spawn_later(60, lambda: env.runner.quit())

    # wait for the greenlets
    env.runner.greenlet.join()

    # stop the web server for good measures
    env.web_ui.stop()

方法 2 - 修改模块源码

不推荐使用该方法,仅作救急使用。

修改模块源码

修改 Locust 的源码。

更改 clients.pyHttpSession.request 方法:

# TODO: MOD BY DingJunyao
# 这里添加当什么情况下执行的参数 exec_if,为函数。如果该函数执行结果为真,则执行请求;否则,不执行,报错。
    def request(self, method, url, name=None, catch_response=False, context={}, exec_if=False, **kwargs):
        """
        Constructs and sends a :py:class:`requests.Request`.
...
        """
...
        # prepend url with hostname unless it's already an absolute URL
        url = self._build_url(url)

        # TODO: MOD BY DingJunyao
        if exec_if is not False:
            if not exec_if():
                raise Exception('NOT IF')
        # TODO: MOD END
        start_time = time.time()
...

修改测试代码

import json, datetime

import gevent
import pymssql
import requests
from locust import HttpUser, task
from locust.stats import stats_printer, stats_history
import locust_plugins

ALL_NUM = 0
REQ_SUCCESS_NUM = 0
REQ_FAIL_NUM = 0
QUERY_NUM = 0
TOKEN = ''
TIME = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def get_token():
    ...

class AddRecord(HttpUser):
    global ALL_NUM, REQ_SUCCESS_NUM, REQ_FAIL_NUM
        print(f'>>> current ALL: {ALL_NUM}')
        json_dict = json.loads(...)
        headers = {...}
        if ALL_NUM < 500:
            ALL_NUM += 1
            with self.client.request(
                    '...',
                    "...",
                    json=json_dict,
                    headers=headers,
                    catch_response=True,
                    # 这里添加当什么情况下执行
                    exec_if=lambda: ALL_NUM <= 500
            ) as response:
                if response.json()['code'] == 200:
                    response.success()
                    REQ_SUCCESS_NUM += 1
                else:
                    response.failure(response.json())
                    REQ_FAIL_NUM += 1


if __name__ == "__main__":
    get_token()
    print(TOKEN)
    from locust.env import Environment

    env = Environment(user_classes=[AddRecord], host='...')
    env.create_local_runner()

    # start a WebUI instance
    env.create_web_ui("127.0.0.1", 8089)

    # start a greenlet that periodically outputs the current stats
    gevent.spawn(stats_printer(env.stats))

    # start a greenlet that save current stats to history
    gevent.spawn(stats_history, env.runner)

    # start the test
    env.runner.start(10, spawn_rate=1)

    # in 60 seconds stop the runner
    # 这里还是要给一个比较大的数
    gevent.spawn_later(60, lambda: env.runner.quit())

    # wait for the greenlets
    env.runner.greenlet.join()

    # stop the web server for good measures
    env.web_ui.stop()