import os
import sys
import tempfile
import shutil
import zipfile
import tarfile
import click
import logging
from fnmatch import fnmatch
from typing import Callable, List, Optional, Tuple
# Optional dependencies for specialized archive handling or .gitignore parsing
try:
import rarfile
except ImportError:
rarfile = None
try:
import py7zr
except ImportError:
py7zr = None
try:
import pathspec
except ImportError:
pathspec = None
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stderr,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------
# Handling Path Traversal in Tar Extraction
# ---------------------------------------------------------------------
def is_within_directory(directory: str, target: str) -> bool:
"""
Prevent path traversal beyond the intended extraction directory.
"""
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)
return os.path.commonprefix([abs_directory, abs_target]) == abs_directory
def safe_extract(tar: tarfile.TarFile, path: str = ".", members=None) -> None:
"""
Extract all members from a tarfile safely, ensuring no path traversal occurs.
"""
for member in (members or tar.getmembers()):
member_path = os.path.join(path, member.name)
if not is_within_directory(path, member_path):
raise ValueError("Detected path traversal attempt.")
tar.extractall(path=path, members=members)
# ---------------------------------------------------------------------
# Archive Handlers
# ---------------------------------------------------------------------
def handle_zip(file_path: str, extract_dir: str) -> bool:
try:
with zipfile.ZipFile(file_path, "r") as zf:
zf.extractall(extract_dir)
return True
except zipfile.BadZipFile as e:
logger.warning(f"Bad ZIP file {file_path}: {str(e)}")
return False
def handle_rar(file_path: str, extract_dir: str) -> bool:
if not rarfile:
logger.warning("RAR handling requires 'rarfile' to be installed.")
return False
try:
with rarfile.RarFile(file_path, "r") as rf:
rf.extractall(extract_dir)
return True
except rarfile.Error as e:
logger.warning(f"RAR extraction failed: {str(e)}")
return False
def handle_7z(file_path: str, extract_dir: str) -> bool:
if not py7zr:
logger.warning("7z handling requires 'py7zr' to be installed.")
return False
try:
with py7zr.SevenZipFile(file_path, "r") as sz:
sz.extractall(extract_dir)
return True
except py7zr.exceptions.Bad7zFile as e:
logger.warning(f"7z extraction failed: {str(e)}")
return False
def handle_tar(file_path: str, extract_dir: str) -> bool:
try:
with tarfile.open(file_path, "r:*") as tf:
safe_extract(tf, extract_dir)
return True
except tarfile.TarError as e:
logger.warning(f"TAR extraction failed: {str(e)}")
return False
ARCHIVE_HANDLERS = {
".zip": handle_zip,
".rar": handle_rar,
".7z": handle_7z,
".tar": handle_tar,
".gz": handle_tar,
".bz2": handle_tar,
}
# Office documents use ZIP-based archives internally
OFFICE_EXTENSIONS = [".docx", ".xlsx", ".pptx", ".odt", ".ods", ".odp"]
# ---------------------------------------------------------------------
# Gitignore Handler
# ---------------------------------------------------------------------
class GitignoreHandler:
"""
Attempts to load and parse a .gitignore file using pathspec when available,
otherwise uses a basic fnmatch fallback.
"""
def __init__(self, directory: str):
self.spec = None
self.rules = []
gitignore_file = os.path.join(directory, ".gitignore")
if os.path.isfile(gitignore_file):
with open(gitignore_file, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip() and not line.startswith("#")]
if pathspec:
self.spec = pathspec.PathSpec.from_lines("gitwildmatch", lines)
else:
self.rules = lines
def should_ignore(self, path_to_check: str) -> bool:
if self.spec:
return self.spec.match_file(path_to_check)
base = os.path.basename(path_to_check)
if os.path.isdir(path_to_check):
base += "/"
return any(fnmatch(base, rule) for rule in self.rules)
# ---------------------------------------------------------------------
# Output Formatting: Plain-text or XML
# ---------------------------------------------------------------------
class OutputFormatter:
def __init__(self, writer: Callable[[str], None], xml_mode: bool):
self.writer = writer
self.xml_mode = xml_mode
self.xml_index = 1
def write(self, path: str, content: str) -> None:
if self.xml_mode:
self.writer(f'')
self.writer(f' {path}</source>')
self.writer(' ')
# Indent content for clarity in XML
for line in content.splitlines():
self.writer(f' {line}')
self.writer(' ')
self.writer('')
self.xml_index += 1
else:
self.writer(path)
self.writer("---")
self.writer(content)
self.writer("")
self.writer("---")
# ---------------------------------------------------------------------
# File Processing Class
# ---------------------------------------------------------------------
class FileProcessor:
"""
Recursively processes files and directories, optionally extracting
archives and applying .gitignore rules. Outputs in plain text or XML.
"""
def __init__(
self,
extensions: Tuple[str, ...],
include_hidden: bool,
ignore_gitignore: bool,
ignore_patterns: Tuple[str, ...],
formatter: OutputFormatter,
max_depth: int = 5,
):
self.extensions = [ext.lower() for ext in extensions]
self.include_hidden = include_hidden
self.ignore_gitignore = ignore_gitignore
self.ignore_patterns = ignore_patterns
self.formatter = formatter
self.max_depth = max_depth
def process_path(self, path: str, depth: int = 0) -> None:
if depth > self.max_depth:
logger.warning(f"Max recursion depth ({self.max_depth}) reached at {path}.")
return
if os.path.isfile(path):
self._handle_file(path, depth)
elif os.path.isdir(path):
self._handle_directory(path, depth)
def _handle_file(self, path: str, depth: int) -> None:
ext = os.path.splitext(path)[1].lower()
# If archive or Office doc, handle as an archive; otherwise just read content
if ext in ARCHIVE_HANDLERS or ext in OFFICE_EXTENSIONS:
self._extract_and_recurse(path, ext, depth)
else:
self._read_and_output(path)
def _extract_and_recurse(self, path: str, ext: str, depth: int) -> None:
handler_func = ARCHIVE_HANDLERS.get(ext)
# Office documents use the ZIP handler
if ext in OFFICE_EXTENSIONS:
handler_func = ARCHIVE_HANDLERS[".zip"]
if not handler_func:
logger.warning(f"No valid handler for extension: {ext}")
return
with tempfile.TemporaryDirectory() as tmpdir:
success = handler_func(path, tmpdir)
if success:
self.process_path(tmpdir, depth + 1)
else:
logger.warning(f"Extraction failed for {path}")
def _read_and_output(self, path: str) -> None:
# Attempt to read as UTF-8, fallback to Latin-1
for encoding in ("utf-8", "latin-1"):
try:
with open(path, "r", encoding=encoding) as f:
content = f.read()
self.formatter.write(path, content)
return
except UnicodeDecodeError:
pass
except Exception as e:
logger.warning(f"Error reading file {path}: {str(e)}")
return
logger.warning(f"Could not read file {path} with provided encodings.")
def _handle_directory(self, directory: str, depth: int) -> None:
gitignore_handler = None
if not self.ignore_gitignore:
gitignore_handler = GitignoreHandler(directory)
for root, dirs, files in os.walk(directory):
if not self.include_hidden:
dirs[:] = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
if gitignore_handler:
dirs[:] = [d for d in dirs if not gitignore_handler.should_ignore(os.path.join(root, d))]
files = [f for f in files if not gitignore_handler.should_ignore(os.path.join(root, f))]
if self.ignore_patterns:
files = [
f
for f in files
if not any(fnmatch(f, pattern) for pattern in self.ignore_patterns)
]
if self.extensions:
files = [
f
for f in files
if any(f.lower().endswith(ext) for ext in self.extensions)
]
for file_name in sorted(files):
self.process_path(os.path.join(root, file_name), depth + 1)
# ---------------------------------------------------------------------
# Click CLI Declaration
# ---------------------------------------------------------------------
@click.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True))
@click.option("-e", "--extension", "extensions", multiple=True,
help="Specify file extensions (e.g., .txt, .md) to include.")
@click.option("--include-hidden", is_flag=True, default=False,
help="Whether to include hidden files and directories.")
@click.option("--ignore-gitignore", is_flag=True, default=False,
help="Disable .gitignore-based filtering.")
@click.option("--ignore", "ignore_patterns", multiple=True,
help="One or more glob patterns to exclude (e.g., *.log, temp*).")
@click.option("-o", "--output", "output_file", type=click.Path(writable=True),
help="File path to save results (stdout by default).")
@click.option("--xml", "output_xml", is_flag=True, default=False,
help="Output results in an XML-like format.")
@click.option("-d", "--max-depth", default=5,
help="Maximum recursion depth for nested archives.")
def cli(paths, extensions, include_hidden, ignore_gitignore, ignore_patterns,
output_file, output_xml, max_depth):
"""
This tool processes files and directories, including archive extraction
and Office document reading, optionally respecting .gitignore rules.
"""
writer = click.echo
file_handle = None
if output_file:
try:
file_handle = open(output_file, "w", encoding="utf-8")
writer = lambda msg: print(msg, file=file_handle)
except IOError as e:
logger.error(f"Could not open output file {output_file}: {str(e)}")
sys.exit(1)
formatter = OutputFormatter(writer, output_xml)
# Print optional XML headers/footers if needed
if output_xml:
writer("")
writer("")
processor = FileProcessor(
extensions=extensions,
include_hidden=include_hidden,
ignore_gitignore=ignore_gitignore,
ignore_patterns=ignore_patterns,
formatter=formatter,
max_depth=max_depth,
)
# Process each path recursively
for path in paths:
processor.process_path(path)
if output_xml:
writer("")
writer("
")
if file_handle:
file_handle.close()
# Entry point
if __name__ == "__main__":
cli()
URL: https://ib.bsb.br/files-to-prompt-ftppy