import logging
import re
from dataclasses import dataclass
from datetime import date, datetime, time
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
class MessageType(Enum):
    """Category assigned to a parsed chat message."""

    # Plain text; the fallback when no media marker is found.
    TEXT = "text"
    # Content matched one of the media-placeholder patterns.
    MEDIA = "media"
    # Content matched one of the system-notice patterns.
    SYSTEM = "system"
@dataclass
class Message:
    """A single parsed chat message.

    Attributes:
        date: Calendar date parsed from the message header.
        time: Time of day parsed from the message header.
        sender: Name of the message sender.
        content: Message text.
        type: Category of the message.
    """

    # The producer stores ``datetime.strptime(...).date()`` here, so the
    # field holds a ``date``, not a full ``datetime``.
    date: date
    time: time
    sender: str
    content: str
    type: MessageType
class WhatsAppDataProcessor:
def __init__(self, file_path: str, config: Optional[Dict[str, Any]] = None):
"""Initialize the WhatsApp chat processor with optional configuration.
Args:
file_path: Path to the WhatsApp chat export file
config: Optional configuration dictionary with processing settings
Raises:
FileNotFoundError: If the specified file doesn't exist
"""
self.file_path = Path(file_path)
if not self.file_path.exists():
raise FileNotFoundError(f"Chat file not found: {file_path}")
# Default configuration
self.config = {
'max_line_length': 32767, # Excel's maximum cell content length
'preserve_emoji': True,
'remove_system_messages': False,
'date_format': '%d/%m/%y',
'time_format': '%H:%M:%S',
'output_encoding': 'utf-8',
'normalize_whitespace': True
}
if config:
self.config.update(config)
self.chat_data: Optional[str] = None
self.data_frame: Optional[pd.DataFrame] = None
# Set up logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def _is_system_message(self, sender: str, content: str) -> bool:
"""Detect if a message is a system message.
Args:
sender: Message sender
content: Message content
Returns:
bool indicating if the message is a system message
"""
system_patterns = [
r"changed the subject to",
r"added \d",
r"left$",
r"changed this group's icon",
r"Messages and calls are end-to-end encrypted",
]
return any(re.search(pattern, content) for pattern in system_patterns)
def _normalize_message(self, message: str) -> str:
"""Normalize a message by converting newlines and handling special characters.
Args:
message: The raw message text
Returns:
Normalized message as a single line
"""
if not self.config['normalize_whitespace']:
return message
# Preserve emojis if configured
if self.config['preserve_emoji']:
# Convert emojis to temporary placeholders
emoji_pattern = r'[\U0001F000-\U0001F999]'
emojis = re.finditer(emoji_pattern, message)
emoji_map = {m.group(): f"__EMOJI_{i}__" for i, m in enumerate(emojis)}
for emoji, placeholder in emoji_map.items():
message = message.replace(emoji, placeholder)
# Normalize whitespace and newlines
message = re.sub(r'\r\n|\r|\n', ' ', message)
message = re.sub(r'\s+', ' ', message)
message = message.strip()
# Restore emojis if they were preserved
if self.config['preserve_emoji']:
for emoji, placeholder in emoji_map.items():
message = message.replace(placeholder, emoji)
# Truncate if exceeds max length
if len(message) > self.config['max_line_length']:
message = message[:self.config['max_line_length']-3] + "..."
return message
def _detect_message_type(self, content: str) -> MessageType:
    """Detect the type of message based on its content.

    Args:
        content: Message content.

    Returns:
        MessageType.MEDIA if ``content`` contains one of the media
        placeholders (matched case-insensitively), otherwise
        MessageType.TEXT.
    """
    # The first entry must be a real placeholder: an empty pattern matches
    # every string and would classify all messages as media.
    media_patterns = (
        r'<Media omitted>',
        r'image omitted',
        r'video omitted',
        r'audio omitted',
        r'document omitted',
        r'sticker omitted',
        r'GIF omitted',
    )
    if any(re.search(pattern, content, re.IGNORECASE) for pattern in media_patterns):
        return MessageType.MEDIA
    return MessageType.TEXT
def _process_message_chunk(self, chunk: List[Tuple[str, str, str, str]]) -> Optional[Message]:
    """Process a chunk of message lines into a single message entry.

    Args:
        chunk: The message header entry ``(date, time, sender, first_line)``
            followed by any continuation entries.

    Returns:
        The assembled Message, or None when the chunk is empty, the header
        date/time cannot be parsed, or the message is a system message and
        the ``remove_system_messages`` setting is on.
    """
    if not chunk:
        return None

    date_str, time_str, sender, first_line = chunk[0]

    # Validate the timestamp first so a malformed header is rejected
    # before any text is joined or normalized.
    try:
        msg_date = datetime.strptime(date_str, self.config['date_format']).date()
        msg_time = datetime.strptime(time_str, self.config['time_format']).time()
    except ValueError as e:
        self.logger.warning("Date/time parsing error: %s", e)
        return None

    # NOTE(review): continuation entries are read via index [0], so each is
    # presumably a sequence whose first item is the line text; confirm
    # against the caller that builds the chunk.
    parts = [first_line]
    parts.extend(line[0] for line in chunk[1:])
    normalized_message = self._normalize_message(' '.join(parts))

    # A system-notice match takes precedence over the media/text detection.
    msg_type = self._detect_message_type(normalized_message)
    if self._is_system_message(sender, normalized_message):
        msg_type = MessageType.SYSTEM
        if self.config['remove_system_messages']:
            return None

    return Message(
        date=msg_date,
        time=msg_time,
        sender=sender.strip(),
        content=normalized_message,
        type=msg_type,
    )
def parse_chat(self) -> None:
"""Parses the chat data into structured components.
Raises:
ValueError: If chat data isn't loaded or if no valid messages are found
RuntimeError: If message parsing fails
"""
if self.chat_data is None:
raise ValueError("Chat data is not loaded. Please run read_chat() first.")
# Enhanced regex pattern for better message header detection
date_time_pattern = (
r'^\[(?P\d{2}/\d{2}/\d{2}), (?P
Reference:
https://github.com/chris18369/Whatsappdata