# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
import hashlib
import json
import logging
import math
import os
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from urllib.parse import quote_plus

import gradio as gr
from gradio_calendar import Calendar

import torah  # Assuming you have your torah module
from gematria import calculate_gematria, strip_diacritics
from utils import date_to_words, custom_normalize
# --- Setup Logging ---
# Logging is configured once at import time. DEBUG level is required for the
# entry/exit traces emitted by the helpers below to be visible.
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# --- Constants and Global Variables ---
DATABASE_FILE = 'gematria.db'  # SQLite file queried by find_shortest_psalm_match
ELS_CACHE_DB = 'els_cache.db'  # SQLite file holding the els_cache table
DATABASE_TIMEOUT = 60  # passed as ``timeout=`` to sqlite3.connect
ADJUSTMENT_CONSTANT = 137.035999177  # Use the more precise constant

# --- Helper Functions ---
def get_query_hash(func, args, kwargs):
    """Return a SHA-256 hex digest identifying one call of ``func``.

    The digest is taken over the JSON encoding of
    ``(func.__name__, args, kwargs)``.

    Args:
        func: Callable being identified; only its ``__name__`` is hashed.
        args: JSON-serializable description of the positional arguments.
        kwargs: JSON-serializable description of the keyword arguments.

    Returns:
        str: Hexadecimal SHA-256 digest.

    Raises:
        TypeError: If ``args`` or ``kwargs`` cannot be JSON-encoded.
    """
    key = (func.__name__, args, kwargs)
    logger.debug(f"Generating query hash for key: {key}")
    # sort_keys keeps the digest independent of dict insertion order, so two
    # logically equal calls always map to the same cache row. For inputs that
    # contain no dicts the encoding is unchanged.
    serialized = json.dumps(key, sort_keys=True)
    hash_value = hashlib.sha256(serialized.encode()).hexdigest()
    logger.debug(f"Generated hash: {hash_value}")
    return hash_value
