Files
QBPOS-Help/python/secure_production_server.py

205 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
Secure Production HTTP Server for QuickBooks POS Help Documentation
Features:
- IP whitelist security
- Rate limiting
- Auto-restart capability (when run under an external supervisor; this script contains no restart logic of its own)
- Production-ready error handling
- Logging
- HTTPS ready (configurable)
"""
import http.server
import ipaddress
import logging
import os
import signal
import socketserver
import sys
import time
from collections import defaultdict
from datetime import datetime
from threading import Lock
# ============ CONFIGURATION ============
# TCP port the server listens on (bound on all interfaces in main()).
PORT = 8888
# Directory whose contents are served; handed to the request handler as its
# `directory` argument. main() changes the working directory to this script's
# folder before serving, so a relative path is resolved from there.
DIRECTORY = "QB_Help_Web"
# Security: Allowed IP addresses (empty list = allow all)
# Add your allowed IPs here, e.g., ['192.168.10.0/24', '10.0.0.1']
# Entries are consulted by is_ip_allowed() for every GET request.
ALLOWED_IPS = []  # Empty = allow all. Populate for security.
# Rate limiting: at most RATE_LIMIT_REQUESTS requests per client IP within
# any RATE_LIMIT_WINDOW-second span (enforced by check_rate_limit()).
RATE_LIMIT_REQUESTS = 1000
RATE_LIMIT_WINDOW = 60  # seconds
# Logging configuration: records go to LOG_FILE and to stdout at LOG_LEVEL.
LOG_FILE = "/tmp/qbpos_help_server.log"
LOG_LEVEL = logging.INFO
# HTTPS configuration: when USE_HTTPS is True, main() wraps the listening
# socket with TLS using the certificate and key files below.
USE_HTTPS = False
SSL_CERT_FILE = "/etc/ssl/certs/server.crt"  # Update path when ready
SSL_KEY_FILE = "/etc/ssl/private/server.key"  # Update path when ready
# ============ END CONFIGURATION ============
# Logging setup: every record is written both to LOG_FILE and to stdout,
# using a "timestamp - level - message" layout.
logging.basicConfig(
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=LOG_LEVEL,
)
# Module-level logger shared by the handler, the helpers and main().
logger = logging.getLogger(__name__)
# Shared rate-limiter state.
# rate_limit_lock serialises access to rate_limit_data, because requests are
# handled on multiple threads.
rate_limit_lock = Lock()
# Maps a client IP to the list of timestamps of its recent requests; a new
# IP starts with an empty list.
rate_limit_data = defaultdict(list)
def is_ip_allowed(client_ip, allowed_ips=None):
    """Return True if *client_ip* is permitted by the IP whitelist.

    Matching is done on parsed addresses rather than on string prefixes, so
    ``'10.0.0.1'`` no longer admits ``10.0.0.100`` and CIDR entries such as
    ``'192.168.10.0/24'`` cover exactly the addresses in that network.

    Args:
        client_ip: Client address as a string, e.g. ``"192.168.10.5"``.
        allowed_ips: Optional list of whitelist entries, each a single
            address or a CIDR network. Defaults to the module-level
            ``ALLOWED_IPS``.

    Returns:
        True when the whitelist is empty (allow all) or the address lies in
        at least one entry. False otherwise, including when *client_ip* is
        not a valid IP address.
    """
    if allowed_ips is None:
        allowed_ips = ALLOWED_IPS
    if not allowed_ips:
        return True

    try:
        address = ipaddress.ip_address(client_ip)
    except ValueError:
        # An unparseable client address can never match an entry: fail closed.
        return False

    for entry in allowed_ips:
        try:
            # strict=False accepts both bare hosts ('10.0.0.1' -> /32) and
            # networks written with host bits set ('192.168.10.5/24').
            network = ipaddress.ip_network(entry, strict=False)
        except ValueError:
            # A malformed whitelist entry grants access to nobody.
            continue
        if address in network:
            return True
    return False
def check_rate_limit(client_ip):
    """Record a request from *client_ip* unless it is over its rate limit.

    Timestamps older than RATE_LIMIT_WINDOW seconds are discarded first. If
    RATE_LIMIT_REQUESTS or more remain, the request is rejected and not
    recorded; otherwise the current time is recorded.

    Returns:
        True if the request is within the limit, False if it must be refused.
    """
    with rate_limit_lock:
        now = time.time()

        # Keep only the timestamps that still fall inside the window.
        recent = []
        for stamp in rate_limit_data[client_ip]:
            if now - stamp < RATE_LIMIT_WINDOW:
                recent.append(stamp)
        rate_limit_data[client_ip] = recent

        within_limit = len(recent) < RATE_LIMIT_REQUESTS
        if within_limit:
            # Only accepted requests count towards the limit.
            recent.append(now)
        return within_limit
class SecureHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Static-file handler guarded by an IP whitelist and a rate limit.

    Files are served from ``DIRECTORY``. GET and HEAD requests both pass
    through the same whitelist and rate-limit checks before the standard
    ``SimpleHTTPRequestHandler`` logic runs. Every response carries a fixed
    set of security and no-cache headers.
    """

    def __init__(self, *args, **kwargs):
        """Create the handler, pinning the served directory to DIRECTORY."""
        super().__init__(*args, directory=DIRECTORY, **kwargs)

    def _passes_security_checks(self):
        """Apply the whitelist and rate limit to the current request.

        Sends a 403 or 429 error response when a check fails.

        Returns:
            True if the request may proceed, False if an error was sent.
        """
        client_ip = self.client_address[0]
        # Check IP whitelist
        if not is_ip_allowed(client_ip):
            logger.warning(f"Blocked request from unauthorized IP: {client_ip}")
            self.send_error(403, "Forbidden: Access denied")
            return False
        # Check rate limit
        if not check_rate_limit(client_ip):
            logger.warning(f"Rate limit exceeded for IP: {client_ip}")
            self.send_error(429, "Too Many Requests: Rate limit exceeded")
            return False
        return True

    def _serve(self, method):
        """Run *method* (a parent do_GET/do_HEAD) behind the security checks."""
        if not self._passes_security_checks():
            return
        client_ip = self.client_address[0]
        try:
            method()
        except ConnectionError as e:
            # The client went away mid-response; writing a 500 to the same
            # socket would only raise again, so just record it.
            logger.warning(f"Connection lost while serving {client_ip}: {e}")
        except Exception as e:
            logger.error(f"Error processing request from {client_ip}: {e}")
            self.send_error(500, "Internal Server Error")

    def do_GET(self):
        """Handle GET requests with security checks."""
        self._serve(super().do_GET)

    def do_HEAD(self):
        """Handle HEAD requests with the same security checks as GET."""
        self._serve(super().do_HEAD)

    def log_message(self, format, *args):
        """Log only request lines whose status code is 4xx or 5xx.

        The base class calls this with ``(requestline, code, size)`` for
        request logging, but also with arbitrary arguments from
        ``log_error`` (sometimes a single one), so the status is looked up
        defensively instead of indexing ``args[1]`` unconditionally.
        """
        status = str(args[1]) if len(args) > 1 else ""
        if status[:1] in ('4', '5'):  # 4xx and 5xx errors
            logger.warning(f"{self.client_address[0]} - {format % args}")

    def end_headers(self):
        """Add security and cache headers, then finish the header block."""
        # Security headers
        self.send_header('X-Content-Type-Options', 'nosniff')
        self.send_header('X-Frame-Options', 'SAMEORIGIN')
        self.send_header('X-XSS-Protection', '1; mode=block')
        # Cache control: always refetch so documentation updates show at once
        self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        super().end_headers()
