Basic changes for new structure

Removed unnecessary .boogaplus saving+loading and structure validation
Fix replace_last_reply (using 'text' as Textbox when MultimodalTextbox)

TODO: Merge message_versioning.py into chat.py (depending on how to make html_generator.py implement versioning HTML)
This commit is contained in:
Th-Underscore 2025-05-22 18:28:08 -04:00
parent fee48eb2a8
commit b549ac9cfe
No known key found for this signature in database
GPG key ID: 8D0551EF8593B2F0
4 changed files with 46 additions and 330 deletions

View file

@ -17,7 +17,7 @@ from jinja2.sandbox import ImmutableSandboxedEnvironment
from PIL import Image
import modules.shared as shared
from modules import utils, message_versioning
from modules import utils
from modules.extensions import apply_extensions
from modules.html_generator import (
chat_html_wrapper,
@ -667,13 +667,6 @@ def generate_chat_reply_wrapper(text, state, regenerate=False, _continue=False):
send_dummy_reply(state['start_with'], state)
history = state['history']
initial_history_len = len(history['internal'])
if not regenerate and not _continue and text.strip():
temp_history = copy.deepcopy(history)
visible_text = html.escape(text)
temp_history['internal'].append([text, ''])
temp_history['visible'].append([visible_text, ''])
message_versioning.append_message_version(temp_history, state, is_bot=False)
last_save_time = time.monotonic()
save_interval = 8
@ -686,9 +679,6 @@ def generate_chat_reply_wrapper(text, state, regenerate=False, _continue=False):
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
last_save_time = current_time
if len(history['internal']) > initial_history_len or regenerate or _continue:
message_versioning.append_message_version(history, state, is_bot=True)
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
yield chat_html_wrapper(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu']), history
@ -721,8 +711,9 @@ def send_last_reply_to_input(history):
return ''
def replace_last_reply(text, state):
def replace_last_reply(textbox, state):
history = state['history']
text = textbox['text']
# Initialize metadata if not present
if 'metadata' not in history:
@ -835,7 +826,6 @@ def rename_history(old_id, new_id, character, mode):
else:
logger.info(f"Renaming \"{old_p}\" to \"{new_p}\"")
old_p.rename(new_p)
message_versioning.rename_history_data(old_id, new_id, character, mode)
def get_paths(state):
@ -1334,8 +1324,6 @@ def handle_replace_last_reply_click(text, state):
last_reply = state['history']['internal'][-1][1] if len(state['history']['internal']) > 0 else None
history = replace_last_reply(text, state)
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
if len(history['internal']) > 0 and history['internal'][-1][1] != last_reply: # Differs from last reply
message_versioning.append_message_version(history, state, is_bot=True)
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
return [history, html, {"text": "", "files": []}]
@ -1344,7 +1332,6 @@ def handle_replace_last_reply_click(text, state):
def handle_send_dummy_message_click(text, state):
history = send_dummy_message(text, state)
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
message_versioning.append_message_version(history, state, is_bot=False)
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
return [history, html, {"text": "", "files": []}]
@ -1353,7 +1340,6 @@ def handle_send_dummy_message_click(text, state):
def handle_send_dummy_reply_click(text, state):
history = send_dummy_reply(text, state)
save_history(history, state['unique_id'], state['character_menu'], state['mode'])
message_versioning.append_message_version(history, state, is_bot=True)
html = redraw_html(history, state['name1'], state['name2'], state['mode'], state['chat_style'], state['character_menu'])
return [history, html, {"text": "", "files": []}]
@ -1401,7 +1387,6 @@ def handle_delete_chat_confirm_click(state):
index = str(all_histories.index(unique_id_to_delete))
delete_history(unique_id_to_delete, character_to_delete, mode_to_delete)
message_versioning.clear_history_data(unique_id_to_delete, character_to_delete, mode_to_delete)
# Load the next appropriate history
history, unique_id = load_history_after_deletion(state, index)

View file

@ -397,8 +397,8 @@ def generate_instruct_html(history):
row_visible = history['visible'][i]
row_internal = history['internal'][i]
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
# Get timestamps
user_timestamp = format_message_timestamp(history, "user", i)
@ -470,8 +470,8 @@ def generate_cai_chat_html(history, name1, name2, style, character, reset_cache=
row_visible = history['visible'][i]
row_internal = history['internal'][i]
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
# Get timestamps
user_timestamp = format_message_timestamp(history, "user", i)
@ -523,8 +523,8 @@ def generate_chat_html(history, name1, name2, reset_cache=False):
row_visible = history['visible'][i]
row_internal = history['internal'][i]
converted_visible = [convert_to_markdown_wrapped(entry, message_id=i, use_cache=i != len(history['visible']) - 1) for entry in row_visible]
versioning_nav_user = message_versioning.get_message_version_nav_elements(i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(i, 1)
versioning_nav_user = message_versioning.get_message_version_nav_elements(history, i, 0)
versioning_nav_bot = message_versioning.get_message_version_nav_elements(history, i, 1)
# Get timestamps
user_timestamp = format_message_timestamp(history, "user", i)

View file

@ -6,193 +6,9 @@ from pathlib import Path
import json
import logging as logger
# --- History Storage Logic ---
# Global variables for the currently loaded history data and state
loaded_history = {'visible': [], 'internal': []}
last_state = {'character_menu': None, 'unique_id': None, 'mode': None, 'display_mode': 'html'}
def validate_list(lst: List, i: int):
"""Ensure list is properly extended to index i"""
if len(lst) <= i:
lst.extend([None] * (i + 1 - len(lst)))
def validate_history_structure(history_data: Dict, i: int):
"""Ensure history_data structure is properly extended to index i"""
for history_type in ['visible', 'internal']:
if history_type not in history_data:
history_data[history_type] = []
validate_list(history_data[history_type], i)
def init_history_entry(history_data: Dict, i: int):
"""Initialize history entry dicts at index i if nonexistent"""
for history_type in ['visible', 'internal']:
if history_data[history_type][i] is None or not isinstance(history_data[history_type][i], list):
history_data[history_type][i] = [
{'text': [], 'pos': 0}, # User message data
{'text': [], 'pos': 0} # Bot message data
]
def load_or_init_history_data(character: Optional[str], unique_id: Optional[str], mode: str) -> Dict:
"""
Loads history data from file if character and unique_id are provided, the file exists,
and the requested context (character, unique_id, mode) differs from the currently loaded context.
Otherwise, returns the existing in-memory history or a new empty history structure.
"""
global loaded_history, last_state
_character = last_state.get('character_menu')
_unique_id = last_state.get('unique_id')
_mode = last_state.get('mode')
# --- Check if requested context matches already loaded context ---
if (character == _character
and unique_id == _unique_id # noqa: W503
and mode == _mode # noqa: W503
and _character is not None): # noqa: W503
return loaded_history
# --- Context differs or history data not loaded, proceed with load/initialization ---
def _create_empty_history():
return {'visible': [], 'internal': []}
if not character or not unique_id:
logger.warning("Cannot load history data: Character or ID is missing.")
loaded_history = _create_empty_history()
return loaded_history
path = get_history_data_path(unique_id, character, mode)
if not path.exists():
loaded_history = _create_empty_history()
return loaded_history
try:
with open(path, 'r', encoding='utf-8') as f:
contents = f.read()
if contents:
loaded_data = json.loads(contents)
last_state['character_menu'] = character
last_state['unique_id'] = unique_id
last_state['mode'] = mode
loaded_history = loaded_data
return loaded_history
else:
loaded_history = _create_empty_history()
return loaded_history
except json.JSONDecodeError:
logger.error(f"Error decoding JSON from history data file: {path}. Initializing empty.", exc_info=True)
loaded_history = _create_empty_history()
return loaded_history
except Exception as e:
logger.error(f"Error loading message versioning history data from {path}: {e}", exc_info=True)
loaded_history = _create_empty_history()
return loaded_history
def append_message_version(history: Dict, state: Dict, is_bot=True):
"""
Append a message to the end of the currently loaded history data (loaded_history).
Requires state to potentially save the updated history.
"""
global loaded_history
# NOTE: Assumes the correct history for the state's character/unique_id is already loaded into loaded_history
msg_type = 1 if is_bot else 0
if not history or not history.get('visible') or not history.get('internal'):
logger.warning("Attempted to append to history data with invalid history.")
return
i = len(history['visible']) - 1
if i < 0 or len(history['visible'][i]) <= msg_type or len(history['internal'][i]) <= msg_type:
logger.warning(f"Input history index {i} or msg_type {msg_type} out of bounds for appending.")
return
visible_text = history['visible'][i][msg_type]
if not visible_text:
return
internal_text = history['internal'][i][msg_type]
try:
validate_history_structure(loaded_history, i)
init_history_entry(loaded_history, i)
# Append the strings to the respective lists in loaded_history
if 'text' not in loaded_history['visible'][i][msg_type]:
loaded_history['visible'][i][msg_type]['text'] = []
if 'text' not in loaded_history['internal'][i][msg_type]:
loaded_history['internal'][i][msg_type]['text'] = []
vis_list = loaded_history['visible'][i][msg_type]['text']
int_list = loaded_history['internal'][i][msg_type]['text']
vis_list.append(visible_text)
int_list.append(internal_text)
# Update position to the new last index
new_pos = len(vis_list) - 1
loaded_history['visible'][i][msg_type]['pos'] = new_pos
loaded_history['internal'][i][msg_type]['pos'] = new_pos
character = state['character_menu']
unique_id = state['unique_id']
mode = state['mode']
if character and unique_id:
save_history_data(character, unique_id, mode, loaded_history)
else:
logger.warning("Could not save history data after append: character or unique_id missing in state.")
except IndexError:
logger.error(f"IndexError adding message version: index={i}, msg_type={msg_type}. Message versions: {loaded_history}", exc_info=True)
except Exception as e:
logger.error(f"Error adding message version: {e}", exc_info=True)
return
def save_history_data(character: str, unique_id: str, mode: str, history_data: Dict) -> bool:
"""Save the provided history data structure to disk"""
if not unique_id or not character:
logger.warning("Cannot save history data: Character or ID is not set.")
return False
current_mode = mode or last_state.get('mode') or shared.persistent_interface_state.get('mode', 'chat')
path = get_history_data_path(unique_id, character, current_mode)
try:
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
data_to_save = {'visible': history_data['visible'], 'internal': history_data['internal']}
json.dump(data_to_save, f, indent=4)
return True
except TypeError as e:
logger.error(f"TypeError saving history data (possibly non-serializable data): {e}. Data: {history_data}", exc_info=True)
except Exception as e:
logger.error(f"Error saving message versioning history data to {path}: {e}", exc_info=True)
return False
def get_history_data_path(unique_id: str, character: str, mode: str) -> Path:
"""Get the path to the history data file"""
if not unique_id or not character or not mode:
raise ValueError("unique_id, character, and mode must be provided to get history data path.")
try:
import modules.chat as chat
base_history_path = chat.get_history_file_path(unique_id, character, mode)
path = base_history_path.parent
except Exception as e:
# Fallback or error handling if get_history_file_path fails (shouldn't happen, but just in case)
logger.error(f"Failed to get base history path via get_history_file_path: {e}.", exc_info=True)
path.mkdir(parents=True, exist_ok=True)
storage_filename = f'{unique_id}.boogaplus'
final_path = path / storage_filename
return final_path
last_state = {
'display_mode': 'html', # Default display mode
}
# --- Functions to be called by integrated logic ---
@ -209,44 +25,6 @@ def get_versioning_display_mode():
return current_mode
def clear_history_data(unique_id: str, character: str, mode: str):
"""
Deletes the history data file associated with a history.
NOTE: This function does NOT clear the in-memory loaded_history.
The caller should handle resetting/reloading the in-memory data if the deleted history was the active one.
"""
try:
history_data_path = get_history_data_path(unique_id, character, mode)
if history_data_path.exists():
history_data_path.unlink()
else:
logger.warning(f"Message versioning history data file not found for deletion: {history_data_path}")
except Exception as e:
logger.error(f"Error deleting message versioning history data file {history_data_path}: {e}", exc_info=True)
def rename_history_data(old_id: str, new_id: str, character: str, mode: str):
"""
Renames the history data file when a history is renamed.
NOTE: This function does NOT update the in-memory loaded_history if the renamed history was the active one.
The caller should handle reloading the data under the new ID if necessary.
"""
try:
old_history_data_path = get_history_data_path(old_id, character, mode)
new_history_data_path = get_history_data_path(new_id, character, mode)
if new_history_data_path.exists():
logger.warning(f"Cannot rename message versioning history data: Target path {new_history_data_path} already exists.")
return
if old_history_data_path.exists():
old_history_data_path.rename(new_history_data_path)
else:
logger.warning(f"Message versioning history data file not found for renaming: {old_history_data_path}")
except Exception as e:
logger.error(f"Error renaming message versioning history data file from {old_id} to {new_id}: {e}", exc_info=True)
# --- Core Logic ---
@ -280,24 +58,24 @@ def length(data: Iterable):
return 0
def get_message_metadata(history: dict, history_index: int, msg_type: int) -> dict:
msg_role = 'assistant' if msg_type == 1 else 'user'
return recursive_get(history, ['metadata', f'{msg_role}_{history_index}'])
# --- Core Functions ---
def get_message_positions(history_index: int, msg_type: int):
def get_message_positions(history: dict, history_index: int, msg_type: int):
"""
Get the message position and total message versions for a specific message from the currently loaded history data.
Assumes the correct history data has already been loaded by the calling context.
"""
global loaded_history
# Path: ['visible' or 'internal'][history_index][msg_type]['pos' or 'text']
msg_data = recursive_get(loaded_history, ['visible', history_index, msg_type])
msg_data = get_message_metadata(history, history_index, msg_type)
if msg_data is None:
logger.warning(f"History data not found for index {history_index}, type {msg_type}. Was history loaded correctly before calling this?")
return 0, 0
pos = recursive_get(msg_data, ['pos'], 0)
total = length(recursive_get(msg_data, ['text'], []))
pos = msg_data.get('current_message_index', 0)
total = length(msg_data.get('versions', []))
return pos, total
@ -307,15 +85,14 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
Updates the history dictionary directly and returns the modified history.
The calling function will be responsible for regenerating the HTML.
"""
global loaded_history, last_history_index, last_msg_type
global last_history_index, last_msg_type
try:
i = int(history_index)
m_type = int(msg_type)
last_history_index = i # Default to current index/type
last_msg_type = m_type
load_or_init_history_data(character, unique_id, mode)
current_pos, total_pos = get_message_positions(i, m_type)
current_pos, total_pos = get_message_positions(history, i, m_type)
if total_pos <= 1:
return history
@ -334,51 +111,36 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
logger.warning(f"Invalid navigation direction: {direction}")
return history
loaded_history_data = loaded_history
visible_msg_data = recursive_get(loaded_history_data, ['visible', i, m_type])
internal_msg_data = recursive_get(loaded_history_data, ['internal', i, m_type])
history_metadata = history['metadata']
msg_metadata = get_message_metadata(history, i, m_type)
if msg_metadata is None:
logger.error(f"Message metadata not found for index {i}, type {m_type}. Data: {history_metadata}")
return history
versions = msg_metadata.get('versions', [])
new_ver = versions[new_pos]
if new_ver is None:
logger.error(f"Message version not found for metadata index {i}, type {m_type}, version index {new_pos}. Data: {history_metadata}")
return history
if not visible_msg_data or not internal_msg_data or 'text' not in visible_msg_data or 'text' not in internal_msg_data:
logger.error(f"Loaded history data structure invalid during navigation for index {i}, type {m_type}. Data: {loaded_history_data}")
ver_content = new_ver['content']
visible_ver_content = new_ver['visible_content']
if ver_content is None or visible_ver_content is None:
logger.error(f"Loaded version data structure invalid during navigation for index {i}, type {m_type}. Data: {history_metadata}")
return history
# Check bounds for the new position against the text lists
if new_pos < 0 or new_pos >= len(visible_msg_data['text']):
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for visible history text list (len {len(visible_msg_data['text'])})")
return history
if new_pos < 0 or new_pos >= len(internal_msg_data['text']):
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for internal history text list (len {len(internal_msg_data['text'])})")
if new_pos < 0 or new_pos >= len(msg_metadata['versions']):
logger.error(f"Navigation error: new_pos {new_pos} out of bounds for history text list (len {len(versions)})")
return history
new_visible = visible_msg_data['text'][new_pos]
new_internal = internal_msg_data['text'][new_pos]
# Update the position ('pos') within the loaded history data structure
visible_msg_data['pos'] = new_pos
internal_msg_data['pos'] = new_pos
if character and unique_id:
save_history_data(character, unique_id, mode, loaded_history_data)
else:
logger.warning("Could not save history data after navigation: character or unique_id missing in state.")
# Update the main chat history dictionary in the state
current_history = history
if i < len(current_history['visible']) and m_type < len(current_history['visible'][i]):
current_history['visible'][i][m_type] = new_visible
else:
logger.error(f"History structure invalid (visible) for update at index {i}, type {m_type}")
if i < len(current_history['internal']) and m_type < len(current_history['internal'][i]):
current_history['internal'][i][m_type] = new_internal
else:
logger.error(f"History structure invalid (internal) for update at index {i}, type {m_type}")
msg_metadata['current_message_index'] = new_pos # Updates history object
last_history_index = i
last_msg_type = m_type
return current_history
return history
except Exception as e:
logger.error(f"Error during message version navigation: {e}", exc_info=True)
@ -388,7 +150,7 @@ def navigate_message_version(history_index: float, msg_type: float, direction: s
# --- Functions to be called during HTML Generation ---
def get_message_version_nav_elements(history_index: int, msg_type: int):
def get_message_version_nav_elements(history: dict, history_index: int, msg_type: int):
"""
Generates the HTML for the message version navigation arrows and position indicator.
Should be called by the HTML generator.
@ -398,15 +160,14 @@ def get_message_version_nav_elements(history_index: int, msg_type: int):
return ""
try:
current_pos, total_pos = get_message_positions(history_index, msg_type)
current_pos, total_pos = get_message_positions(history, history_index, msg_type)
if total_pos <= 1:
return ""
left_activated_attr = f' activated="true" data-history-index="{history_index}" data-msg-type="{msg_type}" data-direction="left"' if current_pos > 0 else ''
right_activated_attr = f' activated="true" data-history-index="{history_index}" data-msg-type="{msg_type}" data-direction="right"' if current_pos < total_pos - 1 else ''
left_activated_attr = f' activated="true"' if current_pos > 0 else ''
right_activated_attr = f' activated="true"' if current_pos < total_pos - 1 else ''
# Similar size to copy/refresh icons
left_arrow_svg = '''<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><polyline points="15 18 9 12 15 6"></polyline></svg>'''
right_arrow_svg = '''<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><polyline points="9 18 15 12 9 6"></polyline></svg>'''
@ -442,33 +203,6 @@ def is_message_selected(history_index: int, msg_type: int) -> bool:
return last_history_index == history_index and last_msg_type == msg_type
# --- Functions to be called by Gradio Event Handlers ---
def handle_unique_id_change(history: Dict, unique_id: str, name1: str, name2: str, mode: str, chat_style: str, character: str):
"""
Handles changes to the unique_id.
Loads the corresponding message versioning history data.
"""
global last_history_index, last_msg_type
last_history_index = -1
last_msg_type = -1
if not unique_id:
logger.warning("handle_unique_id_change called with empty unique_id.")
return
import modules.chat as chat
if not character:
logger.warning(f"Cannot load history for unique_id {unique_id}: character_menu not found in state.")
# Avoid using stale data from a previous selection
load_or_init_history_data(None, unique_id, mode)
else:
load_or_init_history_data(character, unique_id, mode)
return chat.redraw_html(history, name1, name2, mode, chat_style, character)
def handle_display_mode_change(display_mode: str):
"""
Handles changes to the message versioning display mode radio button.

View file

@ -256,9 +256,6 @@ def create_event_handlers():
shared.gradio['unique_id'].select(
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(
chat.handle_unique_id_select, gradio('interface_state'), gradio('history', 'display'), show_progress=False)
shared.gradio['unique_id'].change(
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(
message_versioning.handle_unique_id_change, gradio('history', 'unique_id', 'name1', 'name2', 'mode', 'chat_style', 'character_menu'), gradio('display'), show_progress=False)
shared.gradio['Start new chat'].click(
ui.gather_interface_values, gradio(shared.input_elements), gradio('interface_state')).then(