Fixed s.to

This commit is contained in:
JMcrafter26 2025-12-08 19:27:48 +01:00
parent da75042805
commit dcd7c03ef2
10 changed files with 2682 additions and 19 deletions

736
test/host.py Normal file
View file

@ -0,0 +1,736 @@
import requests
import sys
import rich
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.layout import Layout
from rich.text import Text
from rich import box
import os
import json
import subprocess
import inquirer
import webbrowser
console = Console()
module = {}
temp_dir = os.path.join(os.getcwd(), "temp_modules")
moduleHostExe = os.path.join(os.getcwd(), "modulehost.exe")
script_path = ""
current_search_results = []
current_search_query = ""
debug_mode = False
def setup():
# make sure temp directory exists
if not debug_mode:
os.makedirs(temp_dir, exist_ok=True)
if not os.path.exists(moduleHostExe):
console.print(f"[red]Error: {moduleHostExe} not found. Please ensure it is in the current directory.[/red]")
exit(1)
def get_module_info(manifest_url):
try:
response = requests.get(manifest_url)
response.raise_for_status()
manifest = response.json()
if not sanitize_manifest(manifest):
return None
module['manifest'] = manifest
# save manifest to a temporary file
with open(f"{temp_dir}/{module['filename']}_manifest.json", "w", encoding="utf-8") as manifest_file:
json.dump(manifest, manifest_file, ensure_ascii=False, indent=4)
return module
except requests.RequestException as e:
console.print(f"[red]Error fetching manifest:[/red] {e}")
return None
except ValueError:
console.print("[red]Error parsing JSON from manifest.[/red]")
return None
def sanitize_manifest(manifest):
required_fields = ['sourceName', 'scriptUrl', 'iconUrl', 'version']
for field in required_fields:
if field not in manifest:
console.print(f"[red]Manifest missing required field:[/red] {field}")
return False
# create safe filename (only alphanumeric, underscores, () and hyphens)
module['filename'] = ''.join(c for c in manifest['sourceName'] if c.isalnum() or c in (' ', '_', '-', '(', ')')).rstrip().replace(' ', '_')
return True
def get_module_script(module):
script_url = module['manifest']['scriptUrl']
try:
response = requests.get(script_url)
response.raise_for_status()
# Save script to a temporary file utf-8 encoded
with open(f"{temp_dir}/{module['filename']}.js", "w", encoding="utf-8") as script_file:
script_file.write(response.text)
return f"{temp_dir}/{module['filename']}.js"
except requests.RequestException as e:
console.print(f"[red]Error fetching script:[/red] {e}")
return None
def get_module_from_id(module_id):
modules_api = "https://library.cufiy.net/api/modules/get?id="
# validate module ID format (at least 3 characters, alphanumeric, case sensitive)
if not (3 <= len(module_id) <= 50 and module_id.isalnum()):
console.print("[red]Invalid Module ID format.[/red]")
return None
try:
response = requests.get(modules_api + module_id)
response.raise_for_status()
data = response.json()
if not data['data']:
console.print("[red]Module ID not found.[/red]")
return None
if 'manifestUrl' not in data['data']:
console.print("[red]Module ID not found or invalid response.[/red]")
return None
module['manifest'] = data['data']
return get_module_info(data['data']['manifestUrl'])
except requests.RequestException as e:
console.print(f"[red]Error fetching module by ID:[/red] {e}")
return None
def run_command(command, args, silent=True):
commands = {
"search": "searchResults",
"details": "extractDetails",
"episodes": "extractEpisodes",
"stream": "extractStreamUrl"
}
if command not in commands:
if not silent:
console.print(f"[red]Unknown command:[/red] {command}")
return None
if not silent:
console.print(f"[blue]Running command:[/blue] {command}")
# run the command using subprocess or other methods as needed and await the result
# {"status":"error","message":"Usage: node run_module.js --path <file> --function <fnName> [--param <jsonOrString>]","data":null,"debug":[]}
process_args = [moduleHostExe, "--path", args[0], "--function", commands[command]]
if debug_mode:
process_args.append("--debug")
process_args.append("--debug-full")
process_args.append("--trace-warnings")
if len(args) > 1:
process_args += ["--param", args[1]]
try:
result = subprocess.run(process_args, capture_output=True, text=False)
if debug_mode:
# load last command output and merge with current debug
debug_output = []
if os.path.exists("last_command_output.json"):
with open("last_command_output.json", "r", encoding="utf-8") as f:
try:
debug_output = json.load(f)
except Exception:
debug_output = []
# append current debug
try:
current_debug = json.loads(result.stdout.decode('utf-8', errors='replace'))
if isinstance(current_debug, list):
debug_output.extend(current_debug)
else:
debug_output.append(current_debug)
except Exception:
pass
# save merged debug output
with open("last_command_output.json", "w", encoding="utf-8") as f:
json.dump(debug_output, f, ensure_ascii=False, indent=4)
except Exception as e:
if not silent:
console.print(f"[red]Error running command:[/red] {e}")
return None
# decode output bytes to text with sensible fallbacks
stdout_bytes = result.stdout if result.stdout is not None else result.stderr
try:
stdout_text = stdout_bytes.decode('utf-8') if isinstance(stdout_bytes, (bytes, bytearray)) else str(stdout_bytes)
except Exception:
# fallback replace mode
try:
stdout_text = stdout_bytes.decode('utf-8', errors='replace') if isinstance(stdout_bytes, (bytes, bytearray)) else str(stdout_bytes)
except Exception:
stdout_text = ''
if result.returncode == 0:
if not silent:
console.print(f"[green]Command output:[/green] {stdout_text}")
# try to json parse the output (utf-8)
try:
output = json.loads(stdout_text)
# if status is success, return data
if output.get('status') == 'success':
return output.get('data')
else:
if not silent:
console.print(f"[red]Command failed:[/red] {output.get('message')}")
return None
except json.JSONDecodeError:
if not silent:
console.print("[red]Error parsing JSON output.[/red]")
return None
else:
if not silent:
console.print(f"[red]Command error:[/red] {result.stderr}")
return None
def cls():
os.system('cls' if os.name=='nt' else 'clear')
def show_header():
"""Display app header"""
header = Text("MODULE HOST", style="bold cyan", justify="center")
if 'manifest' in module:
subheader = Text(f"📦 {module['manifest']['sourceName']} v{module['manifest']['version']}",
style="dim cyan", justify="center")
console.print(Panel.fit(header, subtitle=subheader, border_style="cyan"))
else:
console.print(Panel.fit(header, border_style="cyan"))
console.print()
def app():
cls()
show_header()
# check if modules are saved in temp directory, if yes list them and ask if user wants to load one
existing_modules = [f[:-14] for f in os.listdir(temp_dir) if f.endswith("_manifest.json")]
if existing_modules:
console.print("[green]📚 Existing modules found[/green]")
console.print()
# add option to skip loading existing module
existing_modules.append(" Add new module")
questions = [
inquirer.List('module',
message="Select a module to load",
choices=existing_modules,
),
]
answers = inquirer.prompt(questions)
if answers:
if answers['module'] == " Add new module":
pass # Continue to add new module
else:
selected_module = answers['module']
# load manifest
with open(f"{temp_dir}/{selected_module}_manifest.json", "r", encoding="utf-8") as manifest_file:
manifest = json.load(manifest_file)
module['manifest'] = manifest
module['filename'] = selected_module
console.print(f"[green]✓ Module '{module['manifest']['sourceName']}' loaded successfully![/green]")
console.print()
if 'manifest' not in module:
cls()
show_header()
user_input = console.input("[cyan]🔗 Enter module manifest URL or module ID:[/cyan] ").strip()
if user_input.startswith("http"):
# If the input is a URL, fetch the module info directly
get_module_info(user_input)
else:
# Otherwise, treat it as a module ID
get_module_from_id(user_input)
if 'manifest' in module:
console.print(f"[green]✓ Module '{module['manifest']['sourceName']}' loaded successfully![/green]")
else:
console.print("[red]✗ Failed to load module.[/red]")
return
# if file already exists, skip downloading
if os.path.exists(f"{temp_dir}/{module['filename']}.js"):
script_path = f"{temp_dir}/{module['filename']}.js"
else:
script_path = get_module_script(module)
if script_path:
console.print(f"[green]✓ Module script saved[/green]")
# goto search
show_search(script_path)
def show_search(script_path, search_query=None):
"""Search page with navigation"""
global current_search_results, current_search_query
cls()
show_header()
if search_query is None:
search_query = console.input("[cyan]🔍 Enter search query:[/cyan] ").strip()
if not search_query:
search_query = "naruto"
current_search_query = search_query
console.print(f"\n[dim]Searching for '[cyan]{search_query}[/cyan]'...[/dim]")
search_results = run_command("search", [script_path, search_query], silent=True)
if search_results is None or len(search_results) == 0:
console.print("[red]✗ No search results found.[/red]\n")
input("Press Enter to search again...")
show_search(script_path)
return
current_search_results = search_results
cls()
show_header()
# Display search results in a nice panel
console.print(Panel(
f"[cyan]Search Results[/cyan]\n[dim]Found {len(search_results)} results for '{search_query}'[/dim]",
border_style="cyan"
))
console.print()
# Create choices with navigation options
choices = [f" {item['title']}" for item in search_results]
choices.append("" * 50)
choices.append("🔍 New Search")
choices.append("🚪 Exit")
questions = [
inquirer.List('result',
message="Select an option",
choices=choices,
),
]
answers = inquirer.prompt(questions)
if not answers:
return
selection = answers['result']
if selection == "🔍 New Search":
show_search(script_path)
return
elif selection == "🚪 Exit":
return
elif selection.startswith(""):
show_search(script_path, current_search_query)
return
# Find selected item
selected_index = None
for i, item in enumerate(search_results):
if f" {item['title']}" == selection:
selected_index = i
break
if selected_index is not None:
show_details(script_path, search_results[selected_index])
else:
show_search(script_path, current_search_query)
def show_details(script_path, item):
"""Details page with beautiful layout"""
cls()
show_header()
console.print(f"[dim]Loading details...[/dim]\n")
# Fetch details and merge with the original item data
details = run_command("details", [script_path, item['href']], silent=True)
if details:
item = {**item, **details[0]}
cls()
show_header()
# Display title in a prominent panel
title_panel = Panel(
Text(item['title'], style="bold cyan", justify="center"),
border_style="cyan",
box=box.DOUBLE
)
console.print(title_panel)
console.print()
# Display details in a nice table
details_table = Table(show_header=False, box=box.SIMPLE, padding=(0, 2))
details_table.add_column("Field", style="cyan", width=15)
details_table.add_column("Value", style="white")
if 'description' in item and item['description']:
details_table.add_row("📝 Description", item['description'][:200] + "..." if len(item.get('description', '')) > 200 else item.get('description', 'N/A'))
if 'aliases' in item and item['aliases']:
details_table.add_row("⏱️ Duration", item['aliases'])
if 'airdate' in item and item['airdate']:
details_table.add_row("📅 Air Date", item['airdate'])
console.print(Panel(details_table, title="[cyan]Details[/cyan]", border_style="dim"))
console.print()
else:
console.print("[red]✗ Failed to fetch details.[/red]\n")
input("Press Enter to go back...")
show_search(script_path, current_search_query)
return
console.print("[dim]Loading episodes...[/dim]\n")
episodes = run_command("episodes", [script_path, item['href']], silent=True)
if episodes:
# Display episodes in a panel
episodes_panel = Panel(
f"[cyan]Episodes[/cyan]\n[dim]{len(episodes)} episodes available[/dim]",
border_style="cyan"
)
console.print(episodes_panel)
console.print()
# Create episode choices with navigation
choices = [f" Episode {ep['number']}" for ep in episodes]
choices.append("" * 50)
choices.append("⬅️ Back to Search")
choices.append("🚪 Exit")
questions = [
inquirer.List('episode',
message="Select an episode",
choices=choices,
),
]
answers = inquirer.prompt(questions)
if not answers:
show_search(script_path, current_search_query)
return
selection = answers['episode']
if selection == "⬅️ Back to Search":
show_search(script_path, current_search_query)
return
elif selection == "🚪 Exit":
return
elif selection.startswith(""):
show_details(script_path, item)
return
# Extract episode number and find the episode
episode_num = int(selection.split("Episode ")[1])
episode_item = next((ep for ep in episodes if ep['number'] == episode_num), None)
if episode_item:
href = episode_item['href']
if not href.startswith("http"):
# prepend the base url from the module manifest if exists
base_url = module['manifest'].get('baseUrl', '')
href = f"{base_url}/{href.lstrip('/')}"
console.print(f"\n[dim]Fetching stream URL for Episode {episode_item['number']}...[/dim]")
stream_url = run_command("stream", [script_path, href], silent=True)
if stream_url:
# Handle different stream formats
stream_data = None
final_stream_url = None
streams = []
# Parse the response
if isinstance(stream_url, str):
# Check if it's a direct URL (starts with http)
if stream_url.startswith('http'):
final_stream_url = stream_url
else:
# Try to parse as JSON
try:
stream_data = json.loads(stream_url)
except:
# Not JSON, might be just text - skip it
console.print(f"[yellow]⚠ Unexpected stream format: {stream_url}[/yellow]\n")
input("Press Enter to continue...")
show_details(script_path, item)
return
elif isinstance(stream_url, dict):
stream_data = stream_url
elif isinstance(stream_url, list):
# Direct list of streams
streams = stream_url
# Extract URL from parsed data
if stream_data:
if 'streams' in stream_data and isinstance(stream_data['streams'], list) and len(stream_data['streams']) > 0:
# Handle flat list format: ['DUB', 'url1', 'SUB', 'url2']
raw_streams = stream_data['streams']
# Parse the flat list into structured streams
parsed_streams = []
i = 0
while i < len(raw_streams):
item_val = raw_streams[i]
# If it's not a URL, it's likely a label
if isinstance(item_val, str) and not item_val.startswith('http'):
# Next item should be the URL
if i + 1 < len(raw_streams) and isinstance(raw_streams[i + 1], str) and raw_streams[i + 1].startswith('http'):
parsed_streams.append({
'title': item_val,
'streamUrl': raw_streams[i + 1]
})
i += 2
else:
i += 1
elif isinstance(item_val, str) and item_val.startswith('http'):
# URL without label
parsed_streams.append({
'title': f'Server {len(parsed_streams) + 1}',
'streamUrl': item_val
})
i += 1
elif isinstance(item_val, dict):
# Already structured
parsed_streams.append(item_val)
i += 1
else:
i += 1
streams = parsed_streams
if len(streams) == 1:
final_stream_url = streams[0].get('streamUrl')
elif len(streams) > 1:
# Let user choose server
cls()
show_header()
console.print(Panel(
f"[cyan]Select Server for Episode {episode_item['number']}[/cyan]\n[dim]{len(streams)} servers available[/dim]",
border_style="cyan"
))
console.print()
server_choices = [f" {s.get('title', f'Server {i+1}')}" for i, s in enumerate(streams)]
server_choices.append("" * 50)
server_choices.append("⬅️ Back")
questions = [
inquirer.List('server',
message="Select a server",
choices=server_choices,
),
]
answers = inquirer.prompt(questions)
if answers and not answers['server'].startswith("") and answers['server'] != "⬅️ Back":
selected_server_idx = None
for i, s in enumerate(streams):
if f" {s.get('title', f'Server {i+1}')}" == answers['server']:
selected_server_idx = i
break
if selected_server_idx is not None:
final_stream_url = streams[selected_server_idx].get('streamUrl')
else:
show_details(script_path, item)
return
else:
# No valid streams found
console.print("[red]✗ No valid stream URLs found.[/red]\n")
input("Press Enter to continue...")
show_details(script_path, item)
return
elif 'streamUrl' in stream_data:
final_stream_url = stream_data['streamUrl']
else:
# Assume it's a direct URL
final_stream_url = str(stream_data) if stream_data else None
if final_stream_url:
cls()
show_header()
# Display stream URL
stream_panel = Panel(
f"[green]{final_stream_url}[/green]",
title=f"[cyan]Stream URL - Episode {episode_item['number']}[/cyan]",
border_style="green"
)
console.print(stream_panel)
console.print()
# Offer options to open/download
action_choices = [
"▶️ Open in VLC",
"🌐 Open in Browser",
"📺 Open in MPV",
"💾 Download with Browser",
"📋 Copy URL to Clipboard",
"" * 50,
"⬅️ Back to Episodes"
]
questions = [
inquirer.List('action',
message="What would you like to do?",
choices=action_choices,
),
]
answers = inquirer.prompt(questions)
if answers:
action = answers['action']
if action == "▶️ Open in VLC":
console.print(f"\n[dim]Opening in VLC...[/dim]")
try:
subprocess.Popen(['vlc', final_stream_url])
console.print("[green]✓ Opened in VLC[/green]\n")
except FileNotFoundError:
console.print("[yellow]⚠ VLC not found. Please install VLC or add it to PATH.[/yellow]\n")
input("Press Enter to continue...")
show_details(script_path, item)
elif action == "🌐 Open in Browser":
console.print(f"\n[dim]Opening in browser...[/dim]")
import webbrowser
webbrowser.open(final_stream_url)
console.print("[green]✓ Opened in browser[/green]\n")
input("Press Enter to continue...")
show_details(script_path, item)
elif action == "📺 Open in MPV":
console.print(f"\n[dim]Opening in MPV...[/dim]")
try:
subprocess.Popen(['mpv', final_stream_url])
console.print("[green]✓ Opened in MPV[/green]\n")
except FileNotFoundError:
console.print("[yellow]⚠ MPV not found. Please install MPV or add it to PATH.[/yellow]\n")
input("Press Enter to continue...")
show_details(script_path, item)
elif action == "💾 Download with Browser":
console.print(f"\n[dim]Opening download in browser...[/dim]")
import webbrowser
webbrowser.open(final_stream_url)
console.print("[green]✓ Download started in browser[/green]")
console.print("[dim]Note: Some streams may require a download manager or may not be directly downloadable.[/dim]\n")
input("Press Enter to continue...")
show_details(script_path, item)
elif action == "📋 Copy URL to Clipboard":
try:
import pyperclip
pyperclip.copy(final_stream_url)
console.print("[green]✓ URL copied to clipboard[/green]\n")
except ImportError:
console.print("[yellow]⚠ pyperclip not installed. Install with: pip install pyperclip[/yellow]")
console.print(f"\n[cyan]URL:[/cyan] {final_stream_url}\n")
input("Press Enter to continue...")
show_details(script_path, item)
elif action == "⬅️ Back to Episodes":
show_details(script_path, item)
else:
show_details(script_path, item)
else:
console.print("[red]✗ Failed to extract stream URL.[/red]\n")
input("Press Enter to continue...")
show_details(script_path, item)
else:
console.print("[red]✗ Failed to fetch stream URL.[/red]\n")
input("Press Enter to continue...")
show_details(script_path, item)
else:
console.print("[red]✗ Failed to fetch episodes.[/red]\n")
input("Press Enter to go back...")
show_search(script_path, current_search_query)
return
def debug_module(modulePath, debug_function = None, query = None):
"""Function to debug module in debug viewer"""
cls()
show_header()
# clear last debug output
if os.path.exists("last_command_output.json"):
os.remove("last_command_output.json")
if debug_function is not None:
console.print(f"[dim]Debugging module function '{debug_function}' at '{modulePath}' with query '{query}'...[/dim]\n")
debug_data = run_command(debug_function, [modulePath, query], silent=False)
else:
console.print(f"[dim]Running all tests for module at '{modulePath}'...[/dim]\n")
console.print("[dim]Use --function <search|details|episodes|stream> to debug a specific function and provide --query <query> for input.[/dim]\n")
if query is None:
query = "naruto"
print("[dim]1. Search Test[/dim]")
debug_data = run_command("search", [modulePath, query], silent=False)
print("\n[dim]2. Details Test[/dim]")
if debug_data and len(debug_data) > 0:
first_item = debug_data[0]
details_data = run_command("details", [modulePath, first_item['href']], silent=False)
print("\n[dim]3. Episodes Test[/dim]")
episodes_data = run_command("episodes", [modulePath, first_item['href']], silent=False)
if episodes_data and len(episodes_data) > 0:
first_episode = episodes_data[0]
print("\n[dim]4. Stream URL Test[/dim]")
stream_data = run_command("stream", [modulePath, first_episode['href']], silent=False)
console.print("\n[dim]Debugging complete and saved. Reload debug viewer to see changes.[/dim]")
exit(0)
if __name__ == "__main__":
# if --debug or -d is passed, run debug mode
if "--debug" in sys.argv or "-d" in sys.argv:
debug_mode = True
setup()
# expect --path <modulePath> --function <functionName> [--query <query>]
modulePath = None
functionName = None
query = None
if "--path" in sys.argv or "-p" in sys.argv:
path_index = sys.argv.index("--path") + 1
if path_index < len(sys.argv):
modulePath = sys.argv[path_index]
if "--function" in sys.argv or "-f" in sys.argv:
func_index = sys.argv.index("--function") + 1
if func_index < len(sys.argv):
functionName = sys.argv[func_index]
if "--query" in sys.argv or "-q" in sys.argv:
query_index = sys.argv.index("--query") + 1
if query_index < len(sys.argv):
query = sys.argv[query_index]
if modulePath is not None:
debug_module(modulePath, functionName, query)
else:
console.print("[red]✗ Please provide a module path with --path <modulePath>[/red]")
exit(1)
if "--help" in sys.argv or "-h" in sys.argv:
console.print("Usage: python host.py [--debug|-d] [--help|-h]")
console.print("--debug/-d : Run in debug mode to test a module")
console.print("--help/-h : Show this help message")
exit(0)
setup()
app()