mirror of
https://github.com/oobabooga/text-generation-webui.git
synced 2025-06-07 06:06:20 -04:00
Compare commits
2 commits
8906f55aa9
...
30a0fa6006
Author | SHA1 | Date | |
---|---|---|---|
|
30a0fa6006 | ||
|
b549ac9cfe |
4 changed files with 46 additions and 330 deletions
|
@ -17,7 +17,7 @@ from jinja2.sandbox import ImmutableSandboxedEnvironment
|
|||
from PIL import Image
|
||||
|
||||
import modules.shared as shared
|
||||
from modules import utils, message_versioning
|
||||
from modules import utils
|
||||
from modules.extensions import apply_extensions
|
||||
from modules.html_generator import (
|
||||
chat_html_wrapper,
|
||||
|
@ -667,13 +667,6 @@ def generate_chat_reply_wrapper(text, state, regenerate=False, _continue=False):
|
|||
send_dummy_reply(state['start_with'], state)
|
||||
|
||||
history = state['history']
|
||||
initial_history_len = len(history['internal'])
|
||||
if not regenerate and not _continue and text.strip():
|
||||
temp_history = copy.deepcopy(history)
|
||||
visible_text = html.escape(text)
|
||||
temp_history['internal'].append([text, ''])
|
||||
temp_history['visible'].append([visible_text, ''])
|
||||
message_versioning.append_message_version(temp_history, state, is_bot=False)
|
||||
|
||||
last_save_time = time.monotonic()
|
||||
save_interval = 8
|
||||
|
@ -685,9 +678,6 @@ def generate_chat_reply_wrapper(text, state, regenerate=False, _continue=False):
|
|||
if i == 0 or (current_time - last_save_time) >= save_interval:
|
||||
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
|
||||
last_save_time = current_time
|
||||
|
||||
if len(history['internal']) > initial_history_len or regenerate or _continue:
|
||||
message_versioning.append_message_version(history, state, is_bot=True)
|
||||
|
||||
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
|
||||
|
||||
|
@ -721,8 +711,9 @@ def send_last_reply_to_input(history):
|
|||
return ''
|
||||
|
||||
|
||||
def replace_last_reply(text, state):
|
||||
def replace_last_reply(textbox, state):
|
||||
history = state['history']
|
||||
text = textbox['text']
|
||||
|
||||
# Initialize metadata if not present
|
||||
if 'metadata' not in history:
|
||||
|
@ -835,7 +826,6 @@ def rename_history(old_id, new_id, character, mode):
|
|||
else:
|
||||
logger.info(f"Renaming \"{old_p}\" to \"{new_p}\"")
|
||||
old_p.rename(new_p)
|
||||
message_versioning.rename_history_data(old_id, new_id, character, mode)
|
||||
|
||||
|
||||
def get_paths(state):
|
||||
|
@ -1334,8 +1324,6 @@ def handle_replace_last_reply_click(text, state):
|
|||
last_reply = state['history']['internal'][-1][1] if len(state['history']['internal']) > 0 else None
|
||||
history = replace_last_reply(text, state)
|
||||
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
|
||||
if len(history['internal']) > 0 and history['internal'][-1][1] != last_reply: # Differs from last reply
|
||||
message_versioning.append_message_version(history, state, is_bot=True)
|
||||
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
|
||||
|
||||
return [history, html, {"text": "", "files": []}]
|
||||
|
@ -1344,7 +1332,6 @@ def handle_replace_last_reply_click(text, state):
|
|||
def handle_send_dummy_message_click(text, state):
|
||||
history = send_dummy_message(text, state)
|
||||
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
|
||||
message_versioning.append_message_version(history, state, is_bot=False)
|
||||
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
|
||||
|
||||
return [history, html, {"text": "", "files": []}]
|
||||
|
@ -1353,7 +1340,6 @@ def handle_send_dummy_message_click(text, state):
|
|||
def handle_send_dummy_reply_click(text, state):
|
||||
history = send_dummy_reply(text, state)
|
||||
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
|
||||
message_versioning.append_message_version(history, state, is_bot=True)
|
||||
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
|
||||
|
||||
return [history, html, {"text": "", "files": []}]
|
||||
|
@ -1401,7 +1387,6 @@ def handle_delete_chat_confirm_click(state):
|
|||
index = str(all_histories.index(unique_id_to_delete))
|
||||
|
||||
delete_history(unique_id_to_delete, character_to_delete, mode_to_delete)
|
||||
message_versioning.clear_history_data(unique_id_to_delete, character_to_delete, mode_to_delete)
|
||||
|
||||
# Load the next appropriate history
|
||||
history, unique_id = load_history_after_deletion(state, index)
|
||||
|
|
|
@ -397,8 +397,8 @@ def generate_instruct_html(history):
|
|||
row_visible = history['visible'][i]
|
||||
row_internal = history['internal'][i]
|
||||
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
|
||||
|
||||
# Get timestamps
|
||||
user_timestamp = format_message_timestamp(history, "user", i)
|
||||
|
@ -470,8 +470,8 @@ def generate_cai_chat_html(history, name1, name2, style, character, reset_cache=
|
|||
row_visible = history['visible'][i]
|
||||
row_internal = history['internal'][i]
|
||||
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
|
||||
|
||||
# Get timestamps
|
||||
user_timestamp = format_message_timestamp(history, "user", i)
|
||||
|
@ -523,8 +523,8 @@ def generate_chat_html(history, name1, name2, reset_cache=False):
|
|||
row_visible = history['visible'][i]
|
||||
row_internal = history['internal'][i]
|
||||
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
|
||||
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
|
||||
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
|
||||
|
||||
# Get timestamps
|
||||
user_timestamp = format_message_timestamp(history, "user", i)
|
||||
|
|
|
@ -6,193 +6,9 @@ from pathlib import Path
|
|||
import json
|
||||
import logging as logger
|
||||
|
||||
# --- History Storage Logic ---
|
||||
|
||||
# Global variables for the currently loaded history data and state
|
||||
loaded_history = {'visible': [], 'internal': []}
|
||||
last_state = {'character_menu': None, 'unique_id': None, 'mode': None, 'display_mode': 'html'}
|
||||
|
||||
|
||||
def validate_list(lst: List, i: int):
|
||||
"""Ensure list is properly extended to index i"""
|
||||
if len(lst) <= i:
|
||||
lst.extend([None] * (i + 1 - len(lst)))
|
||||
|
||||
|
||||
def validate_history_structure(history_data: Dict, i: int):
|
||||
"""Ensure history_data structure is properly extended to index i"""
|
||||
for history_type in ['visible', 'internal']:
|
||||
if history_type not in history_data:
|
||||
history_data[history_type] = []
|
||||
validate_list(history_data[history_type], i)
|
||||
|
||||
|
||||
def init_history_entry(history_data: Dict, i: int):
|
||||
"""Initialize history entry dicts at index i if nonexistent"""
|
||||
for history_type in ['visible', 'internal']:
|
||||
if history_data[history_type][i] is None or not isinstance(history_data[history_type][i], list):
|
||||
history_data[history_type][i] = [
|
||||
{'text': [], 'pos': 0}, # User message data
|
||||
{'text': [], 'pos': 0} # Bot message data
|
||||
]
|
||||
|
||||
|
||||
def load_or_init_history_data(character: Optional[str], unique_id: Optional[str], mode: str) -> Dict:
|
||||
"""
|
||||
Loads history data from file if character and unique_id are provided, the file exists,
|
||||
and the requested context (character, unique_id, mode) differs from the currently loaded context.
|
||||
Otherwise, returns the existing in-memory history or a new empty history structure.
|
||||
"""
|
||||
global loaded_history, last_state
|
||||
|
||||
_character = last_state.get('character_menu')
|
||||
_unique_id = last_state.get('unique_id')
|
||||
_mode = last_state.get('mode')
|
||||
|
||||
# --- Check if requested context matches already loaded context ---
|
||||
if (character == _character
|
||||
and unique_id == _unique_id # noqa: W503
|
||||
and mode == _mode # noqa: W503
|
||||
and _character is not None): # noqa: W503
|
||||
return loaded_history
|
||||
|
||||
# --- Context differs or history data not loaded, proceed with load/initialization ---
|
||||
def _create_empty_history():
|
||||
return {'visible': [], 'internal': []}
|
||||
|
||||
if not character or not unique_id:
|
||||
logger.warning("Cannot load history data: Character or ID is missing.")
|
||||
loaded_history = _create_empty_history()
|
||||
return loaded_history
|
||||
|
||||
path = get_history_data_path(unique_id, character, mode)
|
||||
if not path.exists():
|
||||
loaded_history = _create_empty_history()
|
||||
return loaded_history
|
||||
try:
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
contents = f.read()
|
||||
if contents:
|
||||
loaded_data = json.loads(contents)
|
||||
last_state['character_menu'] = character
|
||||
last_state['unique_id'] = unique_id
|
||||
last_state['mode'] = mode
|
||||
loaded_history = loaded_data
|
||||
return loaded_history
|
||||
else:
|
||||
loaded_history = _create_empty_history()
|
||||
return loaded_history
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Error decoding JSON from history data file: {path}. Initializing empty.", exc_info=True)
|
||||
loaded_history = _create_empty_history()
|
||||
return loaded_history
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading message versioning history data from {path}: {e}", exc_info=True)
|
||||
loaded_history = _create_empty_history()
|
||||
return loaded_history
|
||||
|
||||
|
||||
def append_message_version(history: Dict, state: Dict, is_bot=True):
|
||||
"""
|
||||
Append a message to the end of the currently loaded history data (loaded_history).
|
||||
Requires state to potentially save the updated history.
|
||||
"""
|
||||
global loaded_history
|
||||
|
||||
# NOTE: Assumes the correct history for the state's character/unique_id is already loaded into loaded_history
|
||||
|
||||
msg_type = 1 if is_bot else 0
|
||||
if not history or not history.get('visible') or not history.get('internal'):
|
||||
logger.warning("Attempted to append to history data with invalid history.")
|
||||
return
|
||||
|
||||
i = len(history['visible']) - 1
|
||||
if i < 0 or len(history['visible'][i]) <= msg_type or len(history['internal'][i]) <= msg_type:
|
||||
logger.warning(f"Input history index {i} or msg_type {msg_type} out of bounds for appending.")
|
||||
return
|
||||
|
||||
visible_text = history['visible'][i][msg_type]
|
||||
if not visible_text:
|
||||
return
|
||||
internal_text = history['internal'][i][msg_type]
|
||||
|
||||
try:
|
||||
validate_history_structure(loaded_history, i)
|
||||
init_history_entry(loaded_history, i)
|
||||
|
||||
# Append the strings to the respective lists in loaded_history
|
||||
if 'text' not in loaded_history['visible'][i][msg_type]:
|
||||
loaded_history['visible'][i][msg_type]['text'] = []
|
||||
if 'text' not in loaded_history['internal'][i][msg_type]:
|
||||
loaded_history['internal'][i][msg_type]['text'] = []
|
||||
|
||||
vis_list = loaded_history['visible'][i][msg_type]['text']
|
||||
int_list = loaded_history['internal'][i][msg_type]['text']
|
||||
|
||||
vis_list.append(visible_text)
|
||||
int_list.append(internal_text)
|
||||
|
||||
# Update position to the new last index
|
||||
new_pos = len(vis_list) - 1
|
||||
loaded_history['visible'][i][msg_type]['pos'] = new_pos
|
||||
loaded_history['internal'][i][msg_type]['pos'] = new_pos
|
||||
|
||||
character = state['character_menu']
|
||||
unique_id = state['unique_id']
|
||||
mode = state['mode']
|
||||
if character and unique_id:
|
||||
save_history_data(character, unique_id, mode, loaded_history)
|
||||
else:
|
||||
logger.warning("Could not save history data after append: character or unique_id missing in state.")
|
||||
except IndexError:
|
||||
logger.error(f"IndexError adding message version: index={i}, msg_type={msg_type}. Message versions: {loaded_history}", exc_info=True)
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding message version: {e}", exc_info=True)
|
||||
return
|
||||
|
||||
|
||||
def save_history_data(character: str, unique_id: str, mode: str, history_data: Dict) -> bool:
|
||||
"""Save the provided history data structure to disk"""
|
||||
|
||||
if not unique_id or not character:
|
||||
logger.warning("Cannot save history data: Character or ID is not set.")
|
||||
return False
|
||||
|
||||
current_mode = mode or last_state.get('mode') or shared.persistent_interface_state.get('mode', 'chat')
|
||||
|
||||
path = get_history_data_path(unique_id, character, current_mode)
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, 'w', encoding='utf-8') as f:
|
||||
data_to_save = {'visible': history_data['visible'], 'internal': history_data['internal']}
|
||||
json.dump(data_to_save, f, indent=4)
|
||||
return True
|
||||
except TypeError as e:
|
||||
logger.error(f"TypeError saving history data (possibly non-serializable data): {e}. Data: {history_data}", exc_info=True)
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving message versioning history data to {path}: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
def get_history_data_path(unique_id: str, character: str, mode: str) -> Path:
|
||||
"""Get the path to the history data file"""
|
||||
if not unique_id or not character or not mode:
|
||||
raise ValueError("unique_id, character, and mode must be provided to get history data path.")
|
||||
|
||||
try:
|
||||
import modules.chat as chat
|
||||
base_history_path = chat.get_history_file_path(unique_id, character, mode)
|
||||
path = base_history_path.parent
|
||||
except Exception as e:
|
||||
# Fallback or error handling if get_history_file_path fails (shouldn't happen, but just in case)
|
||||
logger.error(f"Failed to get base history path via get_history_file_path: {e}.", exc_info=True)
|
||||
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
storage_filename = f'{unique_id}.boogaplus'
|
||||
final_path = path / storage_filename
|
||||
return final_path
|
||||
|
||||
last_state = {
|
||||
'display_mode': 'html', # Default display mode
|
||||
}
|
||||
|
||||
# --- Functions to be called by integrated logic ---
|
||||
|
||||
|
@ -209,44 +25,6 @@ def get_versioning_display_mode():
|
|||
return current_mode
|
||||
|
||||
|
||||
def clear_history_data(unique_id: str, character: str, mode: str):
|
||||
"""
|
||||
Deletes the history data file associated with a history.
|
||||
NOTE: This function does NOT clear the in-memory loaded_history.
|
||||
The caller should handle resetting/reloading the in-memory data if the deleted history was the active one.
|
||||
"""
|
||||
try:
|
||||
history_data_path = get_history_data_path(unique_id, character, mode)
|
||||
if history_data_path.exists():
|
||||
history_data_path.unlink()
|
||||
else:
|
||||
logger.warning(f"Message versioning history data file not found for deletion: {history_data_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting message versioning history data file {history_data_path}: {e}", exc_info=True)
|
||||
|
||||
|
||||
def rename_history_data(old_id: str, new_id: str, character: str, mode: str):
|
||||
"""
|
||||
Renames the history data file when a history is renamed.
|
||||
NOTE: This function does NOT update the in-memory loaded_history if the renamed history was the active one.
|
||||
The caller should handle reloading the data under the new ID if necessary.
|
||||
"""
|
||||
try:
|
||||
old_history_data_path = get_history_data_path(old_id, character, mode)
|
||||
new_history_data_path = get_history_data_path(new_id, character, mode)
|
||||
|
||||
if new_history_data_path.exists():
|
||||
logger.warning(f"Cannot rename message versioning history data: Target path {new_history_data_path} already exists.")
|
||||
return
|
||||
|
||||
if old_history_data_path.exists():
|
||||
old_history_data_path.rename(new_history_data_path)
|
||||
else:
|
||||
logger.warning(f"Message versioning history data file not found for renaming: {old_history_data_path}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error renaming message versioning history data file from {old_id} to {new_id}: {e}", exc_info=True)
|
||||
|
||||
|
||||
# --- Core Logic ---
|
||||
|
||||
|
@ -280,24 +58,24 @@ def length(data: Iterable):
|
|||
return 0
|
||||
|
||||
|
||||
def get_message_metadata(history: dict, history_index: int, msg_type: int) -> dict:
|
||||
msg_role = 'assistant' if msg_type == 1 else 'user'
|
||||
return recursive_get(history, ['metadata', f'{msg_role}_{history_index}'])
|
||||
|
||||
# --- Core Functions ---
|
||||
|
||||
def get_message_positions(history_index: int, msg_type: int):
|
||||
def get_message_positions(history: dict, history_index: int, msg_type: int):
|
||||
"""
|
||||
Get the message position and total message versions for a specific message from the currently loaded history data.
|
||||
Assumes the correct history data has already been loaded by the calling context.
|
||||
"""
|
||||
global loaded_history
|
||||
|
||||
# Path: ['visible' or 'internal'][history_index][msg_type]['pos' or 'text']
|
||||
msg_data = recursive_get(loaded_history, ['visible', history_index, msg_type])
|
||||
msg_data = get_message_metadata(history, history_index, msg_type)
|
||||
|
||||
if msg_data is None:
|
||||
logger.warning(f"History data not found for index {history_index}, type {msg_type}. Was history loaded correctly before calling this?")
|
||||
return 0, 0
|
||||
|
||||
pos = recursive_get(msg_data, ['pos'], 0)
|
||||
total = length(recursive_get(msg_data, ['text'], []))
|
||||
pos = msg_data.get('current_message_index', 0)
|
||||
total = length(msg_data.get('versions', []))
|
||||
return pos, total
|
||||
|
||||
|
||||
|
@ -307,15 +85,14 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
|
|||
Updates the history dictionary directly and returns the modified history.
|
||||
The calling function will be responsible for regenerating the HTML.
|
||||
"""
|
||||
global loaded_history, last_history_index, last_msg_type
|
||||
global last_history_index, last_msg_type
|
||||
try:
|
||||
i = int(history_index)
|
||||
m_type = int(msg_type)
|
||||
last_history_index = i # Default to current index/type
|
||||
last_msg_type = m_type
|
||||
|
||||
load_or_init_history_data(character, unique_id, mode)
|
||||
current_pos, total_pos = get_message_positions(i, m_type)
|
||||
current_pos, total_pos = get_message_positions(history, i, m_type)
|
||||
|
||||
if total_pos <= 1:
|
||||
return history
|
||||
|
@ -334,51 +111,36 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
|
|||
logger.warning(f"Invalid navigation direction: {direction}")
|
||||
return history
|
||||
|
||||
loaded_history_data = loaded_history
|
||||
|
||||
history_metadata = history['metadata']
|
||||
msg_metadata = get_message_metadata(history, i, m_type)
|
||||
if msg_metadata is None:
|
||||
logger.error(f"Message metadata not found for index {i}, type {m_type}. Data: {history_metadata}")
|
||||
return history
|
||||
versions = msg_metadata.get('versions', [])
|
||||
new_ver = versions[new_pos]
|
||||
if new_ver is None:
|
||||
logger.error(f"Message version not found for metadata index {i}, type {m_type}, version index {new_pos}. Data: {history_metadata}")
|
||||
return history
|
||||
|
||||
ver_content = new_ver['content']
|
||||
visible_ver_content = new_ver['visible_content']
|
||||
|
||||
visible_msg_data = recursive_get(loaded_history_data, ['visible', i, m_type])
|
||||
internal_msg_data = recursive_get(loaded_history_data, ['internal', i, m_type])
|
||||
|
||||
if not visible_msg_data or not internal_msg_data or 'text' not in visible_msg_data or 'text' not in internal_msg_data:
|
||||
logger.error(f"Loaded history data structure invalid during navigation for index {i}, type {m_type}. Data: {loaded_history_data}")
|
||||
if ver_content is None or visible_ver_content is None:
|
||||
logger.error(f"Loaded version data structure invalid during navigation for index {i}, type {m_type}. Data: {history_metadata}")
|
||||
return history
|
||||
|
||||
# Check bounds for the new position against the text lists
|
||||
if new_pos < 0 or new_pos >= len(visible_msg_data['text']):
|
||||
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for visible history text list (len {len(visible_msg_data['text'])})")
|
||||
return history
|
||||
if new_pos < 0 or new_pos >= len(internal_msg_data['text']):
|
||||
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for internal history text list (len {len(internal_msg_data['text'])})")
|
||||
if new_pos < 0 or new_pos >= len(msg_metadata['versions']):
|
||||
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for history text list (len {len(versions)})")
|
||||
return history
|
||||
|
||||
new_visible = visible_msg_data['text'][new_pos]
|
||||
new_internal = internal_msg_data['text'][new_pos]
|
||||
|
||||
# Update the position ('pos') within the loaded history data structure
|
||||
visible_msg_data['pos'] = new_pos
|
||||
internal_msg_data['pos'] = new_pos
|
||||
|
||||
if character and unique_id:
|
||||
save_history_data(character, unique_id, mode, loaded_history_data)
|
||||
else:
|
||||
logger.warning("Could not save history data after navigation: character or unique_id missing in state.")
|
||||
|
||||
# Update the main chat history dictionary in the state
|
||||
current_history = history
|
||||
if i < len(current_history['visible']) and m_type < len(current_history['visible'][i]):
|
||||
current_history['visible'][i][m_type] = new_visible
|
||||
else:
|
||||
logger.error(f"History structure invalid (visible) for update at index {i}, type {m_type}")
|
||||
|
||||
if i < len(current_history['internal']) and m_type < len(current_history['internal'][i]):
|
||||
current_history['internal'][i][m_type] = new_internal
|
||||
else:
|
||||
logger.error(f"History structure invalid (internal) for update at index {i}, type {m_type}")
|
||||
msg_metadata['current_message_index'] = new_pos # Updates history object
|
||||
|
||||
last_history_index = i
|
||||
last_msg_type = m_type
|
||||
|
||||
return current_history
|
||||
return history
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during message version navigation: {e}", exc_info=True)
|
||||
|
@ -388,7 +150,7 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
|
|||
# --- Functions to be called during HTML Generation ---
|
||||
|
||||
|
||||
def get_message_version_nav_elements(history_index: int, msg_type: int):
|
||||
def get_message_version_nav_elements(history: dict, history_index: int, msg_type: int):
|
||||
"""
|
||||
Generates the HTML for the message version navigation arrows and position indicator.
|
||||
Should be called by the HTML generator.
|
||||
|
@ -398,15 +160,14 @@ def get_message_version_nav_elements(history_index: int, msg_type: int):
|
|||
return ""
|
||||
|
||||
try:
|
||||
current_pos, total_pos = get_message_positions(history_index, msg_type)
|
||||
current_pos, total_pos = get_message_positions(history, history_index, msg_type)
|
||||
|
||||
if total_pos <= 1:
|
||||
return ""
|
||||
|
||||
left_activated_attr = f' activated="true" data-history-index="{history_index}" data-msg-type="{msg_type}" data-direction="left"' if current_pos > 0 else ''
|
||||
right_activated_attr = f' activated="true" data-history-index="{history_index}" data-msg-type="{msg_type}" data-direction="right"' if current_pos < total_pos - 1 else ''
|
||||
left_activated_attr = f' activated="true"' if current_pos > 0 else ''
|
||||
right_activated_attr = f' activated="true"' if current_pos < total_pos - 1 else ''
|
||||
|
||||
# Similar size to copy/refresh icons
|
||||
left_arrow_svg = '''<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><polyline points="15 18 9 12 15 6"></polyline></svg>'''
|
||||
right_arrow_svg = '''<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><polyline points="9 18 15 12 9 6"></polyline></svg>'''
|
||||
|
||||
|
@ -442,33 +203,6 @@ def is_message_selected(history_index: int, msg_type: int) -> bool:
|
|||
return last_history_index == history_index and last_msg_type == msg_type
|
||||
|
||||
|
||||
# --- Functions to be called by Gradio Event Handlers ---
|
||||
|
||||
|
||||
def handle_unique_id_change(history: Dict, unique_id: str, name1: str, name2: str, mode: str, chat_style: str, character: str):
|
||||
"""
|
||||
Handles changes to the unique_id.
|
||||
Loads the corresponding message versioning history data.
|
||||
"""
|
||||
global last_history_index, last_msg_type
|
||||
last_history_index = -1
|
||||
last_msg_type = -1
|
||||
|
||||
if not unique_id:
|
||||
logger.warning("handle_unique_id_change called with empty unique_id.")
|
||||
return
|
||||
|
||||
import modules.chat as chat
|
||||
if not character:
|
||||
logger.warning(f"Cannot load history for unique_id {unique_id}: character_menu not found in state.")
|
||||
# Avoid using stale data from a previous selection
|
||||
load_or_init_history_data(None, unique_id, mode)
|
||||
else:
|
||||
load_or_init_history_data(character, unique_id, mode)
|
||||
|
||||
return chat.redraw_html(history, name1, name2, mode, chat_style, character)
|
||||
|
||||
|
||||
def handle_display_mode_change(display_mode: str):
|
||||
"""
|
||||
Handles changes to the message versioning display mode radio button.
|
||||
|
|
|
@ -256,9 +256,6 @@ def create_event_handlers():
|
|||
shared.gradio['unique_id'].select(
|
||||
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(
|
||||
chat.handle_unique_id_select, gradio('interface_state'), gradio('history', 'display'), show_progress=False)
|
||||
shared.gradio['unique_id'].change(
|
||||
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(
|
||||
message_versioning.handle_unique_id_change, gradio('history', 'unique_id', 'name1', 'name2', 'mode', 'chat_style', 'character_menu'), gradio('display'), show_progress=False)
|
||||
|
||||
shared.gradio['Start new chat'].click(
|
||||
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(
|
||||
|
|
Loading…
Add table
Reference in a new issue