text-generation-webui-mirror/modules/web_search.py
2025-05-28 05:28:15 -07:00

126 lines
4.2 KiB
Python

from datetime import datetime
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from modules.logging_colors import logger
from modules.text_generation import generate_reply
def get_current_timestamp():
"""Returns the current time in 24-hour format"""
return datetime.now().strftime('%b %d, %Y %H:%M')
def generate_search_query(user_message, state):
"""Generate a search query from user message using the LLM"""
search_prompt = f"{user_message}\n\n=====\n\nPlease turn the message above into a short web search query in the same language as the message. Respond with only the search query, nothing else."
# Use a minimal state for search query generation
search_state = state.copy()
search_state['max_new_tokens'] = 64
search_state['temperature'] = 0.1
query = ""
for reply in generate_reply(search_prompt, search_state, stopping_strings=[], is_chat=False):
query = reply.strip()
return query
def download_web_page(url, timeout=10):
"""Download and extract text from a web page"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=timeout)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text and clean it up
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
return text
except Exception as e:
logger.error(f"Error downloading {url}: {e}")
return f"[Error downloading content from {url}: {str(e)}]"
def perform_web_search(query, num_pages=3):
"""Perform web search and return results with content"""
try:
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=num_pages))
search_results = []
for i, result in enumerate(results):
url = result.get('href', '')
title = result.get('title', f'Search Result {i+1}')
# Download page content
content = download_web_page(url)
search_results.append({
'title': title,
'url': url,
'content': content
})
return search_results
except Exception as e:
logger.error(f"Error performing web search: {e}")
return []
def add_web_search_attachments(history, row_idx, user_message, state):
"""Perform web search and add results as attachments"""
if not state.get('enable_web_search', False):
return
try:
# Generate search query
search_query = generate_search_query(user_message, state)
if not search_query:
logger.warning("Failed to generate search query")
return
logger.info(f"Generated search query: {search_query}")
# Perform web search
num_pages = int(state.get('web_search_pages', 3))
search_results = perform_web_search(search_query, num_pages)
if not search_results:
logger.warning("No search results found")
return
# Add search results as attachments
key = f"user_{row_idx}"
if key not in history['metadata']:
history['metadata'][key] = {"timestamp": get_current_timestamp()}
if "attachments" not in history['metadata'][key]:
history['metadata'][key]["attachments"] = []
for result in search_results:
attachment = {
"name": result['title'],
"type": "text/html",
"url": result['url'],
"content": result['content']
}
history['metadata'][key]["attachments"].append(attachment)
logger.info(f"Added {len(search_results)} web search results as attachments")
except Exception as e:
logger.error(f"Error in web search: {e}")