[mod] addition of various type hints / engine processors

Continuation of #5147 .. typification of the engine processors.

BTW:

- removed obsolete engine property https_support
- fixed & improved currency_convert
- engine instances can now implement an engine.setup method

[#5147] https://github.com/searxng/searxng/pull/5147

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
Markus Heiser
2025-09-11 19:10:27 +02:00
committed by Markus Heiser
parent 23257bddce
commit 8f8343dc0d
28 changed files with 814 additions and 522 deletions

View File

@@ -1,8 +1,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, too-few-public-methods
# the public namespace has not yet been finally defined ..
# __all__ = [..., ]
__all__ = ["SearchWithPlugins"]
import typing as t
@@ -22,7 +21,7 @@ from searx.metrics import initialize as initialize_metrics, counter_inc
from searx.network import initialize as initialize_network, check_network_configuration
from searx.results import ResultContainer
from searx.search.checker import initialize as initialize_checker
from searx.search.processors import PROCESSORS, initialize as initialize_processors
from searx.search.processors import PROCESSORS
if t.TYPE_CHECKING:
@@ -44,7 +43,7 @@ def initialize(
if check_network:
check_network_configuration()
initialize_metrics([engine['name'] for engine in settings_engines], enable_metrics)
initialize_processors(settings_engines)
PROCESSORS.init(settings_engines)
if enable_checker:
initialize_checker()
@@ -52,8 +51,6 @@ def initialize(
class Search:
"""Search information container"""
__slots__ = "search_query", "result_container", "start_time", "actual_timeout" # type: ignore
def __init__(self, search_query: "SearchQuery"):
"""Initialize the Search"""
# init vars
@@ -185,8 +182,6 @@ class Search:
class SearchWithPlugins(Search):
"""Inherit from the Search class, add calls to the plugins."""
__slots__ = 'user_plugins', 'request'
def __init__(self, search_query: "SearchQuery", request: "SXNG_Request", user_plugins: list[str]):
super().__init__(search_query)
self.user_plugins = user_plugins

View File

@@ -24,42 +24,29 @@ class EngineRef:
return hash((self.name, self.category))
@typing.final
class SearchQuery:
"""container for all the search parameters (query, language, etc...)"""
__slots__ = (
'query',
'engineref_list',
'lang',
'locale',
'safesearch',
'pageno',
'time_range',
'timeout_limit',
'external_bang',
'engine_data',
'redirect_to_first_result',
)
def __init__(
self,
query: str,
engineref_list: typing.List[EngineRef],
engineref_list: list[EngineRef],
lang: str = 'all',
safesearch: int = 0,
safesearch: typing.Literal[0, 1, 2] = 0,
pageno: int = 1,
time_range: typing.Optional[str] = None,
timeout_limit: typing.Optional[float] = None,
external_bang: typing.Optional[str] = None,
engine_data: typing.Optional[typing.Dict[str, str]] = None,
redirect_to_first_result: typing.Optional[bool] = None,
time_range: typing.Literal["day", "week", "month", "year"] | None = None,
timeout_limit: float | None = None,
external_bang: str | None = None,
engine_data: dict[str, dict[str, str]] | None = None,
redirect_to_first_result: bool | None = None,
): # pylint:disable=too-many-arguments
self.query = query
self.engineref_list = engineref_list
self.lang = lang
self.safesearch = safesearch
self.safesearch: typing.Literal[0, 1, 2] = safesearch
self.pageno = pageno
self.time_range = time_range
self.time_range: typing.Literal["day", "week", "month", "year"] | None = time_range
self.timeout_limit = timeout_limit
self.external_bang = external_bang
self.engine_data = engine_data or {}

View File

@@ -2,83 +2,95 @@
"""Implement request processors used by engine-types."""
__all__ = [
'EngineProcessor',
'OfflineProcessor',
'OnlineProcessor',
'OnlineDictionaryProcessor',
'OnlineCurrencyProcessor',
'OnlineUrlSearchProcessor',
'PROCESSORS',
"OfflineParamTypes",
"OnlineCurrenciesParams",
"OnlineDictParams",
"OnlineParamTypes",
"OnlineParams",
"OnlineUrlSearchParams",
"PROCESSORS",
"ParamTypes",
"RequestParams",
]
import typing as t
import threading
from searx import logger
from searx import engines
from .online import OnlineProcessor
from .abstract import EngineProcessor, RequestParams
from .offline import OfflineProcessor
from .online_dictionary import OnlineDictionaryProcessor
from .online_currency import OnlineCurrencyProcessor
from .online_url_search import OnlineUrlSearchProcessor
from .abstract import EngineProcessor
from .online import OnlineProcessor, OnlineParams
from .online_dictionary import OnlineDictionaryProcessor, OnlineDictParams
from .online_currency import OnlineCurrencyProcessor, OnlineCurrenciesParams
from .online_url_search import OnlineUrlSearchProcessor, OnlineUrlSearchParams
if t.TYPE_CHECKING:
from searx.enginelib import Engine
logger = logger.getChild("search.processors")
logger = logger.getChild('search.processors')
PROCESSORS: dict[str, EngineProcessor] = {}
"""Cache request processors, stored by *engine-name* (:py:func:`initialize`)
OnlineParamTypes: t.TypeAlias = OnlineParams | OnlineDictParams | OnlineCurrenciesParams | OnlineUrlSearchParams
OfflineParamTypes: t.TypeAlias = RequestParams
ParamTypes: t.TypeAlias = OfflineParamTypes | OnlineParamTypes
class ProcessorMap(dict[str, EngineProcessor]):
    """Class to manage :py:obj:`EngineProcessor` instances in a key/value map
    (instances stored by *engine-name*)."""

    # Maps an ``engine_type`` string to the processor class implementing it.
    processor_types: dict[str, type[EngineProcessor]] = {
        OnlineProcessor.engine_type: OnlineProcessor,
        OfflineProcessor.engine_type: OfflineProcessor,
        OnlineDictionaryProcessor.engine_type: OnlineDictionaryProcessor,
        OnlineCurrencyProcessor.engine_type: OnlineCurrencyProcessor,
        OnlineUrlSearchProcessor.engine_type: OnlineUrlSearchProcessor,
    }

    def init(self, engine_list: list[dict[str, t.Any]]):
        """Initialize all engines and register a processor for each engine.

        Engines marked ``inactive`` in their settings, engines unknown to
        :py:obj:`searx.engines.engines` and engines with an unknown
        ``engine_type`` are skipped (with a log message).
        """
        for eng_settings in engine_list:
            eng_name: str = eng_settings["name"]
            if eng_settings.get("inactive", False) is True:
                logger.info("Engine of name '%s' is inactive.", eng_name)
                continue
            eng_obj = engines.engines.get(eng_name)
            if eng_obj is None:
                # fixed grammar of the log message ("does not exists")
                logger.warning("Engine of name '%s' does not exist.", eng_name)
                continue
            # engines without an explicit engine_type default to "online"
            eng_type = getattr(eng_obj, "engine_type", "online")
            proc_cls = self.processor_types.get(eng_type)
            if proc_cls is None:
                # BUGFIX: the message has two placeholders but only one
                # argument was passed (the engine name was missing)
                logger.error("Engine '%s' is of unknown engine_type: %s", eng_name, eng_type)
                continue
            # initialize (and register) the engine; registration happens via
            # the callback, possibly later and from the engine's init thread
            eng_proc = proc_cls(eng_obj)
            eng_proc.initialize(self.register_processor)

    def register_processor(self, eng_proc: EngineProcessor, eng_proc_ok: bool) -> bool:
        """Register the :py:obj:`EngineProcessor`.

        This method is usually passed as a callback to the initialization of
        the :py:obj:`EngineProcessor`.

        The value (true/false) passed in ``eng_proc_ok`` indicates whether the
        initialization of the :py:obj:`EngineProcessor` was successful; if this
        is not the case, the processor is not registered.
        """
        if eng_proc_ok:
            self[eng_proc.engine.name] = eng_proc
            # logger.debug("registered engine processor: %s", eng_proc.engine.name)
        else:
            # BUGFIX: the message had a stray "(%s)" placeholder with no
            # matching argument, which breaks log formatting
            logger.error("init method of engine %s failed.", eng_proc.engine.name)
        return eng_proc_ok
PROCESSORS = ProcessorMap()
"""Global :py:obj:`ProcessorMap`.
:meta hide-value:
"""
def get_processor_class(engine_type: str) -> type[EngineProcessor] | None:
"""Return processor class according to the ``engine_type``"""
for c in [
OnlineProcessor,
OfflineProcessor,
OnlineDictionaryProcessor,
OnlineCurrencyProcessor,
OnlineUrlSearchProcessor,
]:
if c.engine_type == engine_type:
return c
return None
def get_processor(engine: "Engine | ModuleType", engine_name: str) -> EngineProcessor | None:
"""Return processor instance that fits to ``engine.engine.type``"""
engine_type = getattr(engine, 'engine_type', 'online')
processor_class = get_processor_class(engine_type)
if processor_class is not None:
return processor_class(engine, engine_name)
return None
def initialize_processor(processor: EngineProcessor):
"""Initialize one processor
Call the init function of the engine
"""
if processor.has_initialize_function:
_t = threading.Thread(target=processor.initialize, daemon=True)
_t.start()
def initialize(engine_list: list[dict[str, t.Any]]):
"""Initialize all engines and store a processor for each engine in
:py:obj:`PROCESSORS`."""
for engine_data in engine_list:
engine_name: str = engine_data['name']
engine = engines.engines.get(engine_name)
if engine:
processor = get_processor(engine, engine_name)
if processor is None:
engine.logger.error('Error get processor for engine %s', engine_name)
else:
initialize_processor(processor)
PROCESSORS[engine_name] = processor

View File

@@ -1,7 +1,5 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Abstract base classes for engine request processors.
"""
"""Abstract base classes for all engine processors."""
import typing as t
@@ -10,25 +8,75 @@ import threading
from abc import abstractmethod, ABC
from timeit import default_timer
from searx import settings, logger
from searx import get_setting
from searx import logger
from searx.engines import engines
from searx.network import get_time_for_thread, get_network
from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
from searx.exceptions import SearxEngineAccessDeniedException
from searx.utils import get_engine_from_settings
if t.TYPE_CHECKING:
import types
from searx.enginelib import Engine
from searx.search.models import SearchQuery
from searx.results import ResultContainer
from searx.result_types import Result, LegacyResult # pyright: ignore[reportPrivateLocalImportUsage]
logger = logger.getChild('searx.search.processor')
SUSPENDED_STATUS: dict[int | str, 'SuspendedStatus'] = {}
logger = logger.getChild("searx.search.processor")
SUSPENDED_STATUS: dict[int | str, "SuspendedStatus"] = {}
class RequestParams(t.TypedDict):
"""Basic quantity of the Request parameters of all engine types."""
query: str
"""Search term, stripped of search syntax arguments."""
category: str
"""Current category, like ``general``.
.. hint::
This field is deprecated, don't use it in further implementations.
This field is currently *arbitrarily* filled with the name of "one"
category (the name of the first category of the engine). In practice,
however, it is not clear what this "one" category should be; in principle,
multiple categories can also be activated in a search.
"""
pageno: int
"""Current page number, where the first page is ``1``."""
safesearch: t.Literal[0, 1, 2]
"""Safe-Search filter (0:normal, 1:moderate, 2:strict)."""
time_range: t.Literal["day", "week", "month", "year"] | None
"""Time-range filter."""
engine_data: dict[str, str]
"""Allows the transfer of (engine specific) data to the next request of the
client. In the case of the ``online`` engines, this data is delivered to
the client via the HTML ``<form>`` in response.
If the client then sends this form back to the server with the next request,
this data will be available.
This makes it possible to carry data from one request to the next without a
session context, but this feature (is fragile) and should only be used in
exceptional cases. See also :ref:`engine_data`."""
searxng_locale: str
"""Language / locale filter from the search request, a string like 'all',
'en', 'en-US', 'zh-HK' .. and others, for more details see
:py:obj:`searx.locales`."""
class SuspendedStatus:
"""Class to handle suspend state."""
__slots__ = 'suspend_end_time', 'suspend_reason', 'continuous_errors', 'lock'
def __init__(self):
self.lock: threading.Lock = threading.Lock()
self.continuous_errors: int = 0
@@ -39,18 +87,18 @@ class SuspendedStatus:
def is_suspended(self):
return self.suspend_end_time >= default_timer()
def suspend(self, suspended_time: int, suspend_reason: str):
def suspend(self, suspended_time: int | None, suspend_reason: str):
with self.lock:
# update continuous_errors / suspend_end_time
self.continuous_errors += 1
if suspended_time is None:
suspended_time = min(
settings['search']['max_ban_time_on_fail'],
self.continuous_errors * settings['search']['ban_time_on_fail'],
)
max_ban: int = get_setting("search.max_ban_time_on_fail")
ban_fail: int = get_setting("search.ban_time_on_fail")
suspended_time = min(max_ban, ban_fail)
self.suspend_end_time = default_timer() + suspended_time
self.suspend_reason = suspend_reason
logger.debug('Suspend for %i seconds', suspended_time)
logger.debug("Suspend for %i seconds", suspended_time)
def resume(self):
with self.lock:
@@ -63,31 +111,63 @@ class SuspendedStatus:
class EngineProcessor(ABC):
"""Base classes used for all types of request processors."""
__slots__ = 'engine', 'engine_name', 'suspended_status', 'logger'
engine_type: str
def __init__(self, engine: "Engine|ModuleType", engine_name: str):
self.engine: "Engine" = engine
self.engine_name: str = engine_name
self.logger: logging.Logger = engines[engine_name].logger
key = get_network(self.engine_name)
key = id(key) if key else self.engine_name
def __init__(self, engine: "Engine|types.ModuleType"):
self.engine: "Engine" = engine # pyright: ignore[reportAttributeAccessIssue]
self.logger: logging.Logger = engines[engine.name].logger
key = get_network(self.engine.name)
key = id(key) if key else self.engine.name
self.suspended_status: SuspendedStatus = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
def initialize(self):
def initialize(self, callback: t.Callable[["EngineProcessor", bool], bool]):
"""Initialization of *this* :py:obj:`EngineProcessor`.
If processor's engine has an ``init`` method, it is called first.
Engine's ``init`` method is executed in a thread, meaning that the
*registration* (the ``callback``) may occur later and is not already
established by the return from this registration method.
Registration only takes place if the ``init`` method is not available or
is successfully run through.
"""
if not hasattr(self.engine, "init"):
callback(self, True)
return
if not callable(self.engine.init):
logger.error("Engine's init method isn't a callable (is of type: %s).", type(self.engine.init))
callback(self, False)
return
def __init_processor_thread():
eng_ok = self.init_engine()
callback(self, eng_ok)
# set up and start a thread
threading.Thread(target=__init_processor_thread, daemon=True).start()
def init_engine(self) -> bool:
eng_setting = get_engine_from_settings(self.engine.name)
init_ok: bool | None = False
try:
self.engine.init(get_engine_from_settings(self.engine_name))
except SearxEngineResponseException as exc:
self.logger.warning('Fail to initialize // %s', exc)
init_ok = self.engine.init(eng_setting)
except Exception: # pylint: disable=broad-except
self.logger.exception('Fail to initialize')
else:
self.logger.debug('Initialized')
logger.exception("Init method of engine %s failed due to an exception.", self.engine.name)
init_ok = False
# In older engines, None is returned from the init method, which is
# equivalent to indicating that the initialization was successful.
if init_ok is None:
init_ok = True
return init_ok
@property
def has_initialize_function(self):
return hasattr(self.engine, 'init')
def handle_exception(self, result_container, exception_or_message, suspend=False):
def handle_exception(
self,
result_container: "ResultContainer",
exception_or_message: BaseException | str,
suspend: bool = False,
):
# update result_container
if isinstance(exception_or_message, BaseException):
exception_class = exception_or_message.__class__
@@ -96,13 +176,13 @@ class EngineProcessor(ABC):
error_message = module_name + exception_class.__qualname__
else:
error_message = exception_or_message
result_container.add_unresponsive_engine(self.engine_name, error_message)
result_container.add_unresponsive_engine(self.engine.name, error_message)
# metrics
counter_inc('engine', self.engine_name, 'search', 'count', 'error')
counter_inc('engine', self.engine.name, 'search', 'count', 'error')
if isinstance(exception_or_message, BaseException):
count_exception(self.engine_name, exception_or_message)
count_exception(self.engine.name, exception_or_message)
else:
count_error(self.engine_name, exception_or_message)
count_error(self.engine.name, exception_or_message)
# suspend the engine ?
if suspend:
suspended_time = None
@@ -110,51 +190,63 @@ class EngineProcessor(ABC):
suspended_time = exception_or_message.suspended_time
self.suspended_status.suspend(suspended_time, error_message) # pylint: disable=no-member
def _extend_container_basic(self, result_container, start_time, search_results):
def _extend_container_basic(
self,
result_container: "ResultContainer",
start_time: float,
search_results: "list[Result | LegacyResult]",
):
# update result_container
result_container.extend(self.engine_name, search_results)
result_container.extend(self.engine.name, search_results)
engine_time = default_timer() - start_time
page_load_time = get_time_for_thread()
result_container.add_timing(self.engine_name, engine_time, page_load_time)
result_container.add_timing(self.engine.name, engine_time, page_load_time)
# metrics
counter_inc('engine', self.engine_name, 'search', 'count', 'successful')
histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total')
counter_inc('engine', self.engine.name, 'search', 'count', 'successful')
histogram_observe(engine_time, 'engine', self.engine.name, 'time', 'total')
if page_load_time is not None:
histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http')
histogram_observe(page_load_time, 'engine', self.engine.name, 'time', 'http')
def extend_container(self, result_container, start_time, search_results):
def extend_container(
self,
result_container: "ResultContainer",
start_time: float,
search_results: "list[Result | LegacyResult]|None",
):
if getattr(threading.current_thread(), '_timeout', False):
# the main thread is not waiting anymore
self.handle_exception(result_container, 'timeout', None)
self.handle_exception(result_container, 'timeout', False)
else:
# check if the engine accepted the request
if search_results is not None:
self._extend_container_basic(result_container, start_time, search_results)
self.suspended_status.resume()
def extend_container_if_suspended(self, result_container):
def extend_container_if_suspended(self, result_container: "ResultContainer") -> bool:
if self.suspended_status.is_suspended:
result_container.add_unresponsive_engine(
self.engine_name, self.suspended_status.suspend_reason, suspended=True
self.engine.name, self.suspended_status.suspend_reason, suspended=True
)
return True
return False
def get_params(self, search_query, engine_category) -> dict[str, t.Any]:
"""Returns a set of (see :ref:`request params <engine request arguments>`) or
``None`` if request is not supported.
def get_params(self, search_query: "SearchQuery", engine_category: str) -> RequestParams | None:
"""Returns a dictionary with the :ref:`request parameters <engine
request arguments>` (:py:obj:`RequestParams`), if the search condition
is not supported by the engine, ``None`` is returned:
Not supported conditions (``None`` is returned):
- *time range* filter in search conditions, but the engine does not have
a corresponding filter
- page number > 1 when engine does not support paging
- page number > ``max_page``
- A page-number > 1 when engine does not support paging.
- A time range when the engine does not support time range.
"""
# if paging is not supported, skip
if search_query.pageno > 1 and not self.engine.paging:
return None
# if max page is reached, skip
max_page = self.engine.max_page or settings['search']['max_page']
max_page = self.engine.max_page or get_setting("search.max_page")
if max_page and max_page < search_query.pageno:
return None
@@ -162,39 +254,45 @@ class EngineProcessor(ABC):
if search_query.time_range and not self.engine.time_range_support:
return None
params = {}
params["query"] = search_query.query
params['category'] = engine_category
params['pageno'] = search_query.pageno
params['safesearch'] = search_query.safesearch
params['time_range'] = search_query.time_range
params['engine_data'] = search_query.engine_data.get(self.engine_name, {})
params['searxng_locale'] = search_query.lang
params: RequestParams = {
"query": search_query.query,
"category": engine_category,
"pageno": search_query.pageno,
"safesearch": search_query.safesearch,
"time_range": search_query.time_range,
"engine_data": search_query.engine_data.get(self.engine.name, {}),
"searxng_locale": search_query.lang,
}
# deprecated / vintage --> use params['searxng_locale']
# deprecated / vintage --> use params["searxng_locale"]
#
# Conditions related to engine's traits are implemented in engine.traits
# module. Don't do 'locale' decisions here in the abstract layer of the
# module. Don't do "locale" decisions here in the abstract layer of the
# search processor, just pass the value from user's choice unchanged to
# the engine request.
if hasattr(self.engine, 'language') and self.engine.language:
params['language'] = self.engine.language
if hasattr(self.engine, "language") and self.engine.language:
params["language"] = self.engine.language # pyright: ignore[reportGeneralTypeIssues]
else:
params['language'] = search_query.lang
params["language"] = search_query.lang # pyright: ignore[reportGeneralTypeIssues]
return params
@abstractmethod
def search(self, query, params, result_container, start_time, timeout_limit):
def search(
self,
query: str,
params: RequestParams,
result_container: "ResultContainer",
start_time: float,
timeout_limit: float,
):
pass
def get_tests(self):
tests = getattr(self.engine, 'tests', None)
if tests is None:
tests = getattr(self.engine, 'additional_tests', {})
tests.update(self.get_default_tests())
return tests
# deprecated!
return {}
def get_default_tests(self):
# deprecated!
return {}

View File

@@ -1,26 +1,32 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Processors for engine-type: ``offline``
"""Processors for engine-type: ``offline``"""
"""
import typing as t
from .abstract import EngineProcessor, RequestParams
from .abstract import EngineProcessor
if t.TYPE_CHECKING:
from searx.results import ResultContainer
class OfflineProcessor(EngineProcessor):
"""Processor class used by ``offline`` engines"""
"""Processor class used by ``offline`` engines."""
engine_type = 'offline'
engine_type: str = "offline"
def _search_basic(self, query, params):
return self.engine.search(query, params)
def search(self, query, params, result_container, start_time, timeout_limit):
def search(
self,
query: str,
params: RequestParams,
result_container: "ResultContainer",
start_time: float,
timeout_limit: float,
):
try:
search_results = self._search_basic(query, params)
search_results = self.engine.search(query, params)
self.extend_container(result_container, start_time, search_results)
except ValueError as e:
# do not record the error
self.logger.exception('engine {0} : invalid input : {1}'.format(self.engine_name, e))
self.logger.exception('engine {0} : invalid input : {1}'.format(self.engine.name, e))
except Exception as e: # pylint: disable=broad-except
self.handle_exception(result_container, e)
self.logger.exception('engine {0} : exception : {1}'.format(self.engine_name, e))
self.logger.exception('engine {0} : exception : {1}'.format(self.engine.name, e))

View File

@@ -1,8 +1,9 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Processors for engine-type: ``online``
"""Processor used for ``online`` engines."""
"""
# pylint: disable=use-dict-literal
__all__ = ["OnlineProcessor", "OnlineParams"]
import typing as t
from timeit import default_timer
import asyncio
@@ -17,50 +18,132 @@ from searx.exceptions import (
SearxEngineTooManyRequestsException,
)
from searx.metrics.error_recorder import count_error
from .abstract import EngineProcessor
from .abstract import EngineProcessor, RequestParams
if t.TYPE_CHECKING:
from searx.search.models import SearchQuery
from searx.results import ResultContainer
from searx.result_types import EngineResults
def default_request_params():
class HTTPParams(t.TypedDict):
"""HTTP request parameters"""
method: t.Literal["GET", "POST"]
"""HTTP request method."""
headers: dict[str, str]
"""HTTP header information."""
data: dict[str, str]
"""Sending `form encoded data`_.
.. _form encoded data:
https://www.python-httpx.org/quickstart/#sending-form-encoded-data
"""
json: dict[str, t.Any]
"""`Sending `JSON encoded data`_.
.. _JSON encoded data:
https://www.python-httpx.org/quickstart/#sending-json-encoded-data
"""
content: bytes
"""`Sending `binary request data`_.
.. _binary request data:
https://www.python-httpx.org/quickstart/#sending-json-encoded-data
"""
url: str
"""Requested url."""
cookies: dict[str, str]
"""HTTP cookies."""
allow_redirects: bool
"""Follow redirects"""
max_redirects: int
"""Maximum redirects, hard limit."""
soft_max_redirects: int
"""Maximum redirects, soft limit. Record an error but don't stop the engine."""
verify: None | t.Literal[False] | str # not sure str really works
"""If not ``None``, it overrides the verify value defined in the network. Use
``False`` to accept any server certificate and use a path to file to specify a
server certificate"""
auth: str | None
"""An authentication to use when sending requests."""
raise_for_httperror: bool
"""Raise an exception if the `HTTP response status code`_ is ``>= 300``.
.. _HTTP response status code:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status
"""
class OnlineParams(HTTPParams, RequestParams):
"""Request parameters of a ``online`` engine."""
def default_request_params() -> "HTTPParams":
    """Return the default HTTP request parameters for ``online`` engines.

    A fresh dict is built on every call so engines can mutate it freely
    without sharing state.
    """
    # NOTE: the merged diff interleaved the old and the new dict body,
    # producing duplicate keys; this is the clean new version.
    return {
        "method": "GET",
        "headers": {},
        "data": {},
        "json": {},
        "content": b"",
        "url": "",
        "cookies": {},
        "allow_redirects": False,
        "max_redirects": 0,
        "soft_max_redirects": 0,
        "auth": None,
        "verify": None,
        "raise_for_httperror": True,
    }
class OnlineProcessor(EngineProcessor):
"""Processor class for ``online`` engines."""
engine_type = 'online'
engine_type: str = "online"
def initialize(self):
def init_engine(self) -> bool:
"""This method is called in a thread, and before the base method is
called, the network must be set up for the ``online`` engines."""
self.init_network_in_thread(start_time=default_timer(), timeout_limit=self.engine.timeout)
return super().init_engine()
def init_network_in_thread(self, start_time: float, timeout_limit: float):
# set timeout for all HTTP requests
searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer())
searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time)
# reset the HTTP total time
searx.network.reset_time_for_thread()
# set the network
searx.network.set_context_network_name(self.engine_name)
super().initialize()
searx.network.set_context_network_name(self.engine.name)
def get_params(self, search_query, engine_category):
"""Returns a set of :ref:`request params <engine request online>` or ``None``
if request is not supported.
"""
params = super().get_params(search_query, engine_category)
if params is None:
return None
def get_params(self, search_query: "SearchQuery", engine_category: str) -> OnlineParams | None:
"""Returns a dictionary with the :ref:`request params <engine request
online>` (:py:obj:`OnlineParams`), if the search condition is not
supported by the engine, ``None`` is returned."""
# add default params
params.update(default_request_params())
base_params: RequestParams | None = super().get_params(search_query, engine_category)
if base_params is None:
return base_params
params: OnlineParams = {**default_request_params(), **base_params}
headers = params["headers"]
# add an user agent
params['headers']['User-Agent'] = gen_useragent()
headers["User-Agent"] = gen_useragent()
# add Accept-Language header
if self.engine.send_accept_language_header and search_query.locale:
@@ -71,73 +154,77 @@ class OnlineProcessor(EngineProcessor):
search_query.locale.territory,
search_query.locale.language,
)
params['headers']['Accept-Language'] = ac_lang
headers["Accept-Language"] = ac_lang
self.logger.debug('HTTP Accept-Language: %s', params['headers'].get('Accept-Language', ''))
self.logger.debug("HTTP Accept-Language: %s", headers.get("Accept-Language", ""))
return params
def _send_http_request(self, params):
# create dictionary which contain all
# information about the request
request_args = dict(headers=params['headers'], cookies=params['cookies'], auth=params['auth'])
def _send_http_request(self, params: OnlineParams):
# verify
# if not None, it overrides the verify value defined in the network.
# use False to accept any server certificate
# use a path to file to specify a server certificate
verify = params.get('verify')
# create dictionary which contain all information about the request
request_args: dict[str, t.Any] = {
"headers": params["headers"],
"cookies": params["cookies"],
"auth": params["auth"],
}
verify = params.get("verify")
if verify is not None:
request_args['verify'] = params['verify']
request_args["verify"] = verify
# max_redirects
max_redirects = params.get('max_redirects')
max_redirects = params.get("max_redirects")
if max_redirects:
request_args['max_redirects'] = max_redirects
request_args["max_redirects"] = max_redirects
# allow_redirects
if 'allow_redirects' in params:
request_args['allow_redirects'] = params['allow_redirects']
if "allow_redirects" in params:
request_args["allow_redirects"] = params["allow_redirects"]
# soft_max_redirects
soft_max_redirects = params.get('soft_max_redirects', max_redirects or 0)
soft_max_redirects: int = params.get("soft_max_redirects", max_redirects or 0)
# raise_for_status
request_args['raise_for_httperror'] = params.get('raise_for_httperror', True)
request_args["raise_for_httperror"] = params.get("raise_for_httperror", True)
# specific type of request (GET or POST)
if params['method'] == 'GET':
if params["method"] == "GET":
req = searx.network.get
else:
req = searx.network.post
request_args['data'] = params['data']
if params["data"]:
request_args["data"] = params["data"]
if params["json"]:
request_args["json"] = params["json"]
if params["content"]:
request_args["content"] = params["content"]
# send the request
response = req(params['url'], **request_args)
response = req(params["url"], **request_args)
# check soft limit of the redirect count
if len(response.history) > soft_max_redirects:
# unexpected redirect : record an error
# but the engine might still return valid results.
status_code = str(response.status_code or '')
reason = response.reason_phrase or ''
status_code = str(response.status_code or "")
reason = response.reason_phrase or ""
hostname = response.url.host
count_error(
self.engine_name,
'{} redirects, maximum: {}'.format(len(response.history), soft_max_redirects),
self.engine.name,
"{} redirects, maximum: {}".format(len(response.history), soft_max_redirects),
(status_code, reason, hostname),
secondary=True,
)
return response
def _search_basic(self, query, params):
def _search_basic(self, query: str, params: OnlineParams) -> "EngineResults|None":
# update request parameters dependent on
# search-engine (contained in engines folder)
self.engine.request(query, params)
# ignoring empty urls
if not params['url']:
if not params["url"]:
return None
# send request
@@ -147,13 +234,15 @@ class OnlineProcessor(EngineProcessor):
response.search_params = params
return self.engine.response(response)
def search(self, query, params, result_container, start_time, timeout_limit):
# set timeout for all HTTP requests
searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time)
# reset the HTTP total time
searx.network.reset_time_for_thread()
# set the network
searx.network.set_context_network_name(self.engine_name)
def search( # pyright: ignore[reportIncompatibleMethodOverride]
self,
query: str,
params: OnlineParams,
result_container: "ResultContainer",
start_time: float,
timeout_limit: float,
):
self.init_network_in_thread(start_time, timeout_limit)
try:
# send requests and parse the results
@@ -162,7 +251,7 @@ class OnlineProcessor(EngineProcessor):
except ssl.SSLError as e:
# requests timeout (connect or read)
self.handle_exception(result_container, e, suspend=True)
self.logger.error("SSLError {}, verify={}".format(e, searx.network.get_network(self.engine_name).verify))
self.logger.error("SSLError {}, verify={}".format(e, searx.network.get_network(self.engine.name).verify))
except (httpx.TimeoutException, asyncio.TimeoutError) as e:
# requests timeout (connect or read)
self.handle_exception(result_container, e, suspend=True)
@@ -179,55 +268,13 @@ class OnlineProcessor(EngineProcessor):
default_timer() - start_time, timeout_limit, e
)
)
except SearxEngineCaptchaException as e:
except (
SearxEngineCaptchaException,
SearxEngineTooManyRequestsException,
SearxEngineAccessDeniedException,
) as e:
self.handle_exception(result_container, e, suspend=True)
self.logger.exception('CAPTCHA')
except SearxEngineTooManyRequestsException as e:
self.handle_exception(result_container, e, suspend=True)
self.logger.exception('Too many requests')
except SearxEngineAccessDeniedException as e:
self.handle_exception(result_container, e, suspend=True)
self.logger.exception('SearXNG is blocked')
self.logger.exception(e.message)
except Exception as e: # pylint: disable=broad-except
self.handle_exception(result_container, e)
self.logger.exception('exception : {0}'.format(e))
def get_default_tests(self):
tests = {}
tests['simple'] = {
'matrix': {'query': ('life', 'computer')},
'result_container': ['not_empty'],
}
if getattr(self.engine, 'paging', False):
tests['paging'] = {
'matrix': {'query': 'time', 'pageno': (1, 2, 3)},
'result_container': ['not_empty'],
'test': ['unique_results'],
}
if 'general' in self.engine.categories:
# avoid documentation about HTML tags (<time> and <input type="time">)
tests['paging']['matrix']['query'] = 'news'
if getattr(self.engine, 'time_range', False):
tests['time_range'] = {
'matrix': {'query': 'news', 'time_range': (None, 'day')},
'result_container': ['not_empty'],
'test': ['unique_results'],
}
if getattr(self.engine, 'traits', False):
tests['lang_fr'] = {
'matrix': {'query': 'paris', 'lang': 'fr'},
'result_container': ['not_empty', ('has_language', 'fr')],
}
tests['lang_en'] = {
'matrix': {'query': 'paris', 'lang': 'en'},
'result_container': ['not_empty', ('has_language', 'en')],
}
if getattr(self.engine, 'safesearch', False):
tests['safesearch'] = {'matrix': {'query': 'porn', 'safesearch': (0, 2)}, 'test': ['unique_results']}
return tests
self.logger.exception("exception : {0}".format(e))

