WIP: use the package botdetection

This commit is contained in:
Alexandre Flament 2024-05-05 17:44:30 +00:00
parent ac430a9eaf
commit e826a71c2b
17 changed files with 98 additions and 1261 deletions

View File

@ -16,3 +16,4 @@ redis==5.0.4
markdown-it-py==3.0.0 markdown-it-py==3.0.0
fasttext-predict== fasttext-predict==
pytomlpp==1.0.13; python_version < '3.11' pytomlpp==1.0.13; python_version < '3.11'
botdetection @ git+ssh://git@github.com/dalf/botdetection.git@alt_impl

View File

@ -1,22 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
""".. _botdetection src:
Implementations used for bot detection.
from ._helpers import dump_request
from ._helpers import get_real_ip
from ._helpers import get_network
from ._helpers import too_many_requests
__all__ = ['dump_request', 'get_network', 'get_real_ip', 'too_many_requests']
redis_client = None
cfg = None
def init(_cfg, _redis_client):
global redis_client, cfg # pylint: disable=global-statement
redis_client = _redis_client
cfg = _cfg

View File

@ -1,128 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, invalid-name
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from searx import logger
from . import config
logger = logger.getChild('botdetection')
def dump_request(request: flask.Request):
return (
+ " || X-Forwarded-For: %s" % request.headers.get('X-Forwarded-For')
+ " || X-Real-IP: %s" % request.headers.get('X-Real-IP')
+ " || form: %s" % request.form
+ " || Accept: %s" % request.headers.get('Accept')
+ " || Accept-Language: %s" % request.headers.get('Accept-Language')
+ " || Accept-Encoding: %s" % request.headers.get('Accept-Encoding')
+ " || Content-Type: %s" % request.headers.get('Content-Type')
+ " || Content-Length: %s" % request.headers.get('Content-Length')
+ " || Connection: %s" % request.headers.get('Connection')
+ " || User-Agent: %s" % request.headers.get('User-Agent')
def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkzeug.Response | None:
"""Returns a HTTP 429 response object and writes a ERROR message to the
'botdetection' logger. This function is used in part by the filter methods
to return the default ``Too Many Requests`` response.
logger.debug("BLOCK %s: %s", network.compressed, log_msg)
return flask.make_response(('Too Many Requests', 429))
def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network:
"""Returns the (client) network of whether the real_ip is part of."""
if real_ip.version == 6:
prefix = cfg['real_ip.ipv6_prefix']
prefix = cfg['real_ip.ipv4_prefix']
network = ip_network(f"{real_ip}/{prefix}", strict=False)
# logger.debug("get_network(): %s", network.compressed)
return network
_logged_errors = []
def _log_error_only_once(err_msg):
if err_msg not in _logged_errors:
def get_real_ip(request: flask.Request) -> str:
"""Returns real IP of the request. Since not all proxies set all the HTTP
headers and incoming headers can be faked it may happen that the IP cannot
be determined correctly.
.. sidebar:: :py:obj:`flask.Request.remote_addr`
SearXNG uses Werkzeug's ProxyFix_ (with it default ``x_for=1``).
This function tries to get the remote IP in the order listed below,
additional some tests are done and if inconsistencies or errors are
detected, they are logged.
The remote IP of the request is taken from (first match):
- X-Forwarded-For_ header
- `X-real-IP header <https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516>`__
- :py:obj:`flask.Request.remote_addr`
.. _ProxyFix:
.. _X-Forwarded-For:
forwarded_for = request.headers.get("X-Forwarded-For")
real_ip = request.headers.get('X-Real-IP')
remote_addr = request.remote_addr
# logger.debug(
# "X-Forwarded-For: %s || X-Real-IP: %s || request.remote_addr: %s", forwarded_for, real_ip, remote_addr
# )
if not forwarded_for:
_log_error_only_once("X-Forwarded-For header is not set!")
from . import cfg # pylint: disable=import-outside-toplevel, cyclic-import
forwarded_for = [x.strip() for x in forwarded_for.split(',')]
x_for: int = cfg['real_ip.x_for'] # type: ignore
forwarded_for = forwarded_for[-min(len(forwarded_for), x_for)]
if not real_ip:
_log_error_only_once("X-Real-IP header is not set!")
if forwarded_for and real_ip and forwarded_for != real_ip:
logger.warning("IP from X-Real-IP (%s) is not equal to IP from X-Forwarded-For (%s)", real_ip, forwarded_for)
if forwarded_for and remote_addr and forwarded_for != remote_addr:
"IP from WSGI environment (%s) is not equal to IP from X-Forwarded-For (%s)", remote_addr, forwarded_for
if real_ip and remote_addr and real_ip != remote_addr:
logger.warning("IP from WSGI environment (%s) is not equal to IP from X-Real-IP (%s)", remote_addr, real_ip)
request_ip = forwarded_for or real_ip or remote_addr or ''
# logger.debug("get_real_ip() -> %s", request_ip)
return request_ip

View File

@ -1,400 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configuration class :py:class:`Config` with deep-update, schema validation
and deprecated names.
The :py:class:`Config` class implements a configuration that is based on
structured dictionaries. The configuration schema is defined in a dictionary
structure and the configuration data is given in a dictionary structure.
from __future__ import annotations
from typing import Any
import copy
import typing
import logging
import pathlib
import tomllib
pytomlpp = None
except ImportError:
import pytomlpp
tomllib = None
__all__ = ['Config', 'UNSET', 'SchemaIssue']
log = logging.getLogger(__name__)
class FALSE:
"""Class of ``False`` singelton"""
# pylint: disable=multiple-statements
def __init__(self, msg):
self.msg = msg
def __bool__(self):
return False
def __str__(self):
return self.msg
__repr__ = __str__
class SchemaIssue(ValueError):
"""Exception to store and/or raise a message from a schema issue."""
def __init__(self, level: typing.Literal['warn', 'invalid'], msg: str):
self.level = level
def __str__(self):
return f"[cfg schema {self.level}] {self.args[0]}"
class Config:
"""Base class used for configuration"""
def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict) -> Config:
# init schema
log.debug("load schema file: %s", schema_file)
cfg = cls(cfg_schema=toml_load(schema_file), deprecated=deprecated)
if not cfg_file.exists():
log.warning("missing config file: %s", cfg_file)
return cfg
# load configuration
log.debug("load config file: %s", cfg_file)
upd_cfg = toml_load(cfg_file)
is_valid, issue_list = cfg.validate(upd_cfg)
for msg in issue_list:
if not is_valid:
raise TypeError(f"schema of {cfg_file} is invalid!")
return cfg
def __init__(self, cfg_schema: typing.Dict, deprecated: typing.Dict[str, str]):
"""Construtor of class Config.
:param cfg_schema: Schema of the configuration
:param deprecated: dictionary that maps deprecated configuration names to a messages
These values are needed for validation, see :py:obj:`validate`.
self.cfg_schema = cfg_schema
self.deprecated = deprecated
self.cfg = copy.deepcopy(cfg_schema)
def __getitem__(self, key: str) -> Any:
return self.get(key)
def validate(self, cfg: dict):
"""Validation of dictionary ``cfg`` on :py:obj:`Config.SCHEMA`.
Validation is done by :py:obj:`validate`."""
return validate(self.cfg_schema, cfg, self.deprecated)
def update(self, upd_cfg: dict):
"""Update this configuration by ``upd_cfg``."""
dict_deepupdate(self.cfg, upd_cfg)
def default(self, name: str):
"""Returns default value of field ``name`` in ``self.cfg_schema``."""
return value(name, self.cfg_schema)
def get(self, name: str, default: Any = UNSET, replace: bool = True) -> Any:
"""Returns the value to which ``name`` points in the configuration.
If there is no such ``name`` in the config and the ``default`` is
:py:obj:`UNSET`, a :py:obj:`KeyError` is raised.
parent = self._get_parent_dict(name)
val = parent.get(name.split('.')[-1], UNSET)
if val is UNSET:
if default is UNSET:
raise KeyError(name)
val = default
if replace and isinstance(val, str):
val = val % self
return val
def set(self, name: str, val):
"""Set the value to which ``name`` points in the configuration.
If there is no such ``name`` in the config, a :py:obj:`KeyError` is
parent = self._get_parent_dict(name)
parent[name.split('.')[-1]] = val
def _get_parent_dict(self, name):
parent_name = '.'.join(name.split('.')[:-1])
if parent_name:
parent = value(parent_name, self.cfg)
parent = self.cfg
if (parent is UNSET) or (not isinstance(parent, dict)):
raise KeyError(parent_name)
return parent
def path(self, name: str, default=UNSET):
"""Get a :py:class:`pathlib.Path` object from a config string."""
val = self.get(name, default)
if val is UNSET:
if default is UNSET:
raise KeyError(name)
return default
return pathlib.Path(str(val))
def pyobj(self, name, default=UNSET):
"""Get python object refered by full qualiffied name (FQN) in the config
fqn = self.get(name, default)
if fqn is UNSET:
if default is UNSET:
raise KeyError(name)
return default
(modulename, name) = str(fqn).rsplit('.', 1)
m = __import__(modulename, {}, {}, [name], 0)
return getattr(m, name)
def toml_load(file_name):
# Python >= 3.11
with open(file_name, "rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as exc:
msg = str(exc).replace('\t', '').replace('\n', ' ')
log.error("%s: %s", file_name, msg)
# fallback to pytomlpp for Python < 3.11
return pytomlpp.load(file_name)
except pytomlpp.DecodeError as exc:
msg = str(exc).replace('\t', '').replace('\n', ' ')
log.error("%s: %s", file_name, msg)
# working with dictionaries
def value(name: str, data_dict: dict):
"""Returns the value to which ``name`` points in the ``dat_dict``.
.. code: python
>>> data_dict = {
"foo": {"bar": 1 },
"bar": {"foo": 2 },
"foobar": [1, 2, 3],
>>> value('foobar', data_dict)
[1, 2, 3]
>>> value('foo.bar', data_dict)
>>> value('foo.bar.xxx', data_dict)
ret_val = data_dict
for part in name.split('.'):
if isinstance(ret_val, dict):
ret_val = ret_val.get(part, UNSET)
if ret_val is UNSET:
return ret_val
def validate(
schema_dict: typing.Dict, data_dict: typing.Dict, deprecated: typing.Dict[str, str]
) -> typing.Tuple[bool, list]:
"""Deep validation of dictionary in ``data_dict`` against dictionary in
``schema_dict``. Argument deprecated is a dictionary that maps deprecated
configuration names to a messages::
deprecated = {
"foo.bar" : "config 'foo.bar' is deprecated, use 'bar.foo'",
"..." : "..."
The function returns a python tuple ``(is_valid, issue_list)``:
A bool value indicating ``data_dict`` is valid or not.
A list of messages (:py:obj:`SchemaIssue`) from the validation::
[schema warn] data_dict: deprecated 'fontlib.foo': <DEPRECATED['foo.bar']>
[schema invalid] data_dict: key unknown 'fontlib.foo'
[schema invalid] data_dict: type mismatch 'fontlib.foo': expected ..., is ...
If ``schema_dict`` or ``data_dict`` is not a dictionary type a
:py:obj:`SchemaIssue` is raised.
names = []
is_valid = True
issue_list = []
if not isinstance(schema_dict, dict):
raise SchemaIssue('invalid', "schema_dict is not a dict type")
if not isinstance(data_dict, dict):
raise SchemaIssue('invalid', f"data_dict issue{'.'.join(names)} is not a dict type")
is_valid, issue_list = _validate(names, issue_list, schema_dict, data_dict, deprecated)
return is_valid, issue_list
def _validate(
names: typing.List,
issue_list: typing.List,
schema_dict: typing.Dict,
data_dict: typing.Dict,
deprecated: typing.Dict[str, str],
) -> typing.Tuple[bool, typing.List]:
is_valid = True
for key, data_value in data_dict.items():
name = '.'.join(names)
deprecated_msg = deprecated.get(name)
# print("XXX %s: key %s // data_value: %s" % (name, key, data_value))
if deprecated_msg:
issue_list.append(SchemaIssue('warn', f"data_dict '{name}': deprecated - {deprecated_msg}"))
schema_value = value(name, schema_dict)
# print("YYY %s: key %s // schema_value: %s" % (name, key, schema_value))
if schema_value is UNSET:
if not deprecated_msg:
issue_list.append(SchemaIssue('invalid', f"data_dict '{name}': key unknown in schema_dict"))
is_valid = False
elif type(schema_value) != type(data_value): # pylint: disable=unidiomatic-typecheck
(f"data_dict: type mismatch '{name}':" f" expected {type(schema_value)}, is: {type(data_value)}"),
is_valid = False
elif isinstance(data_value, dict):
_valid, _ = _validate(names, issue_list, schema_dict, data_value, deprecated)
is_valid = is_valid and _valid
return is_valid, issue_list
def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
"""Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
0. If types of ``base_dict[upd_key]`` and ``upd_val`` do not match raise a
1. If ``base_dict[upd_key]`` is a dict: recursively deep-update it by ``upd_val``.
2. If ``base_dict[upd_key]`` not exist: set ``base_dict[upd_key]`` from a
(deep-) copy of ``upd_val``.
3. If ``upd_val`` is a list, extend list in ``base_dict[upd_key]`` by the
list in ``upd_val``.
4. If ``upd_val`` is a set, update set in ``base_dict[upd_key]`` by set in
# pylint: disable=too-many-branches
if not isinstance(base_dict, dict):
raise TypeError("argument 'base_dict' is not a ditionary type")
if not isinstance(upd_dict, dict):
raise TypeError("argument 'upd_dict' is not a ditionary type")
if names is None:
names = []
for upd_key, upd_val in upd_dict.items():
# For each upd_key & upd_val pair in upd_dict:
if isinstance(upd_val, dict):
if upd_key in base_dict:
# if base_dict[upd_key] exists, recursively deep-update it
if not isinstance(base_dict[upd_key], dict):
raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
+ [
# if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
base_dict[upd_key] = copy.deepcopy(upd_val)
elif isinstance(upd_val, list):
if upd_key in base_dict:
# if base_dict[upd_key] exists, base_dict[up_key] is extended by
# the list from upd_val
if not isinstance(base_dict[upd_key], list):
raise TypeError(f"type mismatch {'.'.join(names)}: is not a list type in base_dict")
# if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
# list in upd_val.
base_dict[upd_key] = copy.deepcopy(upd_val)
elif isinstance(upd_val, set):
if upd_key in base_dict:
# if base_dict[upd_key] exists, base_dict[up_key] is updated by the set in upd_val
if not isinstance(base_dict[upd_key], set):
raise TypeError(f"type mismatch {'.'.join(names)}: is not a set type in base_dict")
# if base_dict[upd_key] doesn't exists, set base_dict[upd_key] from a copy of the
# set in upd_val
base_dict[upd_key] = upd_val.copy()
# for any other type of upd_val replace or add base_dict[upd_key] by a copy
# of upd_val
base_dict[upd_key] = copy.copy(upd_val)

View File

@ -1,38 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``http_accept``
The ``http_accept`` method evaluates a request as the request of a bot if the
Accept_ header ..
- did not contain ``text/html``
.. _Accept:
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from . import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if 'text/html' not in request.accept_mimetypes:
return too_many_requests(network, "HTTP header Accept did not contain text/html")
return None

View File

@ -1,40 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``http_accept_encoding``
The ``http_accept_encoding`` method evaluates a request as the request of a
bot if the Accept-Encoding_ header ..
- did not contain ``gzip`` AND ``deflate`` (if both values are missed)
- did not contain ``text/html``
.. _Accept-Encoding:
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from . import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
accept_list = [l.strip() for l in request.headers.get('Accept-Encoding', '').split(',')]
if not ('gzip' in accept_list or 'deflate' in accept_list):
return too_many_requests(network, "HTTP header Accept-Encoding did not contain gzip nor deflate")
return None

View File

@ -1,34 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``http_accept_language``
The ``http_accept_language`` method evaluates a request as the request of a bot
if the Accept-Language_ header is unset.
.. _Accept-Language:
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from . import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if request.headers.get('Accept-Language', '').strip() == '':
return too_many_requests(network, "missing HTTP header Accept-Language")
return None

View File

@ -1,36 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``http_connection``
The ``http_connection`` method evaluates a request as the request of a bot if
the Connection_ header is set to ``close``.
.. _Connection:
# pylint: disable=unused-argument
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from . import config
from ._helpers import too_many_requests
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
if request.headers.get('Connection', '').strip() == 'close':
return too_many_requests(network, "HTTP header 'Connection=close")
return None

View File

@ -1,66 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``http_user_agent``
The ``http_user_agent`` method evaluates a request as the request of a bot if
the User-Agent_ header is unset or matches the regular expression
.. _User-Agent:
# pylint: disable=unused-argument
from __future__ import annotations
import re
from ipaddress import (
import flask
import werkzeug
from . import config
from ._helpers import too_many_requests
+ r'unknown'
+ r'|[Cc][Uu][Rr][Ll]|[wW]get|Scrapy|splash|JavaFX|FeedFetcher|python-requests|Go-http-client|Java|Jakarta|okhttp'
+ r'|HttpClient|Jersey|Python|libwww-perl|Ruby|SynHttpClient|UniversalFeedParser|Googlebot|GoogleImageProxy'
+ r'|bingbot|Baiduspider|yacybot|YandexMobileBot|YandexBot|Yahoo! Slurp|MJ12bot|AhrefsBot|archive.org_bot|msnbot'
+ r'|MJ12bot|SeznamBot|linkdexbot|Netvibes|SMTBot|zgrab|James BOT|Sogou|Abonti|Pixray|Spinn3r|SemrushBot|Exabot'
+ r'|ZmEu|BLEXBot|bitlybot|HeadlessChrome'
# unmaintained Farside instances
+ r'|'
+ re.escape(r'Mozilla/5.0 (compatible; Farside/0.1.0; +https://farside.link)')
# other bots and client to block
+ '|.*PetalBot.*'
+ r')'
"""Regular expression that matches to User-Agent_ from known *bots*"""
_regexp = None
def regexp_user_agent():
global _regexp # pylint: disable=global-statement
if not _regexp:
_regexp = re.compile(USER_AGENT)
return _regexp
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
user_agent = request.headers.get('User-Agent', 'unknown')
if regexp_user_agent().match(user_agent):
return too_many_requests(network, f"bot detected, HTTP header User-Agent: {user_agent}")
return None

View File

@ -1,146 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
""".. _botdetection.ip_limit:
Method ``ip_limit``
The ``ip_limit`` method counts request from an IP in *sliding windows*. If
there are to many requests in a sliding window, the request is evaluated as a
bot request. This method requires a redis DB and needs a HTTP X-Forwarded-For_
header. To take privacy only the hash value of an IP is stored in the redis DB
and at least for a maximum of 10 minutes.
The :py:obj:`.link_token` method can be used to investigate whether a request is
*suspicious*. To activate the :py:obj:`.link_token` method in the
:py:obj:`.ip_limit` method add the following configuration:
.. code:: toml
link_token = true
If the :py:obj:`.link_token` method is activated and a request is *suspicious*
the request rates are reduced:
- :py:obj:`BURST_MAX` -> :py:obj:`BURST_MAX_SUSPICIOUS`
- :py:obj:`LONG_MAX` -> :py:obj:`LONG_MAX_SUSPICIOUS`
To intercept bots that get their IPs from a range of IPs, there is a
:py:obj:`SUSPICIOUS_IP_WINDOW`. In this window the suspicious IPs are stored
for a longer time. IPs stored in this sliding window have a maximum of
:py:obj:`SUSPICIOUS_IP_MAX` accesses before they are blocked. As soon as the IP
makes a request that is not suspicious, the sliding window for this IP is
.. _X-Forwarded-For:
from __future__ import annotations
from ipaddress import (
import flask
import werkzeug
from searx import redisdb
from searx.redislib import incr_sliding_window, drop_counter
from . import link_token
from . import config
from ._helpers import (
logger = logger.getChild('ip_limit')
"""Time (sec) before sliding window for *burst* requests expires."""
"""Maximum requests from one IP in the :py:obj:`BURST_WINDOW`"""
"""Maximum of suspicious requests from one IP in the :py:obj:`BURST_WINDOW`"""
"""Time (sec) before the longer sliding window expires."""
LONG_MAX = 150
"""Maximum requests from one IP in the :py:obj:`LONG_WINDOW`"""
"""Maximum suspicious requests from one IP in the :py:obj:`LONG_WINDOW`"""
"""Time (sec) before sliding window for API requests (format != html) expires."""
"""Maximum requests from one IP in the :py:obj:`API_WONDOW`"""
SUSPICIOUS_IP_WINDOW = 3600 * 24 * 30
"""Time (sec) before sliding window for one suspicious IP expires."""
"""Maximum requests from one suspicious IP in the :py:obj:`SUSPICIOUS_IP_WINDOW`."""
def filter_request(
network: IPv4Network | IPv6Network,
request: flask.Request,
cfg: config.Config,
) -> werkzeug.Response | None:
# pylint: disable=too-many-return-statements
redis_client = redisdb.client()
if network.is_link_local and not cfg['botdetection.ip_limit.filter_link_local']:
logger.debug("network %s is link-local -> not monitored by ip_limit method", network.compressed)
return None
if request.args.get('format', 'html') != 'html':
c = incr_sliding_window(redis_client, 'ip_limit.API_WONDOW:' + network.compressed, API_WONDOW)
if c > API_MAX:
return too_many_requests(network, "too many request in API_WINDOW")
if cfg['botdetection.ip_limit.link_token']:
suspicious = link_token.is_suspicious(network, request, True)
if not suspicious:
# this IP is no longer suspicious: release ip again / delete the counter of this IP
drop_counter(redis_client, 'ip_limit.SUSPICIOUS_IP_WINDOW' + network.compressed)
return None
# this IP is suspicious: count requests from this IP
c = incr_sliding_window(
redis_client, 'ip_limit.SUSPICIOUS_IP_WINDOW' + network.compressed, SUSPICIOUS_IP_WINDOW
logger.error("BLOCK: too many request from %s in SUSPICIOUS_IP_WINDOW (redirect to /)", network)
return flask.redirect(flask.url_for('index'), code=302)
c = incr_sliding_window(redis_client, 'ip_limit.BURST_WINDOW' + network.compressed, BURST_WINDOW)
return too_many_requests(network, "too many request in BURST_WINDOW (BURST_MAX_SUSPICIOUS)")
c = incr_sliding_window(redis_client, 'ip_limit.LONG_WINDOW' + network.compressed, LONG_WINDOW)
return too_many_requests(network, "too many request in LONG_WINDOW (LONG_MAX_SUSPICIOUS)")
return None
# vanilla limiter without extensions counts BURST_MAX and LONG_MAX
c = incr_sliding_window(redis_client, 'ip_limit.BURST_WINDOW' + network.compressed, BURST_WINDOW)
if c > BURST_MAX:
return too_many_requests(network, "too many request in BURST_WINDOW (BURST_MAX)")
c = incr_sliding_window(redis_client, 'ip_limit.LONG_WINDOW' + network.compressed, LONG_WINDOW)
if c > LONG_MAX:
return too_many_requests(network, "too many request in LONG_WINDOW (LONG_MAX)")
return None

View File

@ -1,84 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
""".. _botdetection.ip_lists:
Method ``ip_lists``
The ``ip_lists`` method implements IP :py:obj:`block- <block_ip>` and
:py:obj:`pass-lists <pass_ip>`.
.. code:: toml
pass_ip = [
'', # IPv4 of check.searx.space
'', # IPv4 private network
'fe80::/10' # IPv6 linklocal
block_ip = [
'', # IPv4 of example.org
'', # invalid IP --> will be ignored, logged in ERROR class
# pylint: disable=unused-argument
from __future__ import annotations
from typing import Tuple
from ipaddress import (
from . import config
from ._helpers import logger
logger = logger.getChild('ip_limit')
# https://github.com/searxng/searxng/pull/2484#issuecomment-1576639195
'', # IPv4 check.searx.space
'2a01:04f8:1c1c:8fc2::/64', # IPv6 check.searx.space
"""Passlist of IPs from the SearXNG organization, e.g. `check.searx.space`."""
def pass_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.pass_ip`` list.
if cfg.get('botdetection.ip_lists.pass_searxng_org', default=True):
for net in SEARXNG_ORG:
net = ip_network(net, strict=False)
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in SEARXNG_ORG list."
return ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.pass_ip', cfg)
def block_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]:
"""Checks if the IP on the subnet is in one of the members of the
``botdetection.ip_lists.block_ip`` list.
block, msg = ip_is_subnet_of_member_in_list(real_ip, 'botdetection.ip_lists.block_ip', cfg)
if block:
msg += " To remove IP from list, please contact the maintainer of the service."
return block, msg
def ip_is_subnet_of_member_in_list(
real_ip: IPv4Address | IPv6Address, list_name: str, cfg: config.Config
) -> Tuple[bool, str]:
for net in cfg.get(list_name, default=[]):
net = ip_network(net, strict=False)
except ValueError:
logger.error("invalid IP %s in %s", net, list_name)
if real_ip.version == net.version and real_ip in net:
return True, f"IP matches {net.compressed} in {list_name}."
return False, f"IP is not a member of an item in the f{list_name} list"

View File

@ -1,154 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
Method ``link_token``
The ``link_token`` method evaluates a request as :py:obj:`suspicious
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
client. By adding a random component (the token) in the URL, a bot can not send
a ping by request a static URL.
.. note::
This method requires a redis DB and needs a HTTP X-Forwarded-For_ header.
To get in use of this method a flask URL route needs to be added:
.. code:: python
@app.route('/client<token>.css', methods=['GET', 'POST'])
def client_token(token=None):
link_token.ping(request, token)
return Response('', mimetype='text/css')
And in the HTML template from flask a stylesheet link is needed (the value of
``link_token`` comes from :py:obj:`get_token`):
.. code:: html
<link rel="stylesheet"
href="{{ url_for('client_token', token=link_token) }}"
type="text/css" />
.. _X-Forwarded-For:
from __future__ import annotations
from ipaddress import (
import string
import random
import flask
from searx import logger
from searx import redisdb
from searx.redislib import secret_hash
from ._helpers import (
"""Livetime (sec) of limiter's CSS token."""
"""Livetime (sec) of the ping-key from a client (request)"""
PING_KEY = 'SearXNG_limiter.ping'
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""
TOKEN_KEY = 'SearXNG_limiter.token'
"""Key for which the current token is stored in the DB"""
logger = logger.getChild('botdetection.link_token')
def is_suspicious(network: IPv4Network | IPv6Network, request: flask.Request, renew: bool = False):
"""Checks whether a valid ping is exists for this (client) network, if not
this request is rated as *suspicious*. If a valid ping exists and argument
``renew`` is ``True`` the expire time of this ping is reset to
redis_client = redisdb.client()
if not redis_client:
return False
ping_key = get_ping_key(network, request)
if not redis_client.get(ping_key):
logger.info("missing ping (IP: %s) / request: %s", network.compressed, ping_key)
return True
if renew:
redis_client.set(ping_key, 1, ex=PING_LIVE_TIME)
logger.debug("found ping for (client) network %s -> %s", network.compressed, ping_key)
return False
def ping(request: flask.Request, token: str):
"""This function is called by a request to URL ``/client<token>.css``. If
``token`` is valid a :py:obj:`PING_KEY` for the client is stored in the DB.
The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`.
from . import redis_client, cfg # pylint: disable=import-outside-toplevel, cyclic-import
if not redis_client:
if not token_is_valid(token):
real_ip = ip_address(get_real_ip(request))
network = get_network(real_ip, cfg)
ping_key = get_ping_key(network, request)
logger.debug("store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip, ping_key)
redis_client.set(ping_key, 1, ex=PING_LIVE_TIME)
def get_ping_key(network: IPv4Network | IPv6Network, request: flask.Request) -> str:
"""Generates a hashed key that fits (more or less) to a *WEB-browser
session* in a network."""
return (
+ "["
+ secret_hash(
network.compressed + request.headers.get('Accept-Language', '') + request.headers.get('User-Agent', '')
+ "]"
def token_is_valid(token) -> bool:
valid = token == get_token()
logger.debug("token is valid --> %s", valid)
return valid
def get_token() -> str:
"""Returns current token. If there is no currently active token a new token
is generated randomly and stored in the redis DB.
- :py:obj:`TOKEN_LIVE_TIME`
- :py:obj:`TOKEN_KEY`
redis_client = redisdb.client()
if not redis_client:
# This function is also called when limiter is inactive / no redis DB
# (see render function in webapp.py)
return '12345678'
token = redis_client.get(TOKEN_KEY)
if token:
token = token.decode('UTF-8')
token = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16))
redis_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
return token

View File

@ -96,33 +96,37 @@ from __future__ import annotations
import sys import sys
from pathlib import Path from pathlib import Path
from ipaddress import ip_address
import flask import flask
import werkzeug import werkzeug
from searx import ( from botdetection import (
logger, install_botdetection,
redisdb, RouteFilter,
) Config,
from searx import botdetection PredefinedRequestFilter,
from searx.botdetection import ( RequestContext,
config, RequestInfo,
http_accept, too_many_requests,
) )
from searx import logger, redisdb
import tomllib
pytomlpp = None
except ImportError:
import pytomlpp
tomllib = None
# the configuration are limiter.toml and "limiter" in settings.yml so, for # the configuration are limiter.toml and "limiter" in settings.yml so, for
# coherency, the logger is "limiter" # coherency, the logger is "limiter"
logger = logger.getChild('limiter') logger = logger.getChild('limiter')
CFG: config.Config = None # type: ignore CFG: Config = None # type: ignore
LIMITER_CFG_SCHEMA = Path(__file__).parent / "limiter.toml" LIMITER_CFG_SCHEMA = Path(__file__).parent / "limiter.toml"
@ -131,82 +135,71 @@ LIMITER_CFG_SCHEMA = Path(__file__).parent / "limiter.toml"
LIMITER_CFG = Path('/etc/searxng/limiter.toml') LIMITER_CFG = Path('/etc/searxng/limiter.toml')
"""Local Limiter configuration.""" """Local Limiter configuration."""
# "dummy.old.foo": "config 'dummy.old.foo' exists only for tests. Don't use it in your real project config." """Time (sec) before sliding window for API requests (format != html) expires."""
"""Maximum requests from one IP in the :py:obj:`API_WINDOW`"""
def get_cfg() -> config.Config: def toml_load(file_name):
# Python >= 3.11
with open(file_name, "rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as exc:
msg = str(exc).replace('\t', '').replace('\n', ' ')
logger.error("%s: %s", file_name, msg)
# fallback to pytomlpp for Python < 3.11
return pytomlpp.load(file_name)
except pytomlpp.DecodeError as exc:
msg = str(exc).replace('\t', '').replace('\n', ' ')
logger.error("%s: %s", file_name, msg)
def get_config() -> Config:
global CFG # pylint: disable=global-statement global CFG # pylint: disable=global-statement
if CFG is None: if CFG is None:
data = toml_load(LIMITER_CFG)
data = toml_load(LIMITER_CFG_SCHEMA)
CFG = Config(real_ip=data["real_ip"], botdetection=data["botdetection"])
return CFG return CFG
def filter_request(request: flask.Request) -> werkzeug.Response | None: def api_rate_filter_request(
# pylint: disable=too-many-return-statements context: RequestContext,
request_info: RequestInfo,
cfg = get_cfg() request: flask.Request,
real_ip = ip_address(get_real_ip(request)) ) -> werkzeug.Response | None:
network = get_network(real_ip, cfg) if request.args.get("format", "html") != "html":
c = context.redislib.incr_sliding_window("ip_limit.API_WINDOW:" + request_info.network.compressed, API_WINDOW)
if request.path == '/healthz': if c > API_MAX:
return None return too_many_requests(request_info, "too many request in API_WINDOW")
# link-local
if network.is_link_local:
return None
# block- & pass- lists
# 1. The IP of the request is first checked against the pass-list; if the IP
# matches an entry in the list, the request is not blocked.
# 2. If no matching entry is found in the pass-list, then a check is made against
# the block list; if the IP matches an entry in the list, the request is
# blocked.
# 3. If the IP is not in either list, the request is not blocked.
match, msg = ip_lists.pass_ip(real_ip, cfg)
if match:
logger.warning("PASS %s: matched PASSLIST - %s", network.compressed, msg)
return None
match, msg = ip_lists.block_ip(real_ip, cfg)
if match:
logger.error("BLOCK %s: matched BLOCKLIST - %s", network.compressed, msg)
return flask.make_response(('IP is on BLOCKLIST - %s' % msg, 429))
# methods applied on /
for func in [
val = func.filter_request(network, request, cfg)
if val is not None:
return val
# methods applied on /search
if request.path == '/search':
for func in [
val = func.filter_request(network, request, cfg)
if val is not None:
return val
logger.debug(f"OK {network}: %s", dump_request(flask.request))
return None return None
def pre_request(): route_filter = RouteFilter(
"""See :py:obj:`flask.Flask.before_request`""" {
return filter_request(flask.request) "/healthz": [],
"/search": [
"*": [
def is_installed(): def is_installed():
@ -221,13 +214,10 @@ def initialize(app: flask.Flask, settings):
# even if the limiter is not activated, the botdetection must be activated # even if the limiter is not activated, the botdetection must be activated
# (e.g. the self_info plugin uses the botdetection to get client IP) # (e.g. the self_info plugin uses the botdetection to get client IP)
cfg = get_cfg()
redis_client = redisdb.client()
botdetection.init(cfg, redis_client)
if not (settings['server']['limiter'] or settings['server']['public_instance']): if not (settings['server']['limiter'] or settings['server']['public_instance']):
return return
redis_client = redisdb.client()
if not redis_client: if not redis_client:
logger.error( logger.error(
"The limiter requires Redis, please consult the documentation: " "The limiter requires Redis, please consult the documentation: "
@ -237,10 +227,12 @@ def initialize(app: flask.Flask, settings):
sys.exit(1) sys.exit(1)
return return
# install botdetection
config = get_config()
if settings['server']['public_instance']: if settings['server']['public_instance']:
# overwrite limiter.toml setting # overwrite limiter.toml setting
cfg.set('botdetection.ip_limit.link_token', True) config.botdetection.ip_limit.link_token = True
app.before_request(pre_request) install_botdetection(app, redis_client, config, route_filter)

View File

@ -4,7 +4,6 @@
import re import re
from flask_babel import gettext from flask_babel import gettext
from searx.botdetection._helpers import get_real_ip
name = gettext('Self Information') name = gettext('Self Information')
description = gettext('Displays your IP if the query is "ip" and your user agent if the query contains "user agent".') description = gettext('Displays your IP if the query is "ip" and your user agent if the query contains "user agent".')
@ -17,11 +16,18 @@ query_examples = ''
p = re.compile('.*user[ -]agent.*', re.IGNORECASE) p = re.compile('.*user[ -]agent.*', re.IGNORECASE)
def get_client_ip(request):
botdetection_context = getattr(request, "botdetection_context", None)
if botdetection_context:
return request.botdetection_request_info.real_ip
return request.remote_addr
def post_search(request, search): def post_search(request, search):
if search.search_query.pageno > 1: if search.search_query.pageno > 1:
return True return True
if search.search_query.query == 'ip': if search.search_query.query == 'ip':
ip = get_real_ip(request) ip = get_client_ip(request)
search.result_container.answers['ip'] = {'answer': ip} search.result_container.answers['ip'] = {'answer': ip}
elif p.match(search.search_query.query): elif p.match(search.search_query.query):
ua = request.user_agent ua = request.user_agent

View File

@ -17,9 +17,9 @@
{% else %} {% else %}
<link rel="stylesheet" href="{{ url_for('static', filename='css/searxng.min.css') }}" type="text/css" media="screen" /> <link rel="stylesheet" href="{{ url_for('static', filename='css/searxng.min.css') }}" type="text/css" media="screen" />
{% endif %} {% endif %}
{% if get_setting('server.limiter') or get_setting('server.public_instance') %} {%- if botdetection_html_header is defined -%}
<link rel="stylesheet" href="{{ url_for('client_token', token=link_token) }}" type="text/css" /> {{- botdetection_html_header() | safe -}}
{% endif %} {%- endif %}
{% block styles %}{% endblock %} {% block styles %}{% endblock %}
<!--[if gte IE 9]>--> <!--[if gte IE 9]>-->
<script src="{{ url_for('static', filename='js/searxng.head.min.js') }}" client_settings="{{ client_settings }}"></script> <script src="{{ url_for('static', filename='js/searxng.head.min.js') }}" client_settings="{{ client_settings }}"></script>

View File

@ -56,7 +56,6 @@ from searx import (
from searx import infopage from searx import infopage
from searx import limiter from searx import limiter
from searx.botdetection import link_token
from searx.data import ENGINE_DESCRIPTIONS from searx.data import ENGINE_DESCRIPTIONS
from searx.results import Timing from searx.results import Timing
@ -383,7 +382,6 @@ def render(template_name: str, **kwargs):
kwargs['endpoint'] = 'results' if 'q' in kwargs else request.endpoint kwargs['endpoint'] = 'results' if 'q' in kwargs else request.endpoint
kwargs['cookies'] = request.cookies kwargs['cookies'] = request.cookies
kwargs['errors'] = request.errors kwargs['errors'] = request.errors
kwargs['link_token'] = link_token.get_token()
# values from the preferences # values from the preferences
kwargs['preferences'] = request.preferences kwargs['preferences'] = request.preferences
@ -613,12 +611,6 @@ def health():
return Response('OK', mimetype='text/plain') return Response('OK', mimetype='text/plain')
@app.route('/client<token>.css', methods=['GET', 'POST'])
def client_token(token=None):
link_token.ping(request, token)
return Response('', mimetype='text/css')
@app.route('/search', methods=['GET', 'POST']) @app.route('/search', methods=['GET', 'POST'])
def search(): def search():
"""Search query in q and return results. """Search query in q and return results.
@ -1281,7 +1273,7 @@ def config():
for _ in plugins: for _ in plugins:
_plugins.append({'name': _.name, 'enabled': _.default_on}) _plugins.append({'name': _.name, 'enabled': _.default_on})
_limiter_cfg = limiter.get_cfg() _limiter_cfg = limiter.get_config()
return jsonify( return jsonify(
{ {
@ -1304,8 +1296,8 @@ def config():
}, },
'limiter': { 'limiter': {
'enabled': limiter.is_installed(), 'enabled': limiter.is_installed(),
'botdetection.ip_limit.link_token': _limiter_cfg.get('botdetection.ip_limit.link_token'), 'botdetection.ip_limit.link_token': _limiter_cfg.botdetection.ip_limit.link_token,
'botdetection.ip_lists.pass_searxng_org': _limiter_cfg.get('botdetection.ip_lists.pass_searxng_org'), 'botdetection.ip_lists.pass_searxng_org': _limiter_cfg.botdetection.ip_lists, # fix me
}, },
'doi_resolvers': list(settings['doi_resolvers'].keys()), 'doi_resolvers': list(settings['doi_resolvers'].keys()),
'default_doi_resolver': settings['default_doi_resolver'], 'default_doi_resolver': settings['default_doi_resolver'],

View File

@ -3,11 +3,7 @@
from mock import Mock from mock import Mock
from searx import ( from searx import plugins
from tests import SearxTestCase from tests import SearxTestCase
@ -53,8 +49,6 @@ class SelfIPTest(SearxTestCase): # pylint: disable=missing-class-docstring
plugin = plugins.load_and_initialize_plugin('searx.plugins.self_info', False, (None, {})) plugin = plugins.load_and_initialize_plugin('searx.plugins.self_info', False, (None, {}))
store = plugins.PluginStore() store = plugins.PluginStore()
store.register(plugin) store.register(plugin)
cfg = limiter.get_cfg()
botdetection.init(cfg, None)
self.assertTrue(len(store.plugins) == 1) self.assertTrue(len(store.plugins) == 1)