files-to-prompt ftp.py

Slug: files-to-prompt-ftppy

22590 characters 2173 words
"""Concatenate files into a single prompt-friendly text stream.

Walks files and directories, transparently unpacks archives (ZIP, RAR, 7z,
TAR and ZIP-based Office documents), optionally honours a ``.gitignore``
file, and writes every readable text file either as plain text or wrapped
in XML-like ``<document>`` elements.
"""

import logging
import os
import shutil
import sys
import tarfile
import tempfile
import zipfile
from fnmatch import fnmatch
from typing import Callable, List, Optional, Tuple

import click

# Optional dependencies for specialized archive handling or .gitignore parsing
try:
    import rarfile
except ImportError:
    rarfile = None

try:
    import py7zr
except ImportError:
    py7zr = None

try:
    import pathspec
except ImportError:
    pathspec = None

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)

# Number of leading bytes inspected when deciding whether a file is binary.
_BINARY_SNIFF_BYTES = 8192


# ---------------------------------------------------------------------
# Handling Path Traversal in Tar Extraction
# ---------------------------------------------------------------------
def is_within_directory(directory: str, target: str) -> bool:
    """Return True when *target* resolves to *directory* or a path below it.

    Both paths are made absolute first. The comparison is done per path
    component (``os.path.commonpath``), so ``/tmp/out_evil`` is NOT
    considered to be inside ``/tmp/out``.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        return os.path.commonpath([abs_directory, abs_target]) == abs_directory
    except ValueError:
        # commonpath raises when the paths cannot share a prefix at all
        # (e.g. different drives on Windows): definitely not inside.
        return False


def safe_extract(tar: tarfile.TarFile, path: str = ".", members=None) -> None:
    """Extract members from *tar* into *path*, refusing path traversal.

    Args:
        tar: An open tar archive.
        path: Destination directory.
        members: Optional iterable of ``TarInfo`` objects to extract;
            all members of the archive when ``None``.

    Raises:
        ValueError: If a member name, or the target of a symbolic/hard
            link member, would land outside *path*.
        tarfile.TarError: Propagated from the underlying extraction.
    """
    # Materialise once: *members* may be a one-shot iterator and is needed
    # both for validation and for the extraction itself.
    selected = list(members) if members is not None else tar.getmembers()

    for member in selected:
        member_path = os.path.join(path, member.name)
        if not is_within_directory(path, member_path):
            raise ValueError("Detected path traversal attempt.")

        # A link pointing outside the destination lets a later member be
        # written through it, so link targets are validated as well.
        if member.issym():
            link_target = os.path.join(
                path, os.path.dirname(member.name), member.linkname
            )
        elif member.islnk():
            link_target = os.path.join(path, member.linkname)
        else:
            continue
        if not is_within_directory(path, link_target):
            raise ValueError("Detected path traversal attempt.")

    if hasattr(tarfile, "data_filter"):
        # Interpreters with extraction filters: let the stdlib apply its
        # own "data" safety policy on top of the checks above.
        tar.extractall(path=path, members=selected, filter="data")
    else:
        tar.extractall(path=path, members=selected)


# ---------------------------------------------------------------------
# Archive Handlers
# ---------------------------------------------------------------------
def handle_zip(file_path: str, extract_dir: str) -> bool:
    """Extract the ZIP archive *file_path* into *extract_dir*.

    Returns:
        True on success, False (after logging a warning) on failure.
    """
    try:
        with zipfile.ZipFile(file_path, "r") as zf:
            zf.extractall(extract_dir)
        return True
    except zipfile.BadZipFile as e:
        logger.warning("Bad ZIP file %s: %s", file_path, e)
    except (zipfile.LargeZipFile, NotImplementedError, RuntimeError, OSError) as e:
        # Encrypted members, unsupported compression methods and I/O
        # errors should skip this archive, not abort the whole run.
        logger.warning("ZIP extraction failed for %s: %s", file_path, e)
    return False


def handle_rar(file_path: str, extract_dir: str) -> bool:
    """Extract the RAR archive *file_path* into *extract_dir*.

    Requires the optional ``rarfile`` package.

    Returns:
        True on success, False (after logging a warning) on failure or
        when ``rarfile`` is not installed.
    """
    if not rarfile:
        logger.warning("RAR handling requires 'rarfile' to be installed.")
        return False
    try:
        with rarfile.RarFile(file_path, "r") as rf:
            rf.extractall(extract_dir)
        return True
    except rarfile.Error as e:
        logger.warning("RAR extraction failed: %s", e)
        return False


def handle_7z(file_path: str, extract_dir: str) -> bool:
    """Extract the 7z archive *file_path* into *extract_dir*.

    Requires the optional ``py7zr`` package.

    Returns:
        True on success, False (after logging a warning) on failure or
        when ``py7zr`` is not installed.
    """
    if not py7zr:
        logger.warning("7z handling requires 'py7zr' to be installed.")
        return False
    try:
        with py7zr.SevenZipFile(file_path, "r") as sz:
            sz.extractall(extract_dir)
        return True
    except py7zr.exceptions.Bad7zFile as e:
        logger.warning("7z extraction failed: %s", e)
        return False


def handle_tar(file_path: str, extract_dir: str) -> bool:
    """Extract a (possibly compressed) TAR archive into *extract_dir*.

    Returns:
        True on success, False (after logging a warning) on failure,
        including when ``safe_extract`` rejects the archive.
    """
    try:
        with tarfile.open(file_path, "r:*") as tf:
            safe_extract(tf, extract_dir)
        return True
    except (tarfile.TarError, ValueError, EOFError, OSError) as e:
        # ValueError is what safe_extract raises on a traversal attempt.
        logger.warning("TAR extraction failed: %s", e)
        return False


# Maps a lower-cased file extension to the function able to unpack it.
ARCHIVE_HANDLERS = {
    ".zip": handle_zip,
    ".rar": handle_rar,
    ".7z": handle_7z,
    ".tar": handle_tar,
    ".gz": handle_tar,
    ".bz2": handle_tar,
    ".tgz": handle_tar,
    ".tbz2": handle_tar,
    ".xz": handle_tar,
    ".txz": handle_tar,
}

# Office documents use ZIP-based archives internally
OFFICE_EXTENSIONS = [".docx", ".xlsx", ".pptx", ".odt", ".ods", ".odp"]


# ---------------------------------------------------------------------
# Gitignore Handler
# ---------------------------------------------------------------------
class GitignoreHandler:
    """Ignore rules loaded from the ``.gitignore`` directly inside a directory.

    Uses ``pathspec`` when it is installed; otherwise falls back to a
    basic ``fnmatch`` comparison against the last path component.
    """

    def __init__(self, directory: str):
        # Patterns are interpreted relative to this directory.
        self.directory = directory
        self.spec = None
        self.rules: List[str] = []

        gitignore_file = os.path.join(directory, ".gitignore")
        if not os.path.isfile(gitignore_file):
            return

        try:
            with open(gitignore_file, "r", encoding="utf-8", errors="replace") as f:
                lines = [
                    line.strip()
                    for line in f
                    if line.strip() and not line.startswith("#")
                ]
        except OSError as e:
            # An unreadable .gitignore simply means "no rules".
            logger.warning("Could not read %s: %s", gitignore_file, e)
            return

        if pathspec:
            self.spec = pathspec.PathSpec.from_lines("gitwildmatch", lines)
        else:
            self.rules = lines

    def should_ignore(self, path_to_check: str) -> bool:
        """Return True when *path_to_check* matches one of the loaded rules."""
        is_dir = os.path.isdir(path_to_check)

        if self.spec is not None:
            # Match the path relative to the .gitignore location, with a
            # trailing slash on directories so "dir/" patterns can apply.
            relative = os.path.relpath(path_to_check, self.directory)
            if is_dir:
                relative += "/"
            return self.spec.match_file(relative)

        base = os.path.basename(os.path.normpath(path_to_check))
        for rule in self.rules:
            if rule.endswith("/"):
                # Trailing slash: the rule applies to directories only.
                if is_dir and fnmatch(base, rule.rstrip("/")):
                    return True
            elif fnmatch(base, rule):
                return True
        return False


# ---------------------------------------------------------------------
# Output Formatting: Plain-text or XML
# ---------------------------------------------------------------------
class OutputFormatter:
    """Writes one file's path and content through a line-oriented writer.

    Args:
        writer: Callable invoked once per emitted line/chunk.
        xml_mode: When True, wrap each file in a ``<document>`` element;
            otherwise emit ``path`` / ``---`` / content / ``---``.
    """

    def __init__(self, writer: Callable[[str], None], xml_mode: bool):
        self.writer = writer
        self.xml_mode = xml_mode
        # 1-based running index of the next <document> element.
        self.xml_index = 1

    def write(self, path: str, content: str) -> None:
        """Emit *content* labelled with *path* in the configured format."""
        if self.xml_mode:
            self.writer(f'<document index="{self.xml_index}">')
            self.writer(f' <source>{path}</source>')
            self.writer(' <document_content>')
            # Indent content for clarity in XML
            for line in content.splitlines():
                self.writer(f' {line}')
            self.writer(' </document_content>')
            self.writer('</document>')
            self.xml_index += 1
        else:
            self.writer(path)
            self.writer("---")
            self.writer(content)
            self.writer("")
            self.writer("---")


# ---------------------------------------------------------------------
# File Processing Class
# ---------------------------------------------------------------------
class FileProcessor:
    """Recursively processes files and directories.

    Archives and Office documents are extracted to a temporary directory
    and processed in turn; ordinary files are read and handed to the
    formatter. ``.gitignore`` rules, hidden-file filtering, glob ignore
    patterns and an extension allow-list are applied while walking
    directories.

    Args:
        extensions: File-name suffixes to keep (all files when empty).
        include_hidden: Keep entries whose name starts with ".".
        ignore_gitignore: Do not apply ``.gitignore`` rules.
        ignore_patterns: Glob patterns of file names to skip.
        formatter: Receives ``(path, content)`` for every emitted file.
        max_depth: Maximum nesting level of archives inside archives.
    """

    def __init__(
        self,
        extensions: Tuple[str, ...],
        include_hidden: bool,
        ignore_gitignore: bool,
        ignore_patterns: Tuple[str, ...],
        formatter: OutputFormatter,
        max_depth: int = 5,
    ):
        # Stored as a tuple so it can be passed straight to str.endswith.
        self.extensions = tuple(ext.lower() for ext in extensions)
        self.include_hidden = include_hidden
        self.ignore_gitignore = ignore_gitignore
        self.ignore_patterns = ignore_patterns
        self.formatter = formatter
        self.max_depth = max_depth

    def process_path(self, path: str, depth: int = 0) -> None:
        """Process a file or directory; *depth* is the archive nesting level."""
        if depth > self.max_depth:
            logger.warning(
                "Max recursion depth (%s) reached at %s.", self.max_depth, path
            )
            return

        if os.path.isfile(path):
            self._handle_file(path, depth)
        elif os.path.isdir(path):
            self._handle_directory(path, depth)

    def _handle_file(self, path: str, depth: int) -> None:
        """Dispatch *path* to archive extraction or plain reading."""
        ext = os.path.splitext(path)[1].lower()
        # If archive or Office doc, handle as an archive; otherwise just read content
        if ext in ARCHIVE_HANDLERS or ext in OFFICE_EXTENSIONS:
            self._extract_and_recurse(path, ext, depth)
        else:
            self._read_and_output(path)

    def _extract_and_recurse(self, path: str, ext: str, depth: int) -> None:
        """Unpack the archive at *path* into a temp dir and process it."""
        # Office documents use the ZIP handler
        if ext in OFFICE_EXTENSIONS:
            handler_func = ARCHIVE_HANDLERS[".zip"]
        else:
            handler_func = ARCHIVE_HANDLERS.get(ext)

        if not handler_func:
            logger.warning("No valid handler for extension: %s", ext)
            return

        # Refuse before unpacking: extracting an archive whose contents
        # would be discarded anyway only wastes time and disk space.
        if depth + 1 > self.max_depth:
            logger.warning(
                "Max recursion depth (%s) reached at %s.", self.max_depth, path
            )
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            if handler_func(path, tmpdir):
                self.process_path(tmpdir, depth + 1)
            else:
                logger.warning("Extraction failed for %s", path)

    @staticmethod
    def _looks_binary(path: str) -> bool:
        """Return True when the first bytes of *path* contain a NUL byte."""
        try:
            with open(path, "rb") as f:
                return b"\x00" in f.read(_BINARY_SNIFF_BYTES)
        except OSError:
            # Let the regular read path report the I/O problem.
            return False

    def _read_and_output(self, path: str) -> None:
        """Read *path* as text and pass it to the formatter.

        Binary files (detected by a NUL byte near the start) are skipped.
        Decoding is attempted as UTF-8 first, then Latin-1.
        """
        if self._looks_binary(path):
            # Latin-1 can decode any byte sequence, so without this check
            # images and other binaries would be emitted as garbage text.
            logger.warning("Skipping binary file %s", path)
            return

        # Attempt to read as UTF-8, fallback to Latin-1
        for encoding in ("utf-8", "latin-1"):
            try:
                with open(path, "r", encoding=encoding) as f:
                    content = f.read()
            except UnicodeDecodeError:
                continue
            except OSError as e:
                logger.warning("Error reading file %s: %s", path, e)
                return
            self.formatter.write(path, content)
            return

        logger.warning("Could not read file %s with provided encodings.", path)

    def _handle_directory(self, directory: str, depth: int) -> None:
        """Walk *directory*, filter its entries and process each file."""
        gitignore_handler = None
        if not self.ignore_gitignore:
            gitignore_handler = GitignoreHandler(directory)

        for root, dirs, files in os.walk(directory):
            if not self.include_hidden:
                dirs[:] = [d for d in dirs if not d.startswith(".")]
                files = [f for f in files if not f.startswith(".")]

            if gitignore_handler:
                dirs[:] = [
                    d
                    for d in dirs
                    if not gitignore_handler.should_ignore(os.path.join(root, d))
                ]
                files = [
                    f
                    for f in files
                    if not gitignore_handler.should_ignore(os.path.join(root, f))
                ]

            # os.walk visits sub-directories in the order left in `dirs`;
            # sorting makes the output order deterministic.
            dirs.sort()

            if self.ignore_patterns:
                files = [
                    f
                    for f in files
                    if not any(fnmatch(f, pattern) for pattern in self.ignore_patterns)
                ]

            if self.extensions:
                files = [f for f in files if f.lower().endswith(self.extensions)]

            for file_name in sorted(files):
                # Same depth: only archive extraction increases nesting,
                # so plain directory traversal does not use up max_depth.
                self.process_path(os.path.join(root, file_name), depth)


# ---------------------------------------------------------------------
# Click CLI Declaration
# ---------------------------------------------------------------------
@click.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True))
@click.option("-e", "--extension", "extensions", multiple=True,
              help="Specify file extensions (e.g., .txt, .md) to include.")
@click.option("--include-hidden", is_flag=True, default=False,
              help="Whether to include hidden files and directories.")
@click.option("--ignore-gitignore", is_flag=True, default=False,
              help="Disable .gitignore-based filtering.")
@click.option("--ignore", "ignore_patterns", multiple=True,
              help="One or more glob patterns to exclude (e.g., *.log, temp*).")
@click.option("-o", "--output", "output_file", type=click.Path(writable=True),
              help="File path to save results (stdout by default).")
@click.option("--xml", "output_xml", is_flag=True, default=False,
              help="Output results in an XML-like format.")
@click.option("-d", "--max-depth", default=5, type=int, show_default=True,
              help="Maximum recursion depth for nested archives.")
def cli(paths, extensions, include_hidden, ignore_gitignore, ignore_patterns,
        output_file, output_xml, max_depth):
    """
    This tool processes files and directories, including archive extraction and
    Office document reading, optionally respecting .gitignore rules.
    """
    file_handle = None
    if output_file:
        try:
            file_handle = open(output_file, "w", encoding="utf-8")
        except OSError as e:
            logger.error("Could not open output file %s: %s", output_file, e)
            sys.exit(1)

        def writer(msg):
            print(msg, file=file_handle)
    else:
        writer = click.echo

    try:
        formatter = OutputFormatter(writer, output_xml)

        # In XML mode all documents are wrapped in one <documents> element.
        if output_xml:
            writer("<documents>")

        processor = FileProcessor(
            extensions=extensions,
            include_hidden=include_hidden,
            ignore_gitignore=ignore_gitignore,
            ignore_patterns=ignore_patterns,
            formatter=formatter,
            max_depth=max_depth,
        )

        # Process each path recursively
        for path in paths:
            processor.process_path(path)

        if output_xml:
            writer("</documents>")
    finally:
        # Close the output file even when processing raises.
        if file_handle:
            file_handle.close()


# Entry point
if __name__ == "__main__":
    cli()
URL: https://ib.bsb.br/files-to-prompt-ftppy