View File

@@ -1,42 +1,71 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Processors for engine-type: ``online_currency``
"""Processor used for ``online_currency`` engines."""
"""
import typing as t
import unicodedata
import re
import flask_babel
import babel
from searx.data import CURRENCIES
from .online import OnlineProcessor
from .online import OnlineProcessor, OnlineParams
parser_re = re.compile('.*?(\\d+(?:\\.\\d+)?) ([^.0-9]+) (?:in|to) ([^.0-9]+)', re.I)
if t.TYPE_CHECKING:
from .abstract import EngineProcessor
from searx.search.models import SearchQuery
def normalize_name(name: str):
name = name.strip()
name = name.lower().replace('-', ' ').rstrip('s')
name = re.sub(' +', ' ', name)
return unicodedata.normalize('NFKD', name).lower()
search_syntax = re.compile(r".*?(\d+(?:\.\d+)?) ([^.0-9]+) (?:in|to) ([^.0-9]+)", re.I)
"""Search syntax used for from/to currency (e.g. ``10 usd to eur``)"""
class CurrenciesParams(t.TypedDict):
"""Currencies request parameters."""
amount: float
"""Currency amount to be converted"""
to_iso4217: str
"""ISO_4217_ alpha code of the currency used as the basis for conversion.
.. _ISO_4217: https://en.wikipedia.org/wiki/ISO_4217
"""
from_iso4217: str
"""ISO_4217_ alpha code of the currency to be converted."""
from_name: str
"""Name of the currency used as the basis for conversion."""
to_name: str
"""Name of the currency of the currency to be converted."""
class OnlineCurrenciesParams(CurrenciesParams, OnlineParams): # pylint: disable=duplicate-bases
"""Request parameters of a ``online_currency`` engine."""
class OnlineCurrencyProcessor(OnlineProcessor):
"""Processor class used by ``online_currency`` engines."""
engine_type = 'online_currency'
engine_type: str = "online_currency"
def initialize(self):
def initialize(self, callback: t.Callable[["EngineProcessor", bool], bool]):
CURRENCIES.init()
super().initialize()
super().initialize(callback)
def get_params(self, search_query, engine_category):
"""Returns a set of :ref:`request params <engine request online_currency>`
or ``None`` if search query does not match to :py:obj:`parser_re`."""
def get_params(self, search_query: "SearchQuery", engine_category: str) -> OnlineCurrenciesParams | None:
"""Returns a dictionary with the :ref:`request params <engine request
online_currency>` (:py:obj:`OnlineCurrenciesParams`). ``None`` is
returned if the search query does not match :py:obj:`search_syntax`."""
params = super().get_params(search_query, engine_category)
if params is None:
online_params: OnlineParams | None = super().get_params(search_query, engine_category)
if online_params is None:
return None
m = parser_re.match(search_query.query)
m = search_syntax.match(search_query.query)
if not m:
return None
@@ -46,22 +75,46 @@ class OnlineCurrencyProcessor(OnlineProcessor):
except ValueError:
return None
from_currency = CURRENCIES.name_to_iso4217(normalize_name(from_currency))
to_currency = CURRENCIES.name_to_iso4217(normalize_name(to_currency))
# most often $ stands for USD
if from_currency == "$":
from_currency = "$ us"
params['amount'] = amount
params['from'] = from_currency
params['to'] = to_currency
params['from_name'] = CURRENCIES.iso4217_to_name(from_currency, "en")
params['to_name'] = CURRENCIES.iso4217_to_name(to_currency, "en")
return params
if to_currency == "$":
to_currency = "$ us"
def get_default_tests(self):
tests = {}
from_iso4217 = from_currency
if not CURRENCIES.is_iso4217(from_iso4217):
from_iso4217 = CURRENCIES.name_to_iso4217(_normalize_name(from_currency))
tests['currency'] = {
'matrix': {'query': '1337 usd in rmb'},
'result_container': ['has_answer'],
to_iso4217 = to_currency
if not CURRENCIES.is_iso4217(to_iso4217):
to_iso4217 = CURRENCIES.name_to_iso4217(_normalize_name(to_currency))
if from_iso4217 is None or to_iso4217 is None:
return None
ui_locale = flask_babel.get_locale() or babel.Locale.parse("en")
from_name: str = CURRENCIES.iso4217_to_name(
from_iso4217, ui_locale.language
) # pyright: ignore[reportAssignmentType]
to_name: str = CURRENCIES.iso4217_to_name(
to_iso4217, ui_locale.language
) # pyright: ignore[reportAssignmentType]
params: OnlineCurrenciesParams = {
**online_params,
"amount": amount,
"from_iso4217": from_iso4217,
"to_iso4217": to_iso4217,
"from_name": from_name,
"to_name": to_name,
}
return tests
return params
def _normalize_name(name: str):
name = name.strip()
name = name.lower().replace("-", " ")
name = re.sub(" +", " ", name)
return unicodedata.normalize("NFKD", name).lower()

View File

@@ -1,60 +1,102 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Processors for engine-type: ``online_dictionary``
"""
"""Processor used for ``online_dictionary`` engines."""
import typing as t
import re
from searx.utils import is_valid_lang
from .online import OnlineProcessor
from searx.sxng_locales import sxng_locales
from .online import OnlineProcessor, OnlineParams
parser_re = re.compile('.*?([a-z]+)-([a-z]+) (.+)$', re.I)
if t.TYPE_CHECKING:
from searx.search.models import SearchQuery
search_syntax = re.compile(r".*?([a-z]+)-([a-z]+) (.+)$", re.I)
"""Search syntax used for from/to language (e.g. ``en-de``)"""
FromToType: t.TypeAlias = tuple[bool, str, str]
"""Type of a language descriptions in the context of a ``online_dictionary``."""
class DictParams(t.TypedDict):
"""Dictionary request parameters."""
from_lang: FromToType
"""Language from which is to be translated."""
to_lang: FromToType
"""Language to translate into."""
query: str
"""Search term, cleaned of search syntax (*from-to* has been removed)."""
class OnlineDictParams(DictParams, OnlineParams): # pylint: disable=duplicate-bases
"""Request parameters of a ``online_dictionary`` engine."""
class OnlineDictionaryProcessor(OnlineProcessor):
"""Processor class used by ``online_dictionary`` engines."""
"""Processor class for ``online_dictionary`` engines."""
engine_type = 'online_dictionary'
engine_type: str = "online_dictionary"
def get_params(self, search_query, engine_category):
"""Returns a set of :ref:`request params <engine request online_dictionary>` or
``None`` if search query does not match to :py:obj:`parser_re`.
"""
params = super().get_params(search_query, engine_category)
if params is None:
def get_params(self, search_query: "SearchQuery", engine_category: str) -> OnlineDictParams | None:
"""Returns a dictionary with the :ref:`request params <engine request
online_dictionary>` (:py:obj:`OnlineDictParams`). ``None`` is returned
if the search query does not match :py:obj:`search_syntax`."""
online_params: OnlineParams | None = super().get_params(search_query, engine_category)
if online_params is None:
return None
m = parser_re.match(search_query.query)
m = search_syntax.match(search_query.query)
if not m:
return None
from_lang, to_lang, query = m.groups()
from_lang = is_valid_lang(from_lang)
to_lang = is_valid_lang(to_lang)
from_lang = _get_lang_descr(from_lang)
to_lang = _get_lang_descr(to_lang)
if not from_lang or not to_lang:
return None
params['from_lang'] = from_lang
params['to_lang'] = to_lang
params['query'] = query
params: OnlineDictParams = {
**online_params,
"from_lang": from_lang,
"to_lang": to_lang,
"query": query,
}
return params
def get_default_tests(self):
tests = {}
if getattr(self.engine, 'paging', False):
tests['translation_paging'] = {
'matrix': {'query': 'en-es house', 'pageno': (1, 2, 3)},
'result_container': ['not_empty', ('one_title_contains', 'house')],
'test': ['unique_results'],
}
else:
tests['translation'] = {
'matrix': {'query': 'en-es house'},
'result_container': ['not_empty', ('one_title_contains', 'house')],
}
def _get_lang_descr(lang: str) -> FromToType | None:
"""Returns language's code and language's english name if argument ``lang``
describes a language known by SearXNG, otherwise ``None``.
return tests
Examples:
.. code:: python
>>> _get_lang_descr("zz")
None
>>> _get_lang_descr("uk")
(True, "uk", "ukrainian")
>>> _get_lang_descr(b"uk")
(True, "uk", "ukrainian")
>>> _get_lang_descr("en")
(True, "en", "english")
>>> _get_lang_descr("Español")
(True, "es", "spanish")
>>> _get_lang_descr("Spanish")
(True, "es", "spanish")
"""
lang = lang.lower()
is_abbr = len(lang) == 2
if is_abbr:
for l in sxng_locales:
if l[0][:2] == lang:
return (True, l[0][:2], l[3].lower())
return None
for l in sxng_locales:
if l[1].lower() == lang or l[3].lower() == lang:
return (True, l[0][:2], l[3].lower())
return None