def create_els_cache_table():
    """Ensure the ``els_cache`` table exists in ``ELS_CACHE_DB``.

    The statement is ``CREATE TABLE IF NOT EXISTS``, so it is safe to run on
    every call. It is deliberately NOT guarded by a file-existence check: the
    database file can exist without the table (for example when a previous
    run was interrupted after ``sqlite3.connect`` created the file), and in
    that case the table must still be created.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    logger.debug(f"Ensuring ELS cache table exists in {ELS_CACHE_DB}")
    # closing() releases the connection; the inner ``with conn`` only manages
    # the transaction (commit on success, rollback on error).
    with closing(sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)) as conn:
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS els_cache (
                    query_hash TEXT PRIMARY KEY,
                    function_name TEXT,
                    args TEXT,
                    kwargs TEXT,
                    results TEXT
                )
            ''')
    logger.debug("ELS cache table is ready.")
def cached_process_json_files(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` through a SQLite-backed result cache.

    The call is described as a JSON object (qualified function name plus
    argument names and values), hashed with ``get_query_hash``, and looked up
    in the ``els_cache`` table of ``ELS_CACHE_DB``. On a hit the stored JSON
    is decoded and returned; on a miss ``func`` is called and its result is
    stored. Caching is best-effort: database or serialization problems are
    logged and the freshly computed result is still returned.

    Note that a cache hit returns JSON-decoded data, so values that JSON does
    not preserve (e.g. tuples) come back in their JSON form (lists).

    Args:
        func: Python-level function to call; ``func.__code__`` is used to map
            positional arguments to parameter names.
        *args: Positional arguments for ``func`` (must be JSON-serializable).
        **kwargs: Keyword arguments for ``func`` (must be JSON-serializable).

    Returns:
        The cached or freshly computed result of ``func``.

    Raises:
        TypeError: If the arguments cannot be JSON-encoded.
        sqlite3.Error: If the cache table cannot be created.
    """
    logger.debug(f"Entering cached_process_json_files with func: {func}, args: {args}, kwargs: {kwargs}")
    params = {
        "function": f"{func.__module__}.{func.__name__}"
    }
    arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
    params.update(zip(arg_names, args))
    # Keyword arguments are added in sorted order so the same call produces
    # the same hash regardless of the order the caller wrote them in.
    for name in sorted(kwargs):
        params[name] = kwargs[name]
    params_json = json.dumps(params)
    logger.debug(f"Parameters as JSON: {params_json}")
    query_hash = get_query_hash(func, params_json, "")
    create_els_cache_table()

    # --- Cache lookup ---
    try:
        with closing(sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)) as conn:
            row = conn.execute(
                "SELECT results FROM els_cache WHERE query_hash = ?", (query_hash,)
            ).fetchone()
        if row:
            logger.info(f"Cache hit for query: {query_hash}")
            logger.debug(f"Cached result: {row[0]}")
            return json.loads(row[0])
    except sqlite3.Error as e:
        logger.error(f"Database error checking cache: {e}")
    except (TypeError, ValueError) as e:
        # An undecodable row is treated as a miss; it is overwritten below.
        logger.error(f"Corrupt cache entry for query {query_hash}: {e}")

    # --- Cache miss: compute ---
    logger.info(f"Cache miss for query: {query_hash}")
    results = func(*args, **kwargs)
    logger.debug(f"Results from function call: {results}")

    # --- Store (best-effort) ---
    try:
        results_json = json.dumps(results)
    except (TypeError, ValueError) as e:
        # Do not lose an already computed result just because it cannot be cached.
        logger.error(f"Results are not JSON-serializable, not caching: {e}")
        return results
    try:
        with closing(sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)) as conn:
            with conn:  # commits on success, rolls back on error
                # OR REPLACE: the key may already exist when the lookup above
                # failed or another writer stored the same query meanwhile.
                conn.execute(
                    "INSERT OR REPLACE INTO els_cache (query_hash, function_name, args, kwargs, results) VALUES (?, ?, ?, ?, ?)",
                    (query_hash, params["function"], params_json, json.dumps({}), results_json))
        logger.debug("Cached results in database.")
    except sqlite3.Error as e:
        logger.error(f"Database error caching results: {e}")
    logger.debug(f"Exiting cached_process_json_files, returning: {results}")
    return results
def calculate_gematria_sum(text, date_words):
    """Return the gematria value of ``text`` combined with ``date_words``.

    Runs of digits in the combined input are summed as integers; the
    remaining text is passed through ``strip_diacritics`` and
    ``calculate_gematria``. The two parts are added together.

    Args:
        text: Free text (may be empty or ``None``).
        date_words: Date rendered as words (may be empty or ``None``).

    Returns:
        The combined sum, or ``None`` when both inputs are empty/``None``.
    """
    logger.debug(f"Entering calculate_gematria_sum with text: '{text}', date_words: '{date_words}'")
    if not (text or date_words):
        logger.debug("No input text or date words provided. Returning None.")
        return None

    # A missing value must contribute nothing; formatting None directly would
    # inject the literal word "None" into the text being evaluated.
    combined_input = f"{text or ''} {date_words or ''}"
    logger.info(f"Calculating Gematria sum for input: {combined_input}")
    numbers = re.findall(r'\d+', combined_input)
    logger.debug(f"Extracted numbers: {numbers}")
    text_without_numbers = re.sub(r'\d+', '', combined_input)
    logger.debug(f"Text without numbers: {text_without_numbers}")
    number_sum = sum(int(number) for number in numbers)
    logger.debug(f"Sum of numbers: {number_sum}")
    text_gematria = calculate_gematria(strip_diacritics(text_without_numbers))
    logger.debug(f"Gematria of text: {text_gematria}")
    total_sum = text_gematria + number_sum
    logger.debug(f"Calculated Gematria sum: {total_sum}")
    logger.debug(f"Exiting calculate_gematria_sum, returning: {total_sum}")
    return total_sum
