Code for How to Recover Deleted Files with Python Tutorial


View on Github

file_recovery.py

import os
import sys
import argparse
import struct
import time
import logging
import subprocess
import signal
from datetime import datetime, timedelta
from pathlib import Path
import binascii

# File signatures (magic numbers) for common file types
FILE_SIGNATURES = {
    'jpg': [bytes([0xFF, 0xD8, 0xFF, 0xE0]), bytes([0xFF, 0xD8, 0xFF, 0xE1])],
    'png': [bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])],
    'gif': [bytes([0x47, 0x49, 0x46, 0x38, 0x37, 0x61]), bytes([0x47, 0x49, 0x46, 0x38, 0x39, 0x61])],
    'pdf': [bytes([0x25, 0x50, 0x44, 0x46])],
    'zip': [bytes([0x50, 0x4B, 0x03, 0x04])],
    'docx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])],  # OOXML ZIP variant; identical for
    'xlsx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])],  # docx/xlsx/pptx, so the formats are
    'pptx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])],  # told apart via VALIDATION_PATTERNS
    'mp3': [bytes([0x49, 0x44, 0x33])],
    'mp4': [bytes([0x00, 0x00, 0x00, 0x18, 0x66, 0x74, 0x79, 0x70]),
            bytes([0x00, 0x00, 0x00, 0x20, 0x66, 0x74, 0x79, 0x70])],  # 'ftyp' at offset 4; the leading box size varies
    'avi': [bytes([0x52, 0x49, 0x46, 0x46])],  # 'RIFF' container header (also used by WAV; AVI has 'AVI ' at offset 8)
}
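
# Note: signature carving assumes each file is stored contiguously on the
# device; fragmented files will generally not be recovered intact.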

# Additional validation patterns to check after finding the signature
# This helps reduce false positives
VALIDATION_PATTERNS = {
    'docx': [b'word/', b'[Content_Types].xml'],
    'xlsx': [b'xl/', b'[Content_Types].xml'],
    'pptx': [b'ppt/', b'[Content_Types].xml'],
    'zip': [b'PK\x01\x02'],  # Central directory header
    'pdf': [b'obj', b'endobj'],
}
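
# 'PK\x01\x02' is the ZIP central directory file header, which every intact
# ZIP archive contains, making it a cheap validity check.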

# File endings (trailer signatures) for some file types
FILE_TRAILERS = {
    'jpg': bytes([0xFF, 0xD9]),
    'png': bytes([0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82]),
    'gif': bytes([0x00, 0x3B]),
    'pdf': bytes([0x25, 0x25, 0x45, 0x4F, 0x46]),
}
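
# Formats without a reliable trailer (e.g. ZIP-based documents, MP3, MP4)
# are sized heuristically in _read_file_heuristic() instead.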

# Maximum file sizes to prevent recovering corrupted files
MAX_FILE_SIZES = {
    'jpg': 30 * 1024 * 1024,  # 30MB
    'png': 50 * 1024 * 1024,  # 50MB
    'gif': 20 * 1024 * 1024,  # 20MB
    'pdf': 100 * 1024 * 1024,  # 100MB
    'zip': 200 * 1024 * 1024,  # 200MB
    'docx': 50 * 1024 * 1024,  # 50MB
    'xlsx': 50 * 1024 * 1024,  # 50MB
    'pptx': 100 * 1024 * 1024,  # 100MB
    'mp3': 50 * 1024 * 1024,  # 50MB
    'mp4': 1024 * 1024 * 1024,  # 1GB
    'avi': 1024 * 1024 * 1024,  # 1GB
}
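
# Carving stops once these caps are reached, so larger files come back truncated.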

class FileRecoveryTool:
    def __init__(self, source, output_dir, file_types=None, deep_scan=False, 
                 block_size=512, log_level=logging.INFO, skip_existing=True,
                 max_scan_size=None, timeout_minutes=None):
        """
        Initialize the file recovery tool
        
        Args:
            source (str): Path to the source device or directory
            output_dir (str): Directory to save recovered files
            file_types (list): List of file types to recover
            deep_scan (bool): Whether to perform a deep scan
            block_size (int): Block size for reading data
            log_level (int): Logging level
            skip_existing (bool): Skip existing files in output directory
            max_scan_size (int): Maximum number of bytes to scan
            timeout_minutes (int): Timeout in minutes
        """
        self.source = source
        self.output_dir = Path(output_dir)
        self.file_types = file_types if file_types else list(FILE_SIGNATURES.keys())
        self.deep_scan = deep_scan
        self.block_size = block_size
        self.skip_existing = skip_existing
        self.max_scan_size = max_scan_size
        self.timeout_minutes = timeout_minutes
        self.timeout_reached = False
        
        # Setup logging
        self.setup_logging(log_level)
        
        # Create output directory if it doesn't exist
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        # Statistics
        self.stats = {
            'total_files_recovered': 0,
            'recovered_by_type': {},
            'start_time': time.time(),
            'bytes_scanned': 0,
            'false_positives': 0
        }
        
        for file_type in self.file_types:
            self.stats['recovered_by_type'][file_type] = 0
    
    def setup_logging(self, log_level):
        """Set up logging configuration"""
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler(f"recovery_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
            ]
        )
        self.logger = logging.getLogger('file_recovery')
    
    def _setup_timeout(self):
        """Set up a timeout handler (POSIX only; SIGALRM does not exist on Windows)"""
        if self.timeout_minutes:
            if not hasattr(signal, 'SIGALRM'):
                self.logger.warning("Timeouts require SIGALRM, which this platform lacks; ignoring timeout")
                return
            
            def timeout_handler(signum, frame):
                self.logger.warning(f"Timeout of {self.timeout_minutes} minutes reached!")
                self.timeout_reached = True
            
            # Schedule an alarm after the requested number of minutes
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(int(self.timeout_minutes * 60))
    
    def get_device_size(self):
        """Get the size of the device or file"""
        if os.path.isfile(self.source):
            # Regular file
            return os.path.getsize(self.source)
        else:
            # Block device
            try:
                # Try using blockdev command (Linux)
                result = subprocess.run(['blockdev', '--getsize64', self.source], 
                                      capture_output=True, text=True, check=True)
                return int(result.stdout.strip())
            except (subprocess.SubprocessError, FileNotFoundError):
                try:
                    # Try using ioctl (requires root)
                    import fcntl
                    with open(self.source, 'rb') as fd:
                        # BLKGETSIZE64 = 0x80081272
                        buf = bytearray(8)
                        fcntl.ioctl(fd, 0x80081272, buf)
                        return struct.unpack('Q', buf)[0]  # unsigned 64-bit device size
                except (OSError, ImportError):
                    # Last resort: try to seek to the end
                    try:
                        with open(self.source, 'rb') as fd:
                            fd.seek(0, 2)  # Seek to end
                            return fd.tell()
                    except OSError:
                        self.logger.warning("Could not determine device size. Using fallback size.")
                        # Fallback to a reasonable size for testing
                        return 1024 * 1024 * 1024  # 1GB
    
    def scan_device(self):
        """Scan the device for deleted files"""
        self.logger.info(f"Starting scan of {self.source}")
        self.logger.info(f"Looking for file types: {', '.join(self.file_types)}")
        
        try:
            # Get device size
            device_size = self.get_device_size()
            self.logger.info(f"Device size: {self._format_size(device_size)}")
            
            # Set up timeout if specified
            if self.timeout_minutes:
                self._setup_timeout()
                self.logger.info(f"Timeout set for {self.timeout_minutes} minutes")
            
            with open(self.source, 'rb', buffering=0) as device:  # buffering=0 disables Python-level buffering
                self._scan_device_data(device, device_size)
                
        except (IOError, OSError) as e:
            self.logger.error(f"Error accessing source: {e}")
            return False
        
        self._print_summary()
        return True
    
    def _scan_device_data(self, device, device_size):
        """Scan the device data for file signatures"""
        position = 0
        
        # Limit scan size if specified
        if self.max_scan_size and self.max_scan_size < device_size:
            self.logger.info(f"Limiting scan to first {self._format_size(self.max_scan_size)} of device")
            device_size = self.max_scan_size
        
        # Create subdirectories for each file type
        for file_type in self.file_types:
            (self.output_dir / file_type).mkdir(exist_ok=True)
        
        scan_start_time = time.time()
        last_progress_time = scan_start_time
        
        # Read the device in blocks
        while position < device_size:
            # Check if timeout reached
            if self.timeout_reached:
                self.logger.warning("Stopping scan due to timeout")
                break
                
            try:
                # Seek to position first
                device.seek(position)
                
                # Read a block of data
                data = device.read(self.block_size)
                if not data:
                    break
                    
                self.stats['bytes_scanned'] += len(data)
                
                # Check for file signatures in this block
                for file_type in self.file_types:
                    signatures = FILE_SIGNATURES.get(file_type, [])
                    
                    for signature in signatures:
                        sig_pos = data.find(signature)
                        
                        if sig_pos != -1:
                            # Found a file signature, try to recover the file
                            absolute_pos = position + sig_pos
                            device.seek(absolute_pos)
                            
                            self.logger.debug(f"Found {file_type} signature at position {absolute_pos}")
                            
                            # Recover the file
                            if self._recover_file(device, file_type, absolute_pos):
                                self.stats['total_files_recovered'] += 1
                                self.stats['recovered_by_type'][file_type] += 1
                            else:
                                self.stats['false_positives'] += 1
                            
                            # The outer loop advances position and re-seeks, so no reset
                            # is needed here. Note: only the first match per block is
                            # handled, and signatures that straddle a block boundary
                            # are missed by this block-wise scan.
                
                # Update position and show progress
                position += self.block_size
                current_time = time.time()
                
                # Show progress every 5MB or 10 seconds, whichever comes first
                if (position % (5 * 1024 * 1024) == 0) or (current_time - last_progress_time >= 10):
                    percent = (position / device_size) * 100 if device_size > 0 else 0
                    elapsed = current_time - self.stats['start_time']
                    
                    # Calculate estimated time remaining
                    if position > 0 and device_size > 0:
                        bytes_per_second = position / elapsed if elapsed > 0 else 0
                        remaining_bytes = device_size - position
                        eta_seconds = remaining_bytes / bytes_per_second if bytes_per_second > 0 else 0
                        eta_str = str(timedelta(seconds=int(eta_seconds)))
                    else:
                        eta_str = "unknown"
                    
                    self.logger.info(f"Progress: {percent:.2f}% ({self._format_size(position)} / {self._format_size(device_size)}) - "
                                    f"{self.stats['total_files_recovered']} files recovered - "
                                    f"Elapsed: {timedelta(seconds=int(elapsed))} - ETA: {eta_str}")
                    last_progress_time = current_time
                    
            except Exception as e:
                self.logger.error(f"Error reading at position {position}: {e}")
                position += self.block_size  # Skip this block and continue
    
    def _validate_file_content(self, data, file_type):
        """
        Additional validation to reduce false positives
        
        Args:
            data: File data to validate
            file_type: Type of file to validate
            
        Returns:
            bool: True if file content appears valid
        """
        # Check minimum size
        if len(data) < 100:
            return False
            
        # Check for validation patterns
        patterns = VALIDATION_PATTERNS.get(file_type, [])
        if patterns:
            for pattern in patterns:
                if pattern in data:
                    return True
            return False  # None of the patterns were found
            
        # For file types without specific validation patterns
        return True
    
    def _recover_file(self, device, file_type, start_position):
        """
        Recover a file of the given type starting at the given position
        
        Args:
            device: Open file handle to the device
            file_type: Type of file to recover
            start_position: Starting position of the file
            
        Returns:
            bool: True if file was recovered successfully
        """
        max_size = MAX_FILE_SIZES.get(file_type, 10 * 1024 * 1024)  # Default to 10MB
        trailer = FILE_TRAILERS.get(file_type)
        
        # Generate a unique filename
        filename = f"{file_type}_{start_position}_{int(time.time())}_{binascii.hexlify(os.urandom(4)).decode()}.{file_type}"
        output_path = self.output_dir / file_type / filename
        
        if self.skip_existing and output_path.exists():
            self.logger.debug(f"Skipping existing file: {output_path}")
            return False
        
        # Save the current position to restore later
        current_pos = device.tell()
        
        try:
            # Seek to the start of the file
            device.seek(start_position)
            
            # Read the file data
            if trailer and self.deep_scan:
                # If we know the trailer and deep scan is enabled, read until trailer
                file_data = self._read_until_trailer(device, trailer, max_size)
            else:
                # Otherwise, use heuristics to determine file size
                file_data = self._read_file_heuristic(device, file_type, max_size)
            
            if not file_data or len(file_data) < 100:  # Ignore very small files
                return False
                
            # Additional validation to reduce false positives
            if not self._validate_file_content(file_data, file_type):
                self.logger.debug(f"Skipping invalid {file_type} file at position {start_position}")
                return False
                
            # Write the recovered file
            with open(output_path, 'wb') as f:
                f.write(file_data)
                
            self.logger.info(f"Recovered {file_type} file: {filename} ({self._format_size(len(file_data))})")
            return True
            
        except Exception as e:
            self.logger.error(f"Error recovering file at position {start_position}: {e}")
            return False
        finally:
            # Restore the original position
            try:
                device.seek(current_pos)
            except:
                pass  # Ignore seek errors in finally block
    
    def _read_until_trailer(self, device, trailer, max_size):
        """Read data until a trailer signature is found or max size is reached"""
        buffer = bytearray()
        chunk_size = 4096
        
        while len(buffer) < max_size:
            try:
                chunk = device.read(chunk_size)
                if not chunk:
                    break
                    
                buffer.extend(chunk)
                
                # Check if trailer is in the buffer
                trailer_pos = buffer.find(trailer, max(0, len(buffer) - len(trailer) - chunk_size))
                if trailer_pos != -1:
                    # Found trailer, return data up to and including the trailer
                    return buffer[:trailer_pos + len(trailer)]
            except Exception as e:
                self.logger.error(f"Error reading chunk: {e}")
                break
        
        # If we reached max size without finding a trailer, return what we have
        return buffer if len(buffer) > 100 else None
    
    def _read_file_heuristic(self, device, file_type, max_size):
        """
        Use heuristics to determine file size when trailer is unknown
        This is a simplified approach - real tools use more sophisticated methods
        """
        buffer = bytearray()
        chunk_size = 4096
        valid_chunks = 0
        invalid_chunks = 0
        
        # For Office documents and ZIP files, read a larger initial chunk to validate
        initial_chunk_size = 16384 if file_type in ['docx', 'xlsx', 'pptx', 'zip'] else chunk_size
        
        # Read initial chunk for validation
        initial_chunk = device.read(initial_chunk_size)
        if not initial_chunk:
            return None
            
        buffer.extend(initial_chunk)
        
        # For Office documents, check if it contains required elements
        if file_type in ['docx', 'xlsx', 'pptx', 'zip']:
            # Basic validation for Office Open XML files
            if file_type == 'docx' and b'word/' not in initial_chunk:
                return None
            if file_type == 'xlsx' and b'xl/' not in initial_chunk:
                return None
            if file_type == 'pptx' and b'ppt/' not in initial_chunk:
                return None
            if file_type == 'zip' and b'PK\x01\x02' not in initial_chunk:
                return None
        
        # Continue reading chunks
        while len(buffer) < max_size:
            try:
                chunk = device.read(chunk_size)
                if not chunk:
                    break
                    
                buffer.extend(chunk)
                
                # Heuristic: known binary formats are read until max size or end of
                # device (with a ZIP corruption check below); anything else is treated
                # as text and checked for a reasonable ratio of printable bytes
                if file_type in ['jpg', 'png', 'gif', 'pdf', 'zip', 'docx', 'xlsx', 'pptx', 'mp3', 'mp4', 'avi']:
                    # For binary files, we continue reading until we hit max size or end of device
                    valid_chunks += 1
                    
                    # For ZIP-based formats, check for corruption
                    if file_type in ['zip', 'docx', 'xlsx', 'pptx'] and b'PK' not in chunk and valid_chunks > 10:
                        # If we've read several chunks and don't see any more PK signatures, we might be past the file
                        invalid_chunks += 1
                    
                else:
                    # For text files, we could check for text validity
                    printable_ratio = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk) / len(chunk)
                    if printable_ratio < 0.7:  # If less than 70% printable characters
                        invalid_chunks += 1
                    else:
                        valid_chunks += 1
                        
                # If too many suspect chunks have accumulated, assume we have read
                # past the end of the file and trim them off
                if invalid_chunks > 3:
                    return buffer[:max(0, len(buffer) - invalid_chunks * chunk_size)]
            except Exception as e:
                self.logger.error(f"Error reading chunk in heuristic: {e}")
                break
        
        return buffer
    
    def _format_size(self, size_bytes):
        """Format size in bytes to a human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024 or unit == 'TB':
                return f"{size_bytes:.2f} {unit}"
            size_bytes /= 1024
    
    def _print_summary(self):
        """Print a summary of the recovery operation"""
        elapsed = time.time() - self.stats['start_time']
        
        self.logger.info("=" * 50)
        self.logger.info("Recovery Summary")
        self.logger.info("=" * 50)
        self.logger.info(f"Total files recovered: {self.stats['total_files_recovered']}")
        self.logger.info(f"False positives detected and skipped: {self.stats['false_positives']}")
        self.logger.info(f"Total data scanned: {self._format_size(self.stats['bytes_scanned'])}")
        self.logger.info(f"Time elapsed: {timedelta(seconds=int(elapsed))}")
        self.logger.info("Files recovered by type:")
        
        for file_type, count in self.stats['recovered_by_type'].items():
            if count > 0:
                self.logger.info(f"  - {file_type}: {count}")
        
        if self.timeout_reached:
            self.logger.info("Note: Scan was stopped due to timeout")
            
        self.logger.info("=" * 50)


def main():
    """Main function to parse arguments and run the recovery tool"""
    parser = argparse.ArgumentParser(description='File Recovery Tool - Recover deleted files from storage devices')
    
    parser.add_argument('source', help='Source device or directory to recover files from (e.g., /dev/sdb, /media/usb)')
    parser.add_argument('output', help='Directory to save recovered files')
    
    parser.add_argument('-t', '--types', nargs='+', choices=FILE_SIGNATURES.keys(), default=None,
                        help='File types to recover (default: all supported types)')
    
    parser.add_argument('-d', '--deep-scan', action='store_true',
                        help='Perform a deep scan (slower but more thorough)')
    
    parser.add_argument('-b', '--block-size', type=int, default=512,
                        help='Block size for reading data (default: 512 bytes)')
    
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Enable verbose output')
    
    parser.add_argument('-q', '--quiet', action='store_true',
                        help='Suppress all output except errors')
    
    parser.add_argument('--no-skip', action='store_true',
                        help='Do not skip existing files in output directory')
    
    parser.add_argument('--max-size', type=int, 
                        help='Maximum size to scan in MB (e.g., 1024 for 1GB)')
    
    parser.add_argument('--timeout', type=int, default=None,
                        help='Stop scanning after specified minutes')
    
    args = parser.parse_args()
    
    # Set logging level based on verbosity
    if args.quiet:
        log_level = logging.ERROR
    elif args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    
    # Convert max size from MB to bytes if specified
    max_scan_size = args.max_size * 1024 * 1024 if args.max_size else None
    
    # Create and run the recovery tool
    recovery_tool = FileRecoveryTool(
        source=args.source,
        output_dir=args.output,
        file_types=args.types,
        deep_scan=args.deep_scan,
        block_size=args.block_size,
        log_level=log_level,
        skip_existing=not args.no_skip,
        max_scan_size=max_scan_size,
        timeout_minutes=args.timeout
    )
    
    try:
        recovery_tool.scan_device()
    except KeyboardInterrupt:
        print("\nRecovery process interrupted by user.")
        recovery_tool._print_summary()
        sys.exit(1)


if __name__ == "__main__":
    main()
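

Example usage

As a quick sanity check, you can point the tool at a small raw disk image instead of a live device. The sketch below is illustrative only: the image name usb.img, the output directory recovered, and the chosen file types are assumptions, not part of the tool itself.

# example_usage.py -- illustrative sketch, assuming file_recovery.py is importable
from file_recovery import FileRecoveryTool

tool = FileRecoveryTool(
    source="usb.img",                  # raw image, e.g. made with: dd if=/dev/sdb of=usb.img
    output_dir="recovered",            # recovered files land in per-type subfolders here
    file_types=["jpg", "png", "pdf"],  # restrict the scan to a few types
    deep_scan=True,                    # read until trailer signatures where known
    max_scan_size=256 * 1024 * 1024,   # only scan the first 256 MB
)
tool.scan_device()

The equivalent command line would be something like python file_recovery.py usb.img recovered -t jpg png pdf -d --max-size 256; scanning a raw device such as /dev/sdb normally requires root.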

