SearXNG: searxng_extra

Alexandre Flament
2021-10-02 17:30:39 +02:00
parent e39a03cc61
commit 1bb82a6b54
21 changed files with 22 additions and 22 deletions

@@ -0,0 +1,35 @@
from sys import argv, exit
if len(argv) < 2:
print('search query required')
exit(1)
import requests
from json import dumps
from searx.engines import google
from searx.search import default_request_params
request_params = default_request_params()
# Possible params
# request_params['headers']['User-Agent'] = ''
# request_params['category'] = ''
request_params['pageno'] = 1
request_params['language'] = 'en_us'
request_params['time_range'] = ''
params = google.request(argv[1], request_params)
request_args = dict(
headers=request_params['headers'],
cookies=request_params['cookies'],
)
if request_params['method'] == 'GET':
req = requests.get
else:
req = requests.post
request_args['data'] = request_params['data']
resp = req(request_params['url'], **request_args)
resp.search_params = request_params
print(dumps(google.response(resp)))
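# Hypothetical invocation of this helper (the script's file name and a
# configured searx installation are assumptions, not part of the commit):
#
#   $ python3 searxng_extra/standalone_google.py rain
#   [{"url": "https://...", "title": "...", "content": "..."}, ...]
#
# google.response(resp) returns the parsed result list, which dumps()
# serializes to JSON on stdout.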

searxng_extra/standalone_searx.py (executable file)

@@ -0,0 +1,217 @@
#!/usr/bin/env python
"""Script to run searx from terminal.
Getting categories without initiating the engines will only return `['general']`
>>> import searx.engines
... list(searx.engines.categories.keys())
['general']
>>> import searx.search
... searx.search.initialize()
... list(searx.engines.categories.keys())
['general', 'it', 'science', 'images', 'news', 'videos', 'music', 'files', 'social media', 'map']
Example to use this script:
.. code:: bash
$ python3 searxng_extra/standalone_searx.py rain
Example to run it from python:
>>> import importlib
... import json
... import sys
... import searx.engines
... import searx.search
... search_query = 'rain'
... # initialize engines
... searx.search.initialize()
... # load engine categories once instead of each time the function is called
... engine_cs = list(searx.engines.categories.keys())
... # load module
... spec = importlib.util.spec_from_file_location(
... 'utils.standalone_searx', 'searxng_extra/standalone_searx.py')
... sas = importlib.util.module_from_spec(spec)
... spec.loader.exec_module(sas)
... # use function from module
... prog_args = sas.parse_argument([search_query], category_choices=engine_cs)
... search_q = sas.get_search_query(prog_args, engine_categories=engine_cs)
... res_dict = sas.to_dict(search_q)
... sys.stdout.write(json.dumps(
... res_dict, sort_keys=True, indent=4, ensure_ascii=False,
... default=sas.json_serial))
{
"answers": [],
"infoboxes": [ {...} ],
"paging": true,
"results": [... ],
"results_number": 820000000.0,
"search": {
"lang": "all",
"pageno": 1,
"q": "rain",
"safesearch": 0,
"timerange": null
},
"suggestions": [...]
}
""" # noqa: E501
# pylint: disable=pointless-string-statement
'''
searx is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
searx is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with searx. If not, see <http://www.gnu.org/licenses/>.
(C) 2016- by Alexandre Flament, <alex@al-f.net>
'''
# pylint: disable=wrong-import-position
import argparse
import sys
from datetime import datetime
from json import dumps
from typing import Any, Dict, List, Optional
import searx
import searx.preferences
import searx.query
import searx.search
import searx.webadapter
EngineCategoriesVar = Optional[List[str]]
def get_search_query(
args: argparse.Namespace, engine_categories: EngineCategoriesVar = None
) -> searx.search.SearchQuery:
"""Get search results for the query"""
if engine_categories is None:
engine_categories = list(searx.engines.categories.keys())
try:
category = args.category.decode('utf-8')
except AttributeError:
category = args.category
form = {
"q": args.query,
"categories": category,
"pageno": str(args.pageno),
"language": args.lang,
"time_range": args.timerange
}
preferences = searx.preferences.Preferences(
['oscar'], engine_categories, searx.engines.engines, [])
preferences.key_value_settings['safesearch'].parse(args.safesearch)
search_query = searx.webadapter.get_search_query_from_webapp(
preferences, form)[0]
return search_query
def no_parsed_url(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Remove parsed url from dict."""
for result in results:
del result['parsed_url']
return results
def json_serial(obj: Any) -> Any:
"""JSON serializer for objects not serializable by default json code.
:raise TypeError: raised when **obj** is not serializable
"""
if isinstance(obj, datetime):
serial = obj.isoformat()
return serial
if isinstance(obj, bytes):
return obj.decode('utf8')
if isinstance(obj, set):
return list(obj)
raise TypeError("Type ({}) not serializable".format(type(obj)))
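# A minimal sketch of how default=json_serial hooks into json.dumps
# (hypothetical values): the hook is called, recursively, for every object the
# default encoder cannot handle.
#
#   >>> from datetime import datetime
#   >>> dumps({'when': datetime(2021, 10, 2), 'tags': {b'rain'}}, default=json_serial)
#   '{"when": "2021-10-02T00:00:00", "tags": ["rain"]}'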
def to_dict(search_query: searx.search.SearchQuery) -> Dict[str, Any]:
"""Get result from parsed arguments."""
result_container = searx.search.Search(search_query).search()
result_container_json = {
"search": {
"q": search_query.query,
"pageno": search_query.pageno,
"lang": search_query.lang,
"safesearch": search_query.safesearch,
"timerange": search_query.time_range,
},
"results": no_parsed_url(result_container.get_ordered_results()),
"infoboxes": result_container.infoboxes,
"suggestions": list(result_container.suggestions),
"answers": list(result_container.answers),
"paging": result_container.paging,
"results_number": result_container.results_number()
}
return result_container_json
def parse_argument(
        args: Optional[List[str]] = None,
        category_choices: EngineCategoriesVar = None
) -> argparse.Namespace:
"""Parse command line.
:raise SystemExit: Query argument required on `args`
Examples:
>>> import importlib
... # load module
... spec = importlib.util.spec_from_file_location(
... 'utils.standalone_searx', 'searxng_extra/standalone_searx.py')
... sas = importlib.util.module_from_spec(spec)
... spec.loader.exec_module(sas)
... sas.parse_argument()
usage: ptipython [-h] [--category [{general}]] [--lang [LANG]] [--pageno [PAGENO]] [--safesearch [{0,1,2}]] [--timerange [{day,week,month,year}]]
query
SystemExit: 2
>>> sas.parse_argument(['rain'])
Namespace(category='general', lang='all', pageno=1, query='rain', safesearch='0', timerange=None)
""" # noqa: E501
if not category_choices:
category_choices = list(searx.engines.categories.keys())
parser = argparse.ArgumentParser(description='Standalone searx.')
parser.add_argument('query', type=str,
help='Text query')
parser.add_argument('--category', type=str, nargs='?',
choices=category_choices,
default='general',
help='Search category')
parser.add_argument('--lang', type=str, nargs='?', default='all',
help='Search language')
parser.add_argument('--pageno', type=int, nargs='?', default=1,
help='Page number starting from 1')
parser.add_argument(
'--safesearch', type=str, nargs='?',
choices=['0', '1', '2'], default='0',
help='Safe content filter from none to strict')
parser.add_argument(
'--timerange', type=str,
nargs='?', choices=['day', 'week', 'month', 'year'],
help='Filter by time range')
return parser.parse_args(args)
if __name__ == '__main__':
searx.search.initialize()
engine_cs = list(searx.engines.categories.keys())
prog_args = parse_argument(category_choices=engine_cs)
search_q = get_search_query(prog_args, engine_categories=engine_cs)
res_dict = to_dict(search_q)
sys.stdout.write(dumps(
res_dict, sort_keys=True, indent=4, ensure_ascii=False,
default=json_serial))

@@ -0,0 +1,30 @@
#!/usr/bin/env python
# This script saves Ahmia's blacklist for onion sites.
# More info at https://ahmia.fi/blacklist/
from os.path import join
import requests
from searx import searx_dir
URL = 'https://ahmia.fi/blacklist/banned/'
def fetch_ahmia_blacklist():
resp = requests.get(URL, timeout=3.0)
if resp.status_code != 200:
raise Exception("Error fetching Ahmia blacklist, HTTP code " + resp.status_code)
else:
blacklist = resp.text.split()
return blacklist
def get_ahmia_blacklist_filename():
return join(join(searx_dir, "data"), "ahmia_blacklist.txt")
blacklist = fetch_ahmia_blacklist()
with open(get_ahmia_blacklist_filename(), "w") as f:
f.write('\n'.join(blacklist))
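# Ahmia publishes the banned onions as MD5 hashes, one per line, so the
# resulting ahmia_blacklist.txt looks roughly like this (hypothetical values):
#
#   0123456789abcdef0123456789abcdef
#   fedcba9876543210fedcba9876543210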

@@ -0,0 +1,154 @@
#!/usr/bin/env python
import re
import unicodedata
import json
from os.path import join
from searx import searx_dir
from searx.locales import LOCALE_NAMES
from searx.engines import wikidata, set_loggers
set_loggers(wikidata, 'wikidata')
# ORDER BY (with all the query fields) is important to keep a deterministic result order
# so multiple invocations of this script don't change currencies.json
SPARQL_REQUEST = """
SELECT DISTINCT ?iso4217 ?unit ?unicode ?label ?alias WHERE {
?item wdt:P498 ?iso4217; rdfs:label ?label.
OPTIONAL { ?item skos:altLabel ?alias FILTER (LANG (?alias) = LANG(?label)). }
OPTIONAL { ?item wdt:P5061 ?unit. }
OPTIONAL { ?item wdt:P489 ?symbol.
?symbol wdt:P487 ?unicode. }
  MINUS { ?item wdt:P582 ?end_data . } # Ignore money with an end date
MINUS { ?item wdt:P31/wdt:P279* wd:Q15893266 . } # Ignore "former entity" (obsolete currency)
FILTER(LANG(?label) IN (%LANGUAGES_SPARQL%)).
}
ORDER BY ?iso4217 ?unit ?unicode ?label ?alias
"""
# ORDER BY (with all the query fields) is important to keep a deterministic result order
# so multiple invocations of this script don't change currencies.json
SPARQL_WIKIPEDIA_NAMES_REQUEST = """
SELECT DISTINCT ?iso4217 ?article_name WHERE {
?item wdt:P498 ?iso4217 .
?article schema:about ?item ;
schema:name ?article_name ;
schema:isPartOf [ wikibase:wikiGroup "wikipedia" ]
  MINUS { ?item wdt:P582 ?end_data . } # Ignore money with an end date
MINUS { ?item wdt:P31/wdt:P279* wd:Q15893266 . } # Ignore "former entity" (obsolete currency)
FILTER(LANG(?article_name) IN (%LANGUAGES_SPARQL%)).
}
ORDER BY ?iso4217 ?article_name
"""
LANGUAGES = LOCALE_NAMES.keys()
LANGUAGES_SPARQL = ', '.join(set(map(lambda l: repr(l.split('_')[0]), LANGUAGES)))
def remove_accents(name):
return unicodedata.normalize('NFKD', name).lower()
def remove_extra(name):
for c in ('(', ':'):
if c in name:
name = name.split(c)[0].strip()
return name
def _normalize_name(name):
name = re.sub(' +', ' ', remove_accents(name.lower()).replace('-', ' '))
name = remove_extra(name)
return name
def add_currency_name(db, name, iso4217, normalize_name=True):
db_names = db['names']
if normalize_name:
name = _normalize_name(name)
iso4217_set = db_names.setdefault(name, [])
if iso4217 not in iso4217_set:
iso4217_set.insert(0, iso4217)
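# Worked example of the db['names'] shape built above (hypothetical data):
#
#   db = {'names': {}, 'iso4217': {}}
#   add_currency_name(db, 'Euro', 'EUR')
#   add_currency_name(db, 'euro', 'EUR')  # normalizes to the same key
#   assert db['names'] == {'euro': ['EUR']}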
def add_currency_label(db, label, iso4217, language):
labels = db['iso4217'].setdefault(iso4217, {})
labels[language] = label
def wikidata_request_result_iterator(request):
result = wikidata.send_wikidata_query(
request.replace('%LANGUAGES_SPARQL%', LANGUAGES_SPARQL)
)
if result is not None:
for r in result['results']['bindings']:
yield r
def fetch_db():
db = {
'names': {},
'iso4217': {},
}
for r in wikidata_request_result_iterator(SPARQL_WIKIPEDIA_NAMES_REQUEST):
iso4217 = r['iso4217']['value']
article_name = r['article_name']['value']
article_lang = r['article_name']['xml:lang']
add_currency_name(db, article_name, iso4217)
add_currency_label(db, article_name, iso4217, article_lang)
    for r in wikidata_request_result_iterator(SPARQL_REQUEST):
iso4217 = r['iso4217']['value']
if 'label' in r:
label = r['label']['value']
label_lang = r['label']['xml:lang']
add_currency_name(db, label, iso4217)
add_currency_label(db, label, iso4217, label_lang)
if 'alias' in r:
add_currency_name(db, r['alias']['value'], iso4217)
if 'unicode' in r:
add_currency_name(db, r['unicode']['value'], iso4217, normalize_name=False)
if 'unit' in r:
add_currency_name(db, r['unit']['value'], iso4217, normalize_name=False)
# reduce memory usage:
# replace lists with one item by the item.
# see searx.search.processors.online_currency.name_to_iso4217
for name in db['names']:
if len(db['names'][name]) == 1:
db['names'][name] = db['names'][name][0]
return db
def get_filename():
return join(join(searx_dir, "data"), "currencies.json")
def main():
db = fetch_db()
# static
add_currency_name(db, "euro", 'EUR')
add_currency_name(db, "euros", 'EUR')
add_currency_name(db, "dollar", 'USD')
add_currency_name(db, "dollars", 'USD')
add_currency_name(db, "peso", 'MXN')
add_currency_name(db, "pesos", 'MXN')
with open(get_filename(), 'w', encoding='utf8') as f:
json.dump(db, f, ensure_ascii=False, indent=4)
if __name__ == '__main__':
main()

@@ -0,0 +1,316 @@
#!/usr/bin/env python
import json
from urllib.parse import urlparse
from os.path import join
from lxml.html import fromstring
from langdetect import detect_langs
from langdetect.lang_detect_exception import LangDetectException
from searx.engines import wikidata, set_loggers
from searx.utils import extract_text, gen_useragent, match_language
from searx.locales import LOCALE_NAMES
from searx import searx_dir
import searx.search
import searx.network
set_loggers(wikidata, 'wikidata')
SPARQL_WIKIPEDIA_ARTICLE = """
SELECT DISTINCT ?item ?name
WHERE {
hint:Query hint:optimizer "None".
VALUES ?item { %IDS% }
?article schema:about ?item ;
schema:inLanguage ?lang ;
schema:name ?name ;
schema:isPartOf [ wikibase:wikiGroup "wikipedia" ] .
FILTER(?lang in (%LANGUAGES_SPARQL%)) .
FILTER (!CONTAINS(?name, ':')) .
}
"""
SPARQL_DESCRIPTION = """
SELECT DISTINCT ?item ?itemDescription
WHERE {
VALUES ?item { %IDS% }
?item schema:description ?itemDescription .
FILTER (lang(?itemDescription) in (%LANGUAGES_SPARQL%))
}
ORDER BY ?itemLang
"""
NOT_A_DESCRIPTION = [
'web site',
'site web',
'komputa serĉilo',
'interreta serĉilo',
'bilaketa motor',
'web search engine',
'wikimedia täpsustuslehekülg',
]
SKIP_ENGINE_SOURCE = [
('gitlab', 'wikidata') # descriptions are about wikipedia disambiguation pages
]
LANGUAGES = LOCALE_NAMES.keys()
WIKIPEDIA_LANGUAGES = {'language': 'wikipedia_language'}
LANGUAGES_SPARQL = ''
IDS = None
descriptions = {}
wd_to_engine_name = {}
def normalize_description(description):
for c in [chr(c) for c in range(0, 31)]:
description = description.replace(c, ' ')
description = ' '.join(description.strip().split())
return description
def update_description(engine_name, lang, description, source, replace=True):
if not isinstance(description, str):
return
description = normalize_description(description)
if description.lower() == engine_name.lower():
return
if description.lower() in NOT_A_DESCRIPTION:
return
if (engine_name, source) in SKIP_ENGINE_SOURCE:
return
if ' ' not in description:
        # skip one-word descriptions (like "website")
return
if replace or lang not in descriptions[engine_name]:
descriptions[engine_name][lang] = [description, source]
def get_wikipedia_summary(lang, pageid):
params = {
        'language': lang.replace('_', '-'),
'headers': {}
}
searx.engines.engines['wikipedia'].request(pageid, params)
try:
response = searx.network.get(params['url'], headers=params['headers'], timeout=10)
response.raise_for_status()
api_result = json.loads(response.text)
return api_result.get('extract')
    except Exception:
return None
def detect_language(text):
try:
r = detect_langs(str(text)) # pylint: disable=E1101
except LangDetectException:
return None
if len(r) > 0 and r[0].prob > 0.95:
return r[0].lang
return None
def get_website_description(url, lang1, lang2=None):
headers = {
'User-Agent': gen_useragent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'DNT': '1',
'Upgrade-Insecure-Requests': '1',
'Sec-GPC': '1',
'Cache-Control': 'max-age=0',
}
if lang1 is not None:
lang_list = [lang1]
if lang2 is not None:
lang_list.append(lang2)
headers['Accept-Language'] = f'{",".join(lang_list)};q=0.8'
try:
response = searx.network.get(url, headers=headers, timeout=10)
response.raise_for_status()
except Exception:
return (None, None)
try:
html = fromstring(response.text)
except ValueError:
html = fromstring(response.content)
description = extract_text(html.xpath('/html/head/meta[@name="description"]/@content'))
if not description:
description = extract_text(html.xpath('/html/head/meta[@property="og:description"]/@content'))
if not description:
description = extract_text(html.xpath('/html/head/title'))
lang = extract_text(html.xpath('/html/@lang'))
    if lang is None and lang1:
lang = lang1
lang = detect_language(description) or lang or 'en'
lang = lang.split('_')[0]
lang = lang.split('-')[0]
return (lang, description)
def initialize():
global IDS, WIKIPEDIA_LANGUAGES, LANGUAGES_SPARQL
searx.search.initialize()
wikipedia_engine = searx.engines.engines['wikipedia']
WIKIPEDIA_LANGUAGES = {
language: wikipedia_engine.url_lang(language.replace('_', '-'))
for language in LANGUAGES
}
WIKIPEDIA_LANGUAGES['nb_NO'] = 'no'
LANGUAGES_SPARQL = ', '.join(f"'{l}'" for l in set(WIKIPEDIA_LANGUAGES.values()))
for engine_name, engine in searx.engines.engines.items():
descriptions[engine_name] = {}
wikidata_id = getattr(engine, "about", {}).get('wikidata_id')
if wikidata_id is not None:
wd_to_engine_name.setdefault(wikidata_id, set()).add(engine_name)
IDS = ' '.join(list(map(lambda wd_id: 'wd:' + wd_id, wd_to_engine_name.keys())))
def fetch_wikidata_descriptions():
searx.network.set_timeout_for_thread(60)
result = wikidata.send_wikidata_query(
SPARQL_DESCRIPTION
.replace('%IDS%', IDS)
.replace('%LANGUAGES_SPARQL%', LANGUAGES_SPARQL)
)
if result is not None:
for binding in result['results']['bindings']:
wikidata_id = binding['item']['value'].replace('http://www.wikidata.org/entity/', '')
wikidata_lang = binding['itemDescription']['xml:lang']
description = binding['itemDescription']['value']
for engine_name in wd_to_engine_name[wikidata_id]:
for lang in LANGUAGES:
if WIKIPEDIA_LANGUAGES[lang] == wikidata_lang:
update_description(engine_name, lang, description, 'wikidata')
def fetch_wikipedia_descriptions():
result = wikidata.send_wikidata_query(
SPARQL_WIKIPEDIA_ARTICLE
.replace('%IDS%', IDS)
.replace('%LANGUAGES_SPARQL%', LANGUAGES_SPARQL)
)
if result is not None:
for binding in result['results']['bindings']:
wikidata_id = binding['item']['value'].replace('http://www.wikidata.org/entity/', '')
wikidata_lang = binding['name']['xml:lang']
pageid = binding['name']['value']
for engine_name in wd_to_engine_name[wikidata_id]:
for lang in LANGUAGES:
if WIKIPEDIA_LANGUAGES[lang] == wikidata_lang:
description = get_wikipedia_summary(lang, pageid)
update_description(engine_name, lang, description, 'wikipedia')
def normalize_url(url):
url = url.replace('{language}', 'en')
url = urlparse(url)._replace(path='/', params='', query='', fragment='').geturl()
url = url.replace('https://api.', 'https://')
return url
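# For example (hypothetical engine URL):
#
#   normalize_url('https://api.example.com/search?q={query}')
#   # -> 'https://example.com/'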
def fetch_website_description(engine_name, website):
default_lang, default_description = get_website_description(website, None, None)
if default_lang is None or default_description is None:
# the front page can't be fetched: skip this engine
return
    wikipedia_languages_r = {v: k for k, v in WIKIPEDIA_LANGUAGES.items()}
languages = ['en', 'es', 'pt', 'ru', 'tr', 'fr']
languages = languages + [ l for l in LANGUAGES if l not in languages]
previous_matched_lang = None
previous_count = 0
for lang in languages:
if lang not in descriptions[engine_name]:
fetched_lang, desc = get_website_description(website, lang, WIKIPEDIA_LANGUAGES[lang])
if fetched_lang is None or desc is None:
continue
matched_lang = match_language(fetched_lang, LANGUAGES, fallback=None)
if matched_lang is None:
fetched_wikipedia_lang = match_language(fetched_lang, WIKIPEDIA_LANGUAGES.values(), fallback=None)
matched_lang = wikipedia_languages_r.get(fetched_wikipedia_lang)
if matched_lang is not None:
update_description(engine_name, matched_lang, desc, website, replace=False)
# check if desc changed with the different lang values
if matched_lang == previous_matched_lang:
previous_count += 1
if previous_count == 6:
# the website has returned the same description for 6 different languages in Accept-Language header
# stop now
break
else:
previous_matched_lang = matched_lang
previous_count = 0
def fetch_website_descriptions():
for engine_name, engine in searx.engines.engines.items():
website = getattr(engine, "about", {}).get('website')
if website is None and hasattr(engine, "search_url"):
website = normalize_url(getattr(engine, "search_url"))
if website is None and hasattr(engine, "base_url"):
website = normalize_url(getattr(engine, "base_url"))
if website is not None:
fetch_website_description(engine_name, website)
def get_engine_descriptions_filename():
return join(join(searx_dir, "data"), "engine_descriptions.json")
def get_output():
"""
From descriptions[engine][language] = [description, source]
To
* output[language][engine] = description_and_source
* description_and_source can be:
* [description, source]
* description (if source = "wikipedia")
* [f"engine:lang", "ref"] (reference to another existing description)
"""
output = {
locale: {} for locale in LOCALE_NAMES
}
seen_descriptions = {}
for engine_name, lang_descriptions in descriptions.items():
for language, description in lang_descriptions.items():
if description[0] in seen_descriptions:
ref = seen_descriptions[description[0]]
description = [f'{ref[0]}:{ref[1]}', 'ref']
else:
seen_descriptions[description[0]] = (engine_name, language)
if description[1] == 'wikipedia':
description = description[0]
output.setdefault(language, {}).setdefault(engine_name, description)
return output
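# Hypothetical illustration of the deduplication above: with
#
#   descriptions == {'ddg': {'en': ['A metasearch engine', 'wikipedia'],
#                            'fr': ['A metasearch engine', 'https://ddg.example']}}
#
# the English entry is emitted as a plain string (source "wikipedia") and the
# identical French text becomes a reference to it:
#
#   output['en']['ddg'] == 'A metasearch engine'
#   output['fr']['ddg'] == ['ddg:en', 'ref']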
def main():
initialize()
print('Fetching wikidata descriptions')
fetch_wikidata_descriptions()
print('Fetching wikipedia descriptions')
fetch_wikipedia_descriptions()
print('Fetching website descriptions')
fetch_website_descriptions()
output = get_output()
with open(get_engine_descriptions_filename(), 'w', encoding='utf8') as f:
f.write(json.dumps(output, indent=1, separators=(',', ':'), ensure_ascii=False))
if __name__ == "__main__":
main()

@@ -0,0 +1,158 @@
#!/usr/bin/env python
# lint: pylint
"""
Update searx/data/external_bangs.json using the duckduckgo bangs.
https://duckduckgo.com/newbang loads
* a JavaScript file which provides the bang version (https://duckduckgo.com/bv1.js)
* a JSON file which contains the bangs (for example https://duckduckgo.com/bang.v260.js)
This script loads the JavaScript file, then the bangs.
The JavaScript URL may change in the future (for example https://duckduckgo.com/bv2.js),
which will most probably require updating RE_BANG_VERSION.
"""
# pylint: disable=C0116
import json
import re
from os.path import join
import httpx
from searx import searx_dir # pylint: disable=E0401 C0413
# from https://duckduckgo.com/newbang
URL_BV1 = 'https://duckduckgo.com/bv1.js'
RE_BANG_VERSION = re.compile(r'\/bang\.v([0-9]+)\.js')
HTTPS_COLON = 'https:'
HTTP_COLON = 'http:'
def get_bang_url():
response = httpx.get(URL_BV1)
response.raise_for_status()
r = RE_BANG_VERSION.findall(response.text)
return f'https://duckduckgo.com/bang.v{r[0]}.js', r[0]
def fetch_ddg_bangs(url):
response = httpx.get(url)
response.raise_for_status()
return json.loads(response.content.decode())
def merge_when_no_leaf(node):
"""Minimize the number of nodes
A -> B -> C
B is child of A
C is child of B
If no C equals '*', then each C is merged into A
For example:
d -> d -> g -> * (ddg*)
-> i -> g -> * (dig*)
becomes
d -> dg -> *
-> ig -> *
"""
restart = False
if not isinstance(node, dict):
return
# create a copy of the keys so node can be modified
keys = list(node.keys())
for key in keys:
if key == '*':
continue
value = node[key]
value_keys = list(value.keys())
if '*' not in value_keys:
for value_key in value_keys:
node[key + value_key] = value[value_key]
merge_when_no_leaf(node[key + value_key])
del node[key]
restart = True
else:
merge_when_no_leaf(value)
if restart:
merge_when_no_leaf(node)
def optimize_leaf(parent, parent_key, node):
if not isinstance(node, dict):
return
if len(node) == 1 and '*' in node and parent is not None:
parent[parent_key] = node['*']
else:
for key, value in node.items():
optimize_leaf(node, key, value)
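# Combined worked example for the two helpers above (hypothetical trie):
# because no intermediate node owns a '*' leaf, the chains collapse fully.
#
#   trie = {'d': {'d': {'g': {'*': 'ddg_url'}}, 'i': {'g': {'*': 'dig_url'}}}}
#   merge_when_no_leaf(trie)  # {'ddg': {'*': 'ddg_url'}, 'dig': {'*': 'dig_url'}}
#   optimize_leaf(None, None, trie)
#   assert trie == {'ddg': 'ddg_url', 'dig': 'dig_url'}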
def parse_ddg_bangs(ddg_bangs):
bang_trie = {}
bang_urls = {}
for bang_definition in ddg_bangs:
# bang_list
bang_url = bang_definition['u']
if '{{{s}}}' not in bang_url:
# ignore invalid bang
continue
bang_url = bang_url.replace('{{{s}}}', chr(2))
# only for the https protocol: "https://example.com" becomes "//example.com"
if bang_url.startswith(HTTPS_COLON + '//'):
bang_url = bang_url[len(HTTPS_COLON):]
if bang_url.startswith(HTTP_COLON + '//') and bang_url[len(HTTP_COLON):] in bang_urls:
# if the bang_url uses the http:// protocol, and the same URL exists in https://
# then reuse the https:// bang definition. (written //example.com)
bang_def_output = bang_urls[bang_url[len(HTTP_COLON):]]
else:
            # normal use case: new http:// URL or https:// URL (without "https:", see above)
bang_rank = str(bang_definition['r'])
bang_def_output = bang_url + chr(1) + bang_rank
bang_def_output = bang_urls.setdefault(bang_url, bang_def_output)
bang_urls[bang_url] = bang_def_output
# bang name
bang = bang_definition['t']
# bang_trie
t = bang_trie
for bang_letter in bang:
t = t.setdefault(bang_letter, {})
t = t.setdefault('*', bang_def_output)
# optimize the trie
merge_when_no_leaf(bang_trie)
optimize_leaf(None, None, bang_trie)
return bang_trie
def get_bangs_filename():
return join(join(searx_dir, "data"), "external_bangs.json")
if __name__ == '__main__':
bangs_url, bangs_version = get_bang_url()
print(f'fetch bangs from {bangs_url}')
output = {
'version': bangs_version,
'trie': parse_ddg_bangs(fetch_ddg_bangs(bangs_url))
}
with open(get_bangs_filename(), 'w', encoding="utf8") as fp:
json.dump(output, fp, ensure_ascii=False, indent=4)

@@ -0,0 +1,68 @@
#!/usr/bin/env python
import json
import requests
import re
from os.path import join
from urllib.parse import urlparse, urljoin
from distutils.version import LooseVersion
from lxml import html
from searx import searx_dir
URL = 'https://ftp.mozilla.org/pub/firefox/releases/'
RELEASE_PATH = '/pub/firefox/releases/'
NORMAL_REGEX = re.compile(r'^[0-9]+\.[0-9](\.[0-9])?$')
# BETA_REGEX = re.compile(r'.*[0-9]b([0-9\-a-z]+)$')
# ESR_REGEX = re.compile(r'^[0-9]+\.[0-9](\.[0-9])?esr$')
useragents = {
"versions": (),
"os": ('Windows NT 10.0; WOW64',
'X11; Linux x86_64'),
"ua": "Mozilla/5.0 ({os}; rv:{version}) Gecko/20100101 Firefox/{version}"
}
def fetch_firefox_versions():
resp = requests.get(URL, timeout=2.0)
if resp.status_code != 200:
raise Exception("Error fetching firefox versions, HTTP code " + resp.status_code)
else:
dom = html.fromstring(resp.text)
versions = []
for link in dom.xpath('//a/@href'):
url = urlparse(urljoin(URL, link))
path = url.path
if path.startswith(RELEASE_PATH):
version = path[len(RELEASE_PATH):-1]
if NORMAL_REGEX.match(version):
versions.append(LooseVersion(version))
        versions.sort(reverse=True)
return versions
def fetch_firefox_last_versions():
versions = fetch_firefox_versions()
result = []
major_last = versions[0].version[0]
major_list = (major_last, major_last - 1)
for version in versions:
major_current = version.version[0]
if major_current in major_list:
result.append(version.vstring)
return result
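# e.g. with fetched versions ['93.0', '92.0.1', '92.0', '91.0'] (hypothetical),
# major_last == 93 and major_list == (93, 92), so result == ['93.0', '92.0.1', '92.0']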
def get_useragents_filename():
return join(join(searx_dir, "data"), "useragents.json")
useragents["versions"] = fetch_firefox_last_versions()
with open(get_useragents_filename(), "w") as f:
json.dump(useragents, f, indent=4, ensure_ascii=False)

@@ -0,0 +1,204 @@
#!/usr/bin/env python
# This script generates languages.py from intersecting each engine's supported languages.
#
# Output files: searx/data/engines_languages.json and searx/languages.py
import json
from pathlib import Path
from pprint import pformat
from babel import Locale, UnknownLocaleError
from babel.languages import get_global
from searx import settings, searx_dir
from searx.engines import load_engines, engines
from searx.network import set_timeout_for_thread
# Output files.
engines_languages_file = Path(searx_dir) / 'data' / 'engines_languages.json'
languages_file = Path(searx_dir) / 'languages.py'
# Fetches the supported languages for each engine and writes a JSON file with them.
def fetch_supported_languages():
set_timeout_for_thread(10.0)
engines_languages = dict()
names = list(engines)
names.sort()
for engine_name in names:
if hasattr(engines[engine_name], 'fetch_supported_languages'):
engines_languages[engine_name] = engines[engine_name].fetch_supported_languages()
print("fetched %s languages from engine %s" % (
len(engines_languages[engine_name]), engine_name))
            if isinstance(engines_languages[engine_name], list):
engines_languages[engine_name] = sorted(engines_languages[engine_name])
# write json file
with open(engines_languages_file, 'w', encoding='utf-8') as f:
json.dump(engines_languages, f, indent=2, sort_keys=True)
return engines_languages
# Get babel Locale object from lang_code if possible.
def get_locale(lang_code):
try:
locale = Locale.parse(lang_code, sep='-')
return locale
except (UnknownLocaleError, ValueError):
return None
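# e.g. get_locale('en-US') returns Locale('en', territory='US'), while an
# unknown code such as get_locale('xx-XX') returns None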
# Join all language lists.
def join_language_lists(engines_languages):
language_list = dict()
for engine_name in engines_languages:
for lang_code in engines_languages[engine_name]:
# apply custom fixes if necessary
if lang_code in getattr(engines[engine_name], 'language_aliases', {}).values():
lang_code = next(lc for lc, alias in engines[engine_name].language_aliases.items()
if lang_code == alias)
locale = get_locale(lang_code)
# ensure that lang_code uses standard language and country codes
if locale and locale.territory:
lang_code = "{lang}-{country}".format(lang=locale.language, country=locale.territory)
short_code = lang_code.split('-')[0]
# add language without country if not in list
if short_code not in language_list:
if locale:
# get language's data from babel's Locale object
language_name = locale.get_language_name().title()
english_name = locale.english_name.split(' (')[0]
elif short_code in engines_languages['wikipedia']:
# get language's data from wikipedia if not known by babel
language_name = engines_languages['wikipedia'][short_code]['name']
english_name = engines_languages['wikipedia'][short_code]['english_name']
else:
language_name = None
english_name = None
# add language to list
language_list[short_code] = {'name': language_name,
'english_name': english_name,
'counter': set(),
'countries': dict()}
# add language with country if not in list
if lang_code != short_code and lang_code not in language_list[short_code]['countries']:
country_name = ''
if locale:
# get country name from babel's Locale object
country_name = locale.get_territory_name()
language_list[short_code]['countries'][lang_code] = {'country_name': country_name,
'counter': set()}
# count engine for both language_country combination and language alone
language_list[short_code]['counter'].add(engine_name)
if lang_code != short_code:
language_list[short_code]['countries'][lang_code]['counter'].add(engine_name)
return language_list
# Filter language list so it only includes the most supported languages and countries
def filter_language_list(all_languages):
min_engines_per_lang = 13
min_engines_per_country = 10
main_engines = [engine_name for engine_name in engines.keys()
if 'general' in engines[engine_name].categories and
engines[engine_name].supported_languages and
not engines[engine_name].disabled]
# filter list to include only languages supported by most engines or all default general engines
filtered_languages = {code: lang for code, lang
in all_languages.items()
if (len(lang['counter']) >= min_engines_per_lang or
all(main_engine in lang['counter']
for main_engine in main_engines))}
def _copy_lang_data(lang, country_name=None):
new_dict = dict()
new_dict['name'] = all_languages[lang]['name']
new_dict['english_name'] = all_languages[lang]['english_name']
if country_name:
new_dict['country_name'] = country_name
return new_dict
# for each language get country codes supported by most engines or at least one country code
filtered_languages_with_countries = dict()
for lang, lang_data in filtered_languages.items():
countries = lang_data['countries']
filtered_countries = dict()
# get language's country codes with enough supported engines
for lang_country, country_data in countries.items():
if len(country_data['counter']) >= min_engines_per_country:
filtered_countries[lang_country] = _copy_lang_data(lang, country_data['country_name'])
# add language without countries too if there's more than one country to choose from
if len(filtered_countries) > 1:
filtered_countries[lang] = _copy_lang_data(lang)
elif len(filtered_countries) == 1:
# if there's only one country per language, it's not necessary to show country name
lang_country = next(iter(filtered_countries))
filtered_countries[lang_country]['country_name'] = None
# if no country has enough engines try to get most likely country code from babel
if not filtered_countries:
lang_country = None
subtags = get_global('likely_subtags').get(lang)
if subtags:
country_code = subtags.split('_')[-1]
if len(country_code) == 2:
lang_country = "{lang}-{country}".format(lang=lang, country=country_code)
if lang_country:
filtered_countries[lang_country] = _copy_lang_data(lang)
else:
filtered_countries[lang] = _copy_lang_data(lang)
filtered_languages_with_countries.update(filtered_countries)
return filtered_languages_with_countries
# Write languages.py.
def write_languages_file(languages):
file_headers = (
"# -*- coding: utf-8 -*-",
"# list of language codes",
"# this file is generated automatically by utils/fetch_languages.py",
"language_codes ="
)
language_codes = tuple([
(
code,
languages[code]['name'].split(' (')[0],
languages[code].get('country_name') or '',
languages[code].get('english_name') or ''
) for code in sorted(languages)
])
with open(languages_file, 'w') as new_file:
file_content = "{file_headers} \\\n{language_codes}".format(
file_headers='\n'.join(file_headers),
language_codes=pformat(language_codes, indent=4)
)
new_file.write(file_content)
new_file.close()
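# The generated languages.py then looks roughly like this (hypothetical rows;
# pformat controls the exact layout):
#
#   # -*- coding: utf-8 -*-
#   # list of language codes
#   # this file is generated automatically by searxng_extra/update/update_languages.py
#   language_codes = \
#   (   ('af-ZA', 'Afrikaans', '', 'Afrikaans'),
#       ('en-US', 'English', 'United States', 'English'))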
if __name__ == "__main__":
load_engines(settings['engines'])
engines_languages = fetch_supported_languages()
all_languages = join_language_lists(engines_languages)
filtered_languages = filter_language_list(all_languages)
write_languages_file(filtered_languages)

@@ -0,0 +1,212 @@
#!/usr/bin/env python
# lint: pylint
# pylint: disable=missing-function-docstring
"""Fetch OSM keys and tags.
To get the i18n names, the script uses the `Wikidata Query Service`_ instead of,
for example, the `OSM tags API`_ (sidenote: the change log at
map.atownsend.org.uk_ might be useful for normalizing OSM tags).
.. _Wikidata Query Service: https://query.wikidata.org/
.. _OSM tags API: https://taginfo.openstreetmap.org/taginfo/apidoc
.. _map.atownsend.org.uk: https://map.atownsend.org.uk/maps/map/changelog.html
:py:obj:`SPARQL_TAGS_REQUEST` :
Wikidata SPARQL query that returns *type-categories* and *types*. The
returned tag is ``Tag:{category}={type}`` (see :py:func:`get_tags`).
Example:
- https://taginfo.openstreetmap.org/tags/building=house#overview
- https://wiki.openstreetmap.org/wiki/Tag:building%3Dhouse
at the bottom of the infobox (right side), there is a link to wikidata:
https://www.wikidata.org/wiki/Q3947
see property "OpenStreetMap tag or key" (P1282)
- https://wiki.openstreetmap.org/wiki/Tag%3Abuilding%3Dbungalow
https://www.wikidata.org/wiki/Q850107
:py:obj:`SPARQL_KEYS_REQUEST` :
Wikidata SPARQL query that returns *keys*. Example with "payment":
- https://wiki.openstreetmap.org/wiki/Key%3Apayment
at the bottom of infobox (right side), there is a link to wikidata:
https://www.wikidata.org/wiki/Q1148747
link made using the "OpenStreetMap tag or key" property (P1282)
to be confirmed: is there one wiki page per key?
- https://taginfo.openstreetmap.org/keys/payment#values
- https://taginfo.openstreetmap.org/keys/payment:cash#values
``rdfs:label`` gets all the labels without language selection
(as opposed to SERVICE ``wikibase:label``).
"""
import json
import collections
from pathlib import Path
from searx import searx_dir
from searx.network import set_timeout_for_thread
from searx.engines import wikidata, set_loggers
from searx.languages import language_codes
from searx.engines.openstreetmap import get_key_rank, VALUE_TO_LINK
set_loggers(wikidata, 'wikidata')
SPARQL_TAGS_REQUEST = """
SELECT ?tag ?item ?itemLabel WHERE {
?item wdt:P1282 ?tag .
?item rdfs:label ?itemLabel .
FILTER(STRSTARTS(?tag, 'Tag'))
}
GROUP BY ?tag ?item ?itemLabel
ORDER BY ?tag ?item ?itemLabel
"""
SPARQL_KEYS_REQUEST = """
SELECT ?key ?item ?itemLabel WHERE {
?item wdt:P1282 ?key .
?item rdfs:label ?itemLabel .
FILTER(STRSTARTS(?key, 'Key'))
}
GROUP BY ?key ?item ?itemLabel
ORDER BY ?key ?item ?itemLabel
"""
LANGUAGES = [l[0].lower() for l in language_codes]
PRESET_KEYS = {
('wikidata',): {'en': 'Wikidata'},
('wikipedia',): {'en': 'Wikipedia'},
('email',): {'en': 'Email'},
('facebook',): {'en': 'Facebook'},
('fax',): {'en': 'Fax'},
('internet_access', 'ssid'): {'en': 'Wi-Fi'},
}
INCLUDED_KEYS = {
('addr', )
}
def get_preset_keys():
results = collections.OrderedDict()
for keys, value in PRESET_KEYS.items():
r = results
for k in keys:
r = r.setdefault(k, {})
r.setdefault('*', value)
return results
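# e.g. the composite preset key ('internet_access', 'ssid') is nested, so
# get_preset_keys()['internet_access']['ssid']['*'] == {'en': 'Wi-Fi'}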
def get_keys():
results = get_preset_keys()
response = wikidata.send_wikidata_query(SPARQL_KEYS_REQUEST)
for key in response['results']['bindings']:
keys = key['key']['value'].split(':')[1:]
if keys[0] == 'currency' and len(keys) > 1:
# special case in openstreetmap.py
continue
if keys[0] == 'contact' and len(keys) > 1:
# label for the key "contact.email" is "Email"
# whatever the language
r = results.setdefault('contact', {})
r[keys[1]] = {
'*': {
'en': keys[1]
}
}
continue
if tuple(keys) in PRESET_KEYS:
# skip presets (already set above)
continue
if get_key_rank(':'.join(keys)) is None\
and ':'.join(keys) not in VALUE_TO_LINK\
and tuple(keys) not in INCLUDED_KEYS:
# keep only keys that will be displayed by openstreetmap.py
continue
label = key['itemLabel']['value'].lower()
lang = key['itemLabel']['xml:lang']
r = results
for k in keys:
r = r.setdefault(k, {})
r = r.setdefault('*', {})
if lang in LANGUAGES:
r.setdefault(lang, label)
# special cases
results['delivery']['covid19']['*'].clear()
for k, v in results['delivery']['*'].items():
results['delivery']['covid19']['*'][k] = v + ' (COVID19)'
results['opening_hours']['covid19']['*'].clear()
for k, v in results['opening_hours']['*'].items():
results['opening_hours']['covid19']['*'][k] = v + ' (COVID19)'
return results
def get_tags():
results = collections.OrderedDict()
response = wikidata.send_wikidata_query(SPARQL_TAGS_REQUEST)
for tag in response['results']['bindings']:
tag_names = tag['tag']['value'].split(':')[1].split('=')
if len(tag_names) == 2:
tag_category, tag_type = tag_names
else:
tag_category, tag_type = tag_names[0], ''
label = tag['itemLabel']['value'].lower()
lang = tag['itemLabel']['xml:lang']
if lang in LANGUAGES:
results.setdefault(tag_category, {}).setdefault(tag_type, {}).setdefault(lang, label)
return results
def optimize_data_lang(translations):
language_to_delete = []
# remove "zh-hk" entry if the value is the same as "zh"
# same for "en-ca" / "en" etc...
for language in translations:
if '-' in language:
base_language = language.split('-')[0]
if translations.get(base_language) == translations.get(language):
language_to_delete.append(language)
for language in language_to_delete:
del translations[language]
language_to_delete = []
# remove entries that have the same value than the "en" entry
value_en = translations.get('en')
if value_en:
for language, value in translations.items():
if language != 'en' and value == value_en:
language_to_delete.append(language)
for language in language_to_delete:
del translations[language]
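# e.g. {'en': 'phone', 'en-ca': 'phone', 'fr': 'phone'} is reduced to
# {'en': 'phone'}: "en-ca" duplicates its base language and "fr" duplicates
# the "en" entry (hypothetical labels)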
def optimize_tags(data):
for v in data.values():
for translations in v.values():
optimize_data_lang(translations)
return data
def optimize_keys(data):
for k, v in data.items():
if k == '*':
optimize_data_lang(v)
elif isinstance(v, dict):
optimize_keys(v)
return data
def get_osm_tags_filename():
return Path(searx_dir) / "data" / "osm_keys_tags.json"
if __name__ == '__main__':
set_timeout_for_thread(60)
result = {
'keys': optimize_keys(get_keys()),
'tags': optimize_tags(get_tags()),
}
with open(get_osm_tags_filename(), 'w', encoding="utf8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)

@@ -0,0 +1,145 @@
#!/usr/bin/env python
"""
Update pygments style
Call this script after each upgrade of pygments
"""
# pylint: disable=C0116
from os.path import join
import pygments
from pygments.formatters import HtmlFormatter # pylint: disable=E0611
from pygments.style import Style
from pygments.token import Comment, Error, Generic, Keyword, Literal, Name, Operator, Text
from searx import searx_dir
class LogicodevStyle(Style): # pylint: disable=R0903
"""Logicodev style
based on https://github.com/searx/searx/blob/2a5c39e33c3306ca17e09211fbf5a0f785cb10c8/searx/static/themes/oscar/less/logicodev/code.less
""" # pylint: disable=C0301
background_color = '#282C34'
styles = {
Comment: "#556366 italic",
Comment.Multiline: "#556366 italic",
Comment.Preproc: "#BC7A00",
Comment.Single: "#556366 italic",
Comment.Special: "#556366 italic",
Error: "border:#ff0000",
Generic.Deleted: "#A00000",
Generic.Emph: "italic",
Generic.Error: "#FF0000",
Generic.Heading: "#000080 bold",
Generic.Inserted: "#00A000",
Generic.Output: "#888888",
Generic.Prompt: "#000080 bold",
Generic.Strong: "bold",
Generic.Subheading: "#800080 bold",
Generic.Traceback: "#0044DD",
Keyword: "#BE74D5 bold",
Keyword.Constant: "#BE74D5 bold",
Keyword.Declaration: "#BE74D5 bold",
Keyword.Namespace: "#BE74D5 bold",
Keyword.Pseudo: "#BE74D5",
Keyword.Reserved: "#BE74D5 bold",
Keyword.Type: "#D46C72",
Literal.Number: "#D19A66",
Literal.String: "#86C372",
Literal.String.Backtick:"#86C372",
Literal.String.Char: "#86C372",
Literal.String.Doc: "#86C372 italic",
Literal.String.Double: "#86C372",
Literal.String.Escape: "#BB6622 bold",
Literal.String.Heredoc: "#86C372",
Literal.String.Interpol:"#BB6688 bold",
Literal.String.Other: "#BE74D5",
Literal.String.Regex: "#BB6688",
Literal.String.Single: "#86C372",
Literal.String.Symbol: "#DFC06F",
Name.Attribute: "#7D9029",
Name.Builtin: "#BE74D5",
Name.Builtin.Pseudo: "#BE74D5",
Name.Class: "#61AFEF bold",
Name.Constant: "#D19A66",
Name.Decorator: "#AA22FF",
Name.Entity: "#999999 bold",
Name.Exception: "#D2413A bold",
Name.Function: "#61AFEF",
Name.Label: "#A0A000",
Name.Namespace: "#61AFEF bold",
Name.Tag: "#BE74D5 bold",
Name.Variable: "#DFC06F",
Name.Variable.Class: "#DFC06F",
Name.Variable.Global: "#DFC06F",
Name.Variable.Instance: "#DFC06F",
Operator: "#D19A66",
Operator.Word: "#AA22FF bold",
Text.Whitespace: "#D7DAE0",
}
CSSCLASS = '.code-highlight'
RULE_CODE_LINENOS = """ .linenos {
-webkit-touch-callout: none;
-webkit-user-select: none;
-khtml-user-select: none;
-moz-user-select: none;
-ms-user-select: none;
user-select: none;
cursor: default;
&::selection {
background: transparent; /* WebKit/Blink Browsers */
}
&::-moz-selection {
background: transparent; /* Gecko Browsers */
}
margin-right: 8px;
text-align: right;
}"""
def get_output_filename(relative_name):
return join(searx_dir, relative_name)
def get_css(cssclass, style):
result = f"""/*
this file is generated automatically by searxng_extra/update/update_pygments.py
using pygments version {pygments.__version__}
*/\n\n"""
css_text = HtmlFormatter(style=style).get_style_defs(cssclass)
result += cssclass + RULE_CODE_LINENOS + '\n\n'
for line in css_text.splitlines():
if ' ' in line and not line.startswith(cssclass):
line = cssclass + ' ' + line
result += line + '\n'
return result
def main():
fname = 'static/themes/oscar/src/generated/pygments-logicodev.less'
print("update: %s" % fname)
with open(get_output_filename(fname), 'w') as f:
f.write(get_css(CSSCLASS, LogicodevStyle))
fname = 'static/themes/oscar/src/generated/pygments-pointhi.less'
print("update: %s" % fname)
with open(get_output_filename(fname), 'w') as f:
f.write(get_css(CSSCLASS, 'default'))
fname = 'static/themes/simple/src/generated/pygments.less'
print("update: %s" % fname)
with open(get_output_filename(fname), 'w') as f:
f.write(get_css(CSSCLASS, 'default'))
if __name__ == '__main__':
main()

@@ -0,0 +1,55 @@
#!/usr/bin/env python
import json
import collections
from os.path import join
from searx import searx_dir
from searx.engines import wikidata, set_loggers
set_loggers(wikidata, 'wikidata')
# The response contains duplicate ?item entries with different ?symbol values.
# "ORDER BY ?item DESC(?rank) ?symbol" provides a deterministic result,
# even if an ?item has several ?symbol values of the same rank.
# see:
# * https://www.wikidata.org/wiki/Help:Ranking
# * https://www.mediawiki.org/wiki/Wikibase/Indexing/RDF_Dump_Format ("Statement representation" section)
# * https://w.wiki/32BT
# see the result for https://www.wikidata.org/wiki/Q11582:
# there are multiple symbols with the same rank
SPARQL_REQUEST = """
SELECT DISTINCT ?item ?symbol
WHERE
{
?item wdt:P31/wdt:P279 wd:Q47574 .
?item p:P5061 ?symbolP .
?symbolP ps:P5061 ?symbol ;
wikibase:rank ?rank .
FILTER(LANG(?symbol) = "en").
}
ORDER BY ?item DESC(?rank) ?symbol
"""
def get_data():
results = collections.OrderedDict()
    response = wikidata.send_wikidata_query(SPARQL_REQUEST)
for unit in response['results']['bindings']:
name = unit['item']['value'].replace('http://www.wikidata.org/entity/', '')
unit = unit['symbol']['value']
if name not in results:
# ignore duplicate: always use the first one
results[name] = unit
return results
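# e.g. if the ordered bindings for the litre (Q11582) yield ('Q11582', 'L')
# and then ('Q11582', 'l'), only the first symbol is kept (hypothetical rank
# order): results['Q11582'] == 'L'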
def get_wikidata_units_filename():
return join(join(searx_dir, "data"), "wikidata_units.json")
with open(get_wikidata_units_filename(), 'w') as f:
json.dump(get_data(), f, indent=4, ensure_ascii=False)