def get_first_els_result_genesis(gematria_sum, tlang="en"):
    """Return the first ELS result for Genesis, or ``None`` if there is none.

    The search is delegated to ``torah.process_json_files`` via
    ``cached_process_json_files`` so repeated queries are served from cache.

    Args:
        gematria_sum: Value passed as the ELS step.
        tlang: Target language code forwarded to the search.

    Returns:
        The first element of the search results, or ``None`` when the search
        returns nothing.
    """
    logger.debug(f"Entering get_first_els_result_genesis with gematria_sum: {gematria_sum}, tlang: {tlang}")
    # Positional arguments follow torah.process_json_files; the labels below
    # are taken from the call site (signature not visible here — confirm).
    els_matches = cached_process_json_files(
        torah.process_json_files,
        1,             # first book: Genesis
        1,             # last book: Genesis (book 1 to 1)
        gematria_sum,  # step
        "1,-1",        # default rounds_combination
        0,             # length
        tlang,
        True,          # strip_spaces
        True,          # strip_in_braces
        True,          # strip_diacritics_chk
    )
    if not els_matches:
        logger.debug("No ELS results found in Genesis.")
        logger.debug(f"Exiting get_first_els_result_genesis, returning: None")
        return None

    first_match = els_matches[0]
    logger.debug(f"First ELS result from Genesis: {first_match}")
    logger.debug(f"Exiting get_first_els_result_genesis, returning: {first_match}")
    return first_match