class ThreadedTCPServer(socketserver.ThreadingTCPServer):
    """TCP server that handles each connection on its own thread."""

    # Allow rebinding the port right after a restart.
    allow_reuse_address = True
    # Worker threads must not keep the process alive at shutdown.
    daemon_threads = True
    # Backlog of pending connections passed to listen().
    request_queue_size = 100
def signal_handler(sig, frame):
    """Log the shutdown and exit the process with status 0.

    Registered by main() for SIGINT and SIGTERM; *sig* and *frame* are the
    standard signal-handler arguments and are not used.
    """
    logger.info("\nShutting down server gracefully...")
    raise SystemExit(0)
def main():
    """Start the documentation server and block until it is stopped.

    Changes into the script's directory (so the relative DIRECTORY resolves),
    installs SIGINT/SIGTERM handlers, prints a startup banner, then serves
    on all interfaces at PORT. When USE_HTTPS is True the listening socket
    is wrapped with TLS using SSL_CERT_FILE and SSL_KEY_FILE.

    Exits the process with status 1 if the server cannot start or fails
    while serving.
    """
    # Change to script directory
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Print startup information
    protocol = "HTTPS" if USE_HTTPS else "HTTP"
    print("=" * 70)
    print("QuickBooks POS Help Documentation Server - SECURE MODE")
    print("=" * 70)
    print(f"Server running at:")
    print(f" Local: {protocol.lower()}://localhost:{PORT}/POS_Help.html")
    # NOTE(review): the network address below is hard-coded, not detected.
    print(f" Network: {protocol.lower()}://192.168.10.130:{PORT}/POS_Help.html")
    print(f"\nSecurity Features:")
    print(f" ✓ IP Whitelist: {'Enabled' if ALLOWED_IPS else 'Disabled (Allow All)'}")
    print(f" ✓ Rate Limiting: {RATE_LIMIT_REQUESTS} req/min per IP")
    print(f" ✓ Security Headers: Enabled")
    print(f" ✓ HTTPS: {'Enabled' if USE_HTTPS else 'Ready (Set USE_HTTPS=True)'}")
    print(f"\nLogging:")
    print(f" ✓ Log File: {LOG_FILE}")
    print(f" ✓ Log Level: {logging.getLevelName(LOG_LEVEL)}")
    print(f"\nPress Ctrl+C to stop")
    print("=" * 70)
    logger.info(f"Server starting on port {PORT}")

    try:
        # The with-block closes the listening socket on every exit path,
        # including the SystemExit raised by the signal handler.
        with ThreadedTCPServer(("0.0.0.0", PORT), SecureHTTPRequestHandler) as httpd:
            if USE_HTTPS:
                import ssl

                # ssl.wrap_socket() was removed in Python 3.12; an explicit
                # server-side SSLContext is the supported replacement.
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(certfile=SSL_CERT_FILE, keyfile=SSL_KEY_FILE)
                httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
                logger.info("HTTPS enabled")
            logger.info("Server ready to accept connections")
            httpd.serve_forever()
    except Exception as e:
        # Top-level boundary: report any startup or serving failure and exit.
        logger.error(f"Server error: {e}")
        sys.exit(1)
if __name__ == "__main__":
main()