How to Build a TCP Proxy with Python

Learn to build a TCP proxy in Python to intercept and analyze real-time data flow between client and server. Ideal for debugging, protocol analysis, and understanding network communication, this guide provides step-by-step implementation and practical examples.
  · 9 min read · Updated nov 2024 · Python Standard Library

Welcome! Meet our Python Code Assistant, your new coding buddy. Why wait? Start exploring now!

Introduction

In this tutorial, we’ll explore how to build a TCP proxy in Python. A TCP (Transmission Control Protocol) proxy is a server that sits between a client and a remote server, relaying data back and forth between them. This setup is useful in scenarios where you want to monitor, analyze, or modify the data being exchanged between two systems without disrupting the connection.

Table of Contents:

Real-Life Scenario

Imagine you’re troubleshooting a service that uses the FTP protocol. By setting up a TCP proxy between the FTP client and server, you can inspect the data in real-time. This is especially useful for analyzing protocols, debugging network issues, and learning how different applications communicate over the internet.

Requirements

Python3 and Kali Linux are what I'll be using for this demonstration. But you can use any OS you can access an FTP server on.

Implementation

Setting Up the Proxy Class

The main logic for our TCP proxy is wrapped in the TcpProxy class. Let’s go through each part of the code.

import sys
import socket
import threading
import time
from typing import Optional, Tuple, Dict

Imports: We import standard modules needed for network connections (socket), threading (threading), and other utilities like sys for command-line arguments, time for delays, and typing for type annotations.

class TcpProxy:
    def __init__(self):
        self._local_addr: str = ""
        self._local_port: int = 0
        self._remote_addr: str = ""
        self._remote_port: int = 0
        self._preload: bool = False
        self._backlog: int = 5
        self._chunk_size: int = 16
        self._timeout: int = 5
        self._buffer_size: int = 4096
        self._termination_flags: Dict[bytes, bool] = {
            b'220 ': True,
            b'331 ': True,
            b'230 ': True,
            b'530 ': True
        }

Initialization: We define several variables to configure the proxy, such as local_addr and local_port for the local binding address, remote_addr and remote_port for the target server, and _preload to determine if we should load remote data first.

Additional Configurations: _chunk_size controls how data is processed in chunks (in this case, 16 bytes), _timeout defines the time to wait for data before giving up, and _buffer_size sets the amount of data to read at a time. _termination_flags contains markers that signal certain statuses in protocols like FTP (e.g., 220 means service ready, 331 requests a password).

Data Processing and Stream Extraction

    def _process_data(self, stream: bytes) -> None:
        # Transform data stream for analysis
        for offset in range(0, len(stream), self._chunk_size):
            block = stream[offset:offset + self._chunk_size]
            # Format block representation
            bytes_view = ' '.join(f'{byte:02X}' for byte in block)
            text_view = ''.join(chr(byte) if 32 <= byte <= 126 else '.' for byte in block)
            # Display formatted line
            print(f"{offset:04X}   {bytes_view:<{self._chunk_size * 3}}   {text_view}")

Explanation: This method breaks down the incoming data (stream) into readable chunks. bytes_view represents each byte in hexadecimal form, while text_view shows printable ASCII characters.

Purpose: This method is helpful for analyzing raw data, such as binary or encoded text, in both hex and ASCII formats.

    def _extract_stream(self, conn: socket.socket) -> bytes:
        # Extract data stream from connection
        accumulator = b''
        conn.settimeout(self._timeout)
        try:
            while True:
                fragment = conn.recv(self._buffer_size)
                if not fragment:
                    break        
                accumulator += fragment
                # Check for protocol markers
                if accumulator.endswith(b'\r\n'):
                    for flag in self._termination_flags:
                        if flag in accumulator:
                            return accumulator
        except socket.timeout:
            pass
        return accumulator

Explanation: This method reads data from a connection in fragments until there is no more data or the connection times out. It checks for specific protocol markers to determine if a response or request is complete (e.g., messages that end with \r\n).

Purpose: By breaking data into manageable chunks and looking for protocol-specific markers, this method ensures smooth data handling and avoids partial data issues.

    def _monitor_stream(self, direction: str, stream: bytes) -> bytes:
        # Monitor and decode stream content
        try:
            content = stream.decode('utf-8').strip()
            marker = ">>>" if direction == "in" else "<<<"
            print(f"{marker} {content}")
        except UnicodeDecodeError:
            print(f"{direction}: [binary content]")
        return stream

Explanation: _monitor_stream() decodes data into UTF-8 and displays it along with a direction marker (>>> for incoming and <<< for outgoing data).

Purpose: This method helps monitor and print readable data for analysis while logging binary content separately.

Managing the Bridge Connections

    def _bridge_connections(self, entry_point: socket.socket) -> None:
        #Establish and maintain connection bridge
        # Initialize exit point
        exit_point = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            exit_point.connect((self._remote_addr, self._remote_port))
            # Handle initial remote response
            if self._preload:
                remote_data = self._extract_stream(exit_point)
                if remote_data:
                    self._process_data(remote_data)
                    processed = self._monitor_stream("out", remote_data)
                    entry_point.send(processed)
            # Main interaction loop
            while True:
                # Process incoming traffic
                entry_data = self._extract_stream(entry_point)
                if entry_data:
                    print(f"\n[>] Captured {len(entry_data)} bytes incoming")
                    self._process_data(entry_data)
                    processed = self._monitor_stream("in", entry_data)
                    exit_point.send(processed)
                # Process outgoing traffic
                exit_data = self._extract_stream(exit_point)
                if exit_data:
                    print(f"\n[<] Captured {len(exit_data)} bytes outgoing")
                    self._process_data(exit_data)
                    processed = self._monitor_stream("out", exit_data)
                    entry_point.send(processed)
                # Prevent CPU saturation
                if not (entry_data or exit_data):
                    time.sleep(0.1)
        except Exception as e:
            print(f"[!] Bridge error: {str(e)}")
        finally:
            print("[*] Closing bridge")
            entry_point.close()
            exit_point.close()