View File

@@ -1,45 +1,64 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Processors for engine-type: ``online_url_search``
"""
"""Processor used for ``online_url_search`` engines."""
import typing as t
import re
from .online import OnlineProcessor
re_search_urls = {
'http': re.compile(r'https?:\/\/[^ ]*'),
'ftp': re.compile(r'ftps?:\/\/[^ ]*'),
'data:image': re.compile('data:image/[^; ]*;base64,[^ ]*'),
from .online import OnlineProcessor, OnlineParams
if t.TYPE_CHECKING:
from .abstract import EngineProcessor
from searx.search.models import SearchQuery
search_syntax = {
"http": re.compile(r"https?:\/\/[^ ]*"),
"ftp": re.compile(r"ftps?:\/\/[^ ]*"),
"data:image": re.compile("data:image/[^; ]*;base64,[^ ]*"),
}
"""Search syntax used for a URL search."""
class UrlParams(t.TypedDict):
"""URL request parameters."""
search_urls: dict[str, str | None]
class OnlineUrlSearchParams(UrlParams, OnlineParams): # pylint: disable=duplicate-bases
"""Request parameters of a ``online_url_search`` engine."""
class OnlineUrlSearchProcessor(OnlineProcessor):
"""Processor class used by ``online_url_search`` engines."""
engine_type = 'online_url_search'
engine_type: str = "online_url_search"
def get_params(self, search_query, engine_category):
"""Returns a set of :ref:`request params <engine request online>` or ``None`` if
search query does not match to :py:obj:`re_search_urls`.
"""
def get_params(self, search_query: "SearchQuery", engine_category: str) -> OnlineUrlSearchParams | None:
"""Returns a dictionary with the :ref:`request params <engine request
online_currency>` (:py:obj:`OnlineUrlSearchParams`). ``None`` is
returned if the search query does not match :py:obj:`search_syntax`."""
params = super().get_params(search_query, engine_category)
if params is None:
online_params: OnlineParams | None = super().get_params(search_query, engine_category)
if online_params is None:
return None
url_match = False
search_urls = {}
search_urls: dict[str, str | None] = {}
has_match: bool = False
for k, v in re_search_urls.items():
m = v.search(search_query.query)
v = None
for url_schema, url_re in search_syntax.items():
search_urls[url_schema] = None
m = url_re.search(search_query.query)
if m:
url_match = True
v = m[0]
search_urls[k] = v
has_match = True
search_urls[url_schema] = m[0]
if not url_match:
if not has_match:
return None
params['search_urls'] = search_urls
params: OnlineUrlSearchParams = {
**online_params,
"search_urls": search_urls,
}
return params