#!/usr/bin/env python3 """ Secure Production HTTP Server for QuickBooks POS Help Documentation Features: - IP whitelist security - Rate limiting - Auto-restart capability - Production-ready error handling - Logging - HTTPS ready (configurable) """ import http.server import socketserver import os import logging import signal import sys from datetime import datetime from collections import defaultdict from threading import Lock import time # ============ CONFIGURATION ============ PORT = 8888 DIRECTORY = "QB_Help_Web" # Security: Allowed IP addresses (empty list = allow all) # Add your allowed IPs here, e.g., ['192.168.10.0/24', '10.0.0.1'] ALLOWED_IPS = [] # Empty = allow all. Populate for security. # Rate limiting: max requests per IP per minute RATE_LIMIT_REQUESTS = 1000 RATE_LIMIT_WINDOW = 60 # seconds # Logging configuration LOG_FILE = "/tmp/qbpos_help_server.log" LOG_LEVEL = logging.INFO # HTTPS Configuration (set to True when ready) USE_HTTPS = False SSL_CERT_FILE = "/etc/ssl/certs/server.crt" # Update path when ready SSL_KEY_FILE = "/etc/ssl/private/server.key" # Update path when ready # ============ END CONFIGURATION ============ # Configure logging logging.basicConfig( level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) # Rate limiting storage rate_limit_data = defaultdict(list) rate_limit_lock = Lock() def is_ip_allowed(client_ip): """Check if IP is in whitelist. If whitelist is empty, allow all.""" if not ALLOWED_IPS: return True # Simple IP matching (for production, use ipaddress module for CIDR) for allowed in ALLOWED_IPS: if client_ip.startswith(allowed.split('/')[0]): return True return False def check_rate_limit(client_ip): """Check if client has exceeded rate limit.""" with rate_limit_lock: current_time = time.time() # Clean old entries rate_limit_data[client_ip] = [ t for t in rate_limit_data[client_ip] if current_time - t < RATE_LIMIT_WINDOW ] # Check limit if len(rate_limit_data[client_ip]) >= RATE_LIMIT_REQUESTS: return False # Add current request rate_limit_data[client_ip].append(current_time) return True class SecureHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): """Secure HTTP request handler with IP whitelist and rate limiting.""" def __init__(self, *args, **kwargs): super().__init__(*args, directory=DIRECTORY, **kwargs) def do_GET(self): """Handle GET requests with security checks.""" client_ip = self.client_address[0] # Check IP whitelist if not is_ip_allowed(client_ip): logger.warning(f"Blocked request from unauthorized IP: {client_ip}") self.send_error(403, "Forbidden: Access denied") return # Check rate limit if not check_rate_limit(client_ip): logger.warning(f"Rate limit exceeded for IP: {client_ip}") self.send_error(429, "Too Many Requests: Rate limit exceeded") return # Process request try: super().do_GET() except Exception as e: logger.error(f"Error processing request from {client_ip}: {e}") self.send_error(500, "Internal Server Error") def log_message(self, format, *args): """Log only errors and important events.""" if args[1][0] in ('4', '5'): # 4xx and 5xx errors logger.warning(f"{self.client_address[0]} - {format % args}") def end_headers(self): """Add security and cache headers.""" # Security headers self.send_header('X-Content-Type-Options', 'nosniff') self.send_header('X-Frame-Options', 'SAMEORIGIN') self.send_header('X-XSS-Protection', '1; mode=block') # Cache control self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.send_header('Pragma', 'no-cache') self.send_header('Expires', '0') super().end_headers() class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): """Threaded TCP server for concurrent connections.""" allow_reuse_address = True daemon_threads = True request_queue_size = 100 def signal_handler(sig, frame): """Handle graceful shutdown.""" logger.info("\nShutting down server gracefully...") sys.exit(0) def main(): """Main server function.""" # Change to script directory os.chdir(os.path.dirname(os.path.abspath(__file__))) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Print startup information protocol = "HTTPS" if USE_HTTPS else "HTTP" print("=" * 70) print("QuickBooks POS Help Documentation Server - SECURE MODE") print("=" * 70) print(f"Server running at:") print(f" Local: {protocol.lower()}://localhost:{PORT}/POS_Help.html") print(f" Network: {protocol.lower()}://192.168.10.130:{PORT}/POS_Help.html") print(f"\nSecurity Features:") print(f" ✓ IP Whitelist: {'Enabled' if ALLOWED_IPS else 'Disabled (Allow All)'}") print(f" ✓ Rate Limiting: {RATE_LIMIT_REQUESTS} req/min per IP") print(f" ✓ Security Headers: Enabled") print(f" ✓ HTTPS: {'Enabled' if USE_HTTPS else 'Ready (Set USE_HTTPS=True)'}") print(f"\nLogging:") print(f" ✓ Log File: {LOG_FILE}") print(f" ✓ Log Level: {logging.getLevelName(LOG_LEVEL)}") print(f"\nPress Ctrl+C to stop") print("=" * 70) logger.info(f"Server starting on port {PORT}") try: if USE_HTTPS: import ssl # HTTPS server (when ready) httpd = ThreadedTCPServer(("0.0.0.0", PORT), SecureHTTPRequestHandler) httpd.socket = ssl.wrap_socket( httpd.socket, certfile=SSL_CERT_FILE, keyfile=SSL_KEY_FILE, server_side=True ) logger.info("HTTPS enabled") else: # HTTP server httpd = ThreadedTCPServer(("0.0.0.0", PORT), SecureHTTPRequestHandler) logger.info("Server ready to accept connections") httpd.serve_forever() except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) if __name__ == "__main__": main()