WhatsapPY data

Slug: whatsappy

11806 characters 924 words
import pandas as pd import re from typing import List, Tuple, Optional, Dict, Any from pathlib import Path from dataclasses import dataclass from datetime import datetime, time import logging from enum import Enum class MessageType(Enum): TEXT = "text" MEDIA = "media" SYSTEM = "system" @dataclass class Message: date: datetime time: time sender: str content: str type: MessageType class WhatsAppDataProcessor: def __init__(self, file_path: str, config: Optional[Dict[str, Any]] = None): """Initialize the WhatsApp chat processor with optional configuration. Args: file_path: Path to the WhatsApp chat export file config: Optional configuration dictionary with processing settings Raises: FileNotFoundError: If the specified file doesn't exist """ self.file_path = Path(file_path) if not self.file_path.exists(): raise FileNotFoundError(f"Chat file not found: {file_path}") # Default configuration self.config = { 'max_line_length': 32767, # Excel's maximum cell content length 'preserve_emoji': True, 'remove_system_messages': False, 'date_format': '%d/%m/%y', 'time_format': '%H:%M:%S', 'output_encoding': 'utf-8', 'normalize_whitespace': True } if config: self.config.update(config) self.chat_data: Optional[str] = None self.data_frame: Optional[pd.DataFrame] = None # Set up logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def _is_system_message(self, sender: str, content: str) -> bool: """Detect if a message is a system message. Args: sender: Message sender content: Message content Returns: bool indicating if the message is a system message """ system_patterns = [ r"changed the subject to", r"added \d", r"left$", r"changed this group's icon", r"Messages and calls are end-to-end encrypted", ] return any(re.search(pattern, content) for pattern in system_patterns) def _normalize_message(self, message: str) -> str: """Normalize a message by converting newlines and handling special characters. Args: message: The raw message text Returns: Normalized message as a single line """ if not self.config['normalize_whitespace']: return message # Preserve emojis if configured if self.config['preserve_emoji']: # Convert emojis to temporary placeholders emoji_pattern = r'[\U0001F000-\U0001F999]' emojis = re.finditer(emoji_pattern, message) emoji_map = {m.group(): f"__EMOJI_{i}__" for i, m in enumerate(emojis)} for emoji, placeholder in emoji_map.items(): message = message.replace(emoji, placeholder) # Normalize whitespace and newlines message = re.sub(r'\r\n|\r|\n', ' ', message) message = re.sub(r'\s+', ' ', message) message = message.strip() # Restore emojis if they were preserved if self.config['preserve_emoji']: for emoji, placeholder in emoji_map.items(): message = message.replace(placeholder, emoji) # Truncate if exceeds max length if len(message) > self.config['max_line_length']: message = message[:self.config['max_line_length']-3] + "..." return message def _detect_message_type(self, content: str) -> MessageType: """Detect the type of message based on its content. Args: content: Message content Returns: MessageType enum value """ media_patterns = [ r'<Media omitted>', r'image omitted', r'video omitted', r'audio omitted', r'document omitted', r'sticker omitted', r'GIF omitted' ] if any(re.search(pattern, content, re.IGNORECASE) for pattern in media_patterns): return MessageType.MEDIA return MessageType.TEXT def _process_message_chunk(self, chunk: List[Tuple[str, str, str, str]]) -> Optional[Message]: """Process a chunk of message lines into a single message entry. Args: chunk: List containing the message header and continuation lines Returns: Optional[Message] object containing the processed message data """ if not chunk: return None date_str, time_str, sender, first_line = chunk[0] continuation_lines = [line[0] for line in chunk[1:]] # Combine message lines full_message = first_line if continuation_lines: full_message += ' ' + ' '.join(continuation_lines) # Normalize the message normalized_message = self._normalize_message(full_message) # Convert date and time strings to proper types try: date = datetime.strptime(date_str, self.config['date_format']).date() time_obj = datetime.strptime(time_str, self.config['time_format']).time() except ValueError as e: self.logger.warning(f"Date/time parsing error: {e}") return None # Detect message type msg_type = self._detect_message_type(normalized_message) # Check if it's a system message if self._is_system_message(sender, normalized_message): msg_type = MessageType.SYSTEM if self.config['remove_system_messages']: return None return Message( date=date, time=time_obj, sender=sender.strip(), content=normalized_message, type=msg_type ) def parse_chat(self) -> None: """Parses the chat data into structured components. Raises: ValueError: If chat data isn't loaded or if no valid messages are found RuntimeError: If message parsing fails """ if self.chat_data is None: raise ValueError("Chat data is not loaded. Please run read_chat() first.") # Enhanced regex pattern for better message header detection date_time_pattern = ( r'^\[(?P<date>\d{2}/\d{2}/\d{2}), (?P<time>\d{2}:\d{2}:\d{2})\] ' r'(?P<sender>[^:]+): (?P<content>.*?)$' ) try: lines = [line.strip() for line in self.chat_data.split('\n') if line.strip()] messages: List[Message] = [] current_chunk = [] for line in lines: match = re.match(date_time_pattern, line) if match: # Process previous chunk if it exists if current_chunk: processed_msg = self._process_message_chunk(current_chunk) if processed_msg: messages.append(processed_msg) current_chunk = [] # Start new message chunk current_chunk.append(match.groups()) elif current_chunk: # Add continuation line current_chunk.append([line]) # Process the final chunk if current_chunk: processed_msg = self._process_message_chunk(current_chunk) if processed_msg: messages.append(processed_msg) if not messages: raise ValueError("No valid messages found in the chat data") # Convert messages to DataFrame self.data_frame = pd.DataFrame([ { 'Date': msg.date, 'Time': msg.time, 'Sender': msg.sender, 'Message': msg.content, 'Type': msg.type.value } for msg in messages ]) except Exception as e: raise RuntimeError(f"Error parsing chat data: {e}") def save_to_file(self, output_path: str = 'result.csv') -> None: """Saves the parsed data to a CSV file with proper encoding and escaping. Args: output_path: Path where the CSV file will be saved Raises: ValueError: If data frame is not created IOError: If there are issues saving the file """ if self.data_frame is None: raise ValueError("Data frame is not created. Please run parse_chat() first.") try: self.data_frame.to_csv( output_path, index=False, sep='\t', encoding=self.config['output_encoding'], quoting=1, # Quote all non-numeric fields escapechar='\\', # Use backslash as escape character date_format='%Y-%m-%d' # ISO format for dates ) self.logger.info(f"Successfully saved {len(self.data_frame)} messages to {output_path}") except IOError as e: raise IOError(f"Error saving to file: {e}") def read_chat(self) -> None: """Reads the chat data from the file with proper encoding handling. Raises: IOError: If there are issues reading the file """ try: with open(self.file_path, 'r', encoding=self.config['output_encoding'], errors='ignore') as file: self.chat_data = file.read() if not self.chat_data.strip(): raise ValueError("The chat file is empty") self.logger.info(f"Successfully read chat file: {self.file_path}") except IOError as e: raise IOError(f"Error reading chat file: {e}") def main(): try: # Example configuration config = { 'preserve_emoji': True, 'remove_system_messages': True, 'normalize_whitespace': True } processor = WhatsAppDataProcessor('chat.txt', config) processor.read_chat() processor.parse_chat() processor.save_to_file() except Exception as e: logging.error(f"Error processing chat: {e}") if __name__ == "__main__": main()
URL: https://ib.bsb.br/whatsappy
Ref. https://github.com/chris18369/Whatsappdata