diff --git a/searx/engines/ahmia.py b/searx/engines/ahmia.py index 589f38e57..064f8ee79 100644 --- a/searx/engines/ahmia.py +++ b/searx/engines/ahmia.py @@ -3,9 +3,14 @@ Ahmia (Onions) """ +import typing as t + from urllib.parse import urlencode, urlparse, parse_qs from lxml.html import fromstring +from searx.utils import gen_useragent, ElementType from searx.engines.xpath import extract_url, extract_text, eval_xpath_list, eval_xpath +from searx.network import get +from searx.enginelib import EngineCache # about about = { @@ -23,6 +28,7 @@ paging = True page_size = 10 # search url +base_url = 'http://juhanurmihxlp77nkq76byazcldy2hlmovfu2epvl5ankdibsot4csyd.onion' search_url = 'http://juhanurmihxlp77nkq76byazcldy2hlmovfu2epvl5ankdibsot4csyd.onion/search/?{query}' time_range_support = True time_range_dict = {'day': 1, 'week': 7, 'month': 30} @@ -34,10 +40,44 @@ title_xpath = './h4/a[1]' content_xpath = './/p[1]' correction_xpath = '//*[@id="didYouMean"]//a' number_of_results_xpath = '//*[@id="totalResults"]' +name_token_xpath = '//form[@id="searchForm"]/input[@type="hidden"]/@name' +value_token_xpath = '//form[@id="searchForm"]/input[@type="hidden"]/@value' + +CACHE: EngineCache +"""Persistent (SQLite) key/value cache that deletes its values after ``expire`` +seconds.""" + + +def setup(engine_settings: dict[str, t.Any]) -> bool: + global CACHE # pylint: disable=global-statement + CACHE = EngineCache(engine_settings["name"]) + return True + + +def _get_tokens(dom: ElementType | None = None) -> str: + """ + The tokens are hidden in a hidden input field. + They update every minute, but allow up to 1 hour old tokens to be used. + To spend the least amount of requests, it is best to always get the newest + tokens from each request. In worst case if it has expired, it would + need to do a total of 2 requests (over tor, might be ridiculously slow). + """ + if dom is None: + resp = get(base_url, headers={'User-Agent': gen_useragent()}) + dom = fromstring(resp.text) + name_token = extract_text(dom.xpath(name_token_xpath)) + value_token = extract_text(dom.xpath(value_token_xpath)) + return f"{name_token}:{value_token}" def request(query, params): - params['url'] = search_url.format(query=urlencode({'q': query})) + token_str: str | None = CACHE.get('ahmia-tokens') + if not token_str: + token_str = _get_tokens() + CACHE.set('ahmia-tokens', token_str, expire=60 * 60) + name_token, value_token = token_str.split(":") + + params['url'] = search_url.format(query=urlencode({'q': query, name_token: value_token})) if params['time_range'] in time_range_dict: params['url'] += '&' + urlencode({'d': time_range_dict[params['time_range']]}) @@ -77,4 +117,8 @@ def response(resp): except: # pylint: disable=bare-except pass + # Update the tokens to the newest ones + token_str = _get_tokens(dom) + CACHE.set('ahmia-tokens', token_str, expire=60 * 60) + return results