def find_shortest_psalm_match(gematria_sum):
    """Find the shortest Psalms entry in ``DATABASE_FILE`` with the given sum.

    Queries the ``results`` table for rows where ``book = 'Psalms'`` and
    ``gematria_sum`` matches, ordered by ``LENGTH(words)`` ascending, and
    takes the first row.

    Args:
        gematria_sum: Gematria value to look up.

    Returns:
        dict | None: ``{"words", "chapter", "verse", "phrase_length"}`` for
        the match (``phrase_length`` is always ``None``), or ``None`` when no
        row matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    logger.debug(f"Entering find_shortest_psalm_match with gematria_sum: {gematria_sum}")
    # closing() is needed to release the handle: sqlite3's own context manager
    # only ends the transaction. The timeout matches the cache DB access.
    with closing(sqlite3.connect(DATABASE_FILE, timeout=DATABASE_TIMEOUT)) as conn:
        result = conn.execute('''
            SELECT words, chapter, verse
            FROM results
            WHERE gematria_sum = ? AND book = 'Psalms'
            ORDER BY LENGTH(words) ASC
            LIMIT 1
        ''', (gematria_sum,)).fetchone()
    if result:
        logger.debug(f"Shortest Psalm match found: {result}")
        logger.debug(f"Exiting find_shortest_psalm_match, returning: {result}")
        return {"words": result[0], "chapter": result[1], "verse": result[2], "phrase_length": None}
    logger.debug("No matching Psalm found.")
    logger.debug(f"Exiting find_shortest_psalm_match, returning: None")
    return None
def create_biblegateway_iframe(book_name, chapter, verse):
    """Build the HTML for an iframe showing a Bible Gateway passage.

    The URL searches for ``<book_name> <chapter>`` in the CJB version.
    ``verse`` is logged but is not part of the URL, so the whole chapter is
    requested.

    Args:
        book_name: Book title; URL-encoded with ``quote_plus``.
        chapter: Chapter identifier, inserted into the URL as-is.
        verse: Verse identifier (currently only logged).

    Returns:
        str: An ``<iframe>`` element, 800x600, pointing at the passage URL.
    """
    logger.debug(f"Entering create_biblegateway_iframe with book_name: {book_name}, chapter: {chapter}, verse: {verse}")
    search_term = f"{quote_plus(book_name)}+{chapter}"
    passage_url = f"https://www.biblegateway.com/passage/?search={search_term}&version=CJB"
    iframe = f'<iframe src="{passage_url}" width="800" height="600"></iframe>'
    logger.debug(f"Generated iframe: {iframe}")
    logger.debug(f"Exiting create_biblegateway_iframe, returning: {iframe}")
    return iframe
# --- Gradio Interface ---
with gr.Blocks() as app:
    gr.Markdown("# Daily Psalm Finder")
    with gr.Row():
        name_input = gr.Textbox(label="Name (optional)")
        # NOTE(review): confirm which value type Calendar delivers for
        # type="date"; main_function accepts both str and datetime-like values.
        date_input = Calendar(label="Date", type="date")
    with gr.Row():
        adjusted_sum_check = gr.Checkbox(label="Journal Sum + (Journal Sum / 137)", value=False)
    with gr.Row():
        run_button = gr.Button("Find Psalm")
    with gr.Row():
        iframe_output = gr.HTML(label="Psalm Display")

    def main_function(name, date, adjusted_sum_check):
        """Resolve a name and date to a Psalm and return HTML to display.

        Pipeline: date -> words -> gematria sum (optionally adjusted by
        ``ADJUSTMENT_CONSTANT``) -> first ELS result in Genesis -> gematria
        of that result's text -> shortest matching Psalm -> iframe HTML.

        Args:
            name: Optional name text from the textbox.
            date: Selected date, either a ``"%Y-%m-%d"`` string or an object
                with ``strftime``; may be ``None`` when nothing is selected.
            adjusted_sum_check: When true, use
                ``ceil(sum + sum / ADJUSTMENT_CONSTANT)`` as the ELS step.

        Returns:
            str: Iframe HTML on success, otherwise a plain error message.
        """
        logger.debug(f"Entering main_function with name: '{name}', date: '{date}', adjusted_sum: {adjusted_sum_check}")

        # Without this guard an empty date field crashes on date.strftime.
        if not date:
            logger.debug("No date provided. Returning error message.")
            return "Please select a date."
        if isinstance(date, str):
            try:
                date = datetime.strptime(date, "%Y-%m-%d")
            except ValueError:
                logger.debug("Date string does not match %Y-%m-%d. Returning error message.")
                return "Invalid date. Expected format: YYYY-MM-DD."
            logger.debug(f"Converted date string to datetime: {date}")

        date_str = date.strftime("%Y-%m-%d")
        logger.debug(f"Formatted date string: {date_str}")
        date_words = date_to_words(date_str)
        logger.debug(f"Date words: {date_words}")

        initial_gematria_sum = calculate_gematria_sum(name, date_words)
        logger.debug(f"Initial Gematria Sum: {initial_gematria_sum}")
        if initial_gematria_sum is None:
            logger.debug("Initial Gematria sum is None. Returning error message.")
            return "Could not calculate initial Gematria sum."

        # Adjust gematria sum if checkbox is checked
        if adjusted_sum_check:
            adjusted_sum = initial_gematria_sum + (initial_gematria_sum / ADJUSTMENT_CONSTANT)
            # The ELS step must be an integer, hence the ceil.
            initial_gematria_sum = math.ceil(adjusted_sum)
            logger.debug(f"Adjusted Gematria Sum (using formula): {initial_gematria_sum}")

        # The (possibly adjusted) sum is the ELS step; target language is English.
        els_result = get_first_els_result_genesis(initial_gematria_sum, tlang='en')
        if not els_result:
            logger.debug("No ELS result. Returning error message.")
            return "No ELS result found in Genesis."

        els_gematria_sum = calculate_gematria(els_result['result_text'])
        logger.debug(f"ELS Gematria sum: {els_gematria_sum}")

        psalm_match = find_shortest_psalm_match(els_gematria_sum)
        logger.debug(f"Psalm Match result: {psalm_match}")
        if not psalm_match:
            logger.debug("No Psalm match. Returning error message.")
            return "No matching Psalm found for the ELS result."

        logger.debug("Creating Bible Gateway iframe...")
        iframe_html = create_biblegateway_iframe("Psalms", psalm_match["chapter"], psalm_match["verse"])
        logger.debug(f"Returning iframe HTML: {iframe_html}")
        return iframe_html

    # Event wiring has to happen inside the Blocks context.
    run_button.click(
        main_function,
        inputs=[name_input, date_input, adjusted_sum_check],
        outputs=[iframe_output]
    )

if __name__ == "__main__":
    app.launch(share=False)