Explanation:This method creates a "bridge" between the client (entry_point) and the target server (exit_point). The code uses a loop to handle incoming and outgoing data, forwarding each chunk from client to server and back.

  • It first establishes a connection to the remote server.
  • If the preload flag is set, it retrieves an initial response from the remote server and sends it back to the client.
  • The loop continues to capture incoming data from the client, process and monitor it, and forward it to the server, and vice versa.

CPU Optimization: If no data is sent or received, it briefly pauses to avoid unnecessary CPU usage.

Orchestrating the Proxy Execution:

    def orchestrate(self) -> None:
        # Orchestrate the proxy operation
        # Validate input
        if len(sys.argv[1:]) != 5:
            print("Usage: script.py [local_addr] [local_port] [remote_addr] [remote_port] [preload]")
            print("Example: script.py 127.0.0.1 8080 target.com 80 True")
            sys.exit(1)
        # Configure proxy parameters
        self._local_addr = sys.argv[1]
        self._local_port = int(sys.argv[2])
        self._remote_addr = sys.argv[3]
        self._remote_port = int(sys.argv[4])
        self._preload = "true" in sys.argv[5].lower()
        # Initialize listener
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            listener.bind((self._local_addr, self._local_port))
        except socket.error as e:
            print(f"[!] Binding failed: {e}")
            sys.exit(1)
        listener.listen(self._backlog)
        print(f"[*] Service active on {self._local_addr}:{self._local_port}")
        # Main service loop
        while True:
            client, address = listener.accept()
            print(f"[+] Connection from {address[0]}:{address[1]}")
            bridge = threading.Thread(
                target=self._bridge_connections,
                args=(client,)
            )
            bridge.daemon = True
            bridge.start()

if __name__ == "__main__":
    bridge = TcpProxy()
    bridge.orchestrate()

Explanation: The orchestrate() method is the main entry point for running the proxy. Here’s what each section does:

Command-Line Arguments: It reads and validates the command-line arguments, which set up the local and remote addresses, ports, and whether or not to preload data from the server.

Listener Setup: It binds a listener socket to the local address and port to accept incoming client connections. The SO_REUSEADDR option allows the socket to reuse the address if it’s in use.

Connection Handling: Each client connection is handled in a separate thread using _bridge_connections() to manage communication between the client and the remote server.

How to Run the Proxy

Save the code to a file, say tcp_proxy.py, and run it in the terminal like this:

$ python tcp_proxy.py 127.0.0.1 8080 example.com 80 True

Replace the IPs and ports with the addresses of the local and remote systems you want to bridge.

Practical Testing

To test this program practically, we are going to be using a publicly available FTP server. This is the remote FTP server you want the proxy to connect to. ftp.sun.ac.za is a public FTP server at Stellenbosch University in South Africa, which allows anonymous FTP access. It’s totally permissible and legal to access this server. But it’s advisable to do so anonymously. 

So on one Kali Terminal, run:

$ sudo python3 tcp_proxy.py 192.168.17.128 21 ftp.sun.ac.za 21 True

sudo: Running as superuser is necessary to bind to certain ports (like port 21), as these require elevated privileges on Linux systems.

python3 tcp_proxy.py: This runs the tcp_proxy.py Python script with Python 3.

192.168.17.128: This is the local IP address on your Kali Linux machine (please replace with yours) where the proxy listens for incoming connections. In this case, it's configured to listen for connections on your private network.

21: This is the local port that the proxy will listen on, which matches the standard FTP port (21). Any traffic sent to 192.168.17.128 on port 21 will be captured by your proxy.

ftp.sun.ac.za: This is the remote FTP server you want the proxy to connect to. ftp.sun.ac.za is a public FTP server at Stellenbosch University in South Africa, which allows anonymous FTP access.

21: The remote port the proxy will connect to on the ftp.sun.ac.za server. Again, it's set to the standard FTP port, 21.

True: This indicates that you want the preload feature to be active, which typically means that the proxy will pull and display any initial response from the remote server (like a welcome message) before any client data is sent.

On the other kali terminal, simply run:

$ ftp 192.168.17.128 21

Any FTP client that connects to 192.168.17.128:21 will be intercepted by the proxy. When prompted to enter login credentials, use “anonymous” as the username and simply skip the password by hitting enter. The server doesn’t check passwords.

Results:

So now any interaction between the client and the server (right part of the screen) will be intercepted by our proxy (left part of the screen). 

Conclusion

This TCP proxy enables you to intercept and analyze traffic between a client and a server in real-time. By breaking down the data in manageable chunks, decoding it, and monitoring the flow in both directions, you can inspect and potentially alter traffic for debugging or testing.

Next Steps

You can extend this proxy to handle additional protocols, filter specific traffic, or add logging and data manipulation features. Experimenting with it is a great way to understand network communication at a deeper level!

I hope you enjoyed this one! Till next time

Related Tutorials

Found the article interesting? You'll love our Python Code Generator! Give AI a chance to do the heavy lifting for you. Check it out!

View Full Code Explain My Code
Sharing is caring!



Read Also



Comment panel

    Got a coding query or need some guidance before you comment? Check out this Python Code Assistant for expert advice and handy tips. It's like having a coding tutor right in your fingertips!