Initial commit - QBPOS Help
This commit is contained in:
204
python/secure_production_server.py
Normal file
204
python/secure_production_server.py
Normal file
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Secure Production HTTP Server for QuickBooks POS Help Documentation
|
||||
Features:
|
||||
- IP whitelist security
|
||||
- Rate limiting
|
||||
- Auto-restart capability
|
||||
- Production-ready error handling
|
||||
- Logging
|
||||
- HTTPS ready (configurable)
|
||||
"""
|
||||
import http.server
|
||||
import socketserver
|
||||
import os
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
from threading import Lock
|
||||
import time
|
||||
|
||||
# ============ CONFIGURATION ============
|
||||
PORT = 8888
|
||||
DIRECTORY = "QB_Help_Web"
|
||||
|
||||
# Security: Allowed IP addresses (empty list = allow all)
|
||||
# Add your allowed IPs here, e.g., ['192.168.10.0/24', '10.0.0.1']
|
||||
ALLOWED_IPS = [] # Empty = allow all. Populate for security.
|
||||
|
||||
# Rate limiting: max requests per IP per minute
|
||||
RATE_LIMIT_REQUESTS = 1000
|
||||
RATE_LIMIT_WINDOW = 60 # seconds
|
||||
|
||||
# Logging configuration
|
||||
LOG_FILE = "/tmp/qbpos_help_server.log"
|
||||
LOG_LEVEL = logging.INFO
|
||||
|
||||
# HTTPS Configuration (set to True when ready)
|
||||
USE_HTTPS = False
|
||||
SSL_CERT_FILE = "/etc/ssl/certs/server.crt" # Update path when ready
|
||||
SSL_KEY_FILE = "/etc/ssl/private/server.key" # Update path when ready
|
||||
|
||||
# ============ END CONFIGURATION ============
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=LOG_LEVEL,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(LOG_FILE),
|
||||
logging.StreamHandler(sys.stdout)
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Rate limiting storage
|
||||
rate_limit_data = defaultdict(list)
|
||||
rate_limit_lock = Lock()
|
||||
|
||||
def is_ip_allowed(client_ip):
|
||||
"""Check if IP is in whitelist. If whitelist is empty, allow all."""
|
||||
if not ALLOWED_IPS:
|
||||
return True
|
||||
|
||||
# Simple IP matching (for production, use ipaddress module for CIDR)
|
||||
for allowed in ALLOWED_IPS:
|
||||
if client_ip.startswith(allowed.split('/')[0]):
|
||||
return True
|
||||
return False
|
||||
|
||||
def check_rate_limit(client_ip):
|
||||
"""Check if client has exceeded rate limit."""
|
||||
with rate_limit_lock:
|
||||
current_time = time.time()
|
||||
|
||||
# Clean old entries
|
||||
rate_limit_data[client_ip] = [
|
||||
t for t in rate_limit_data[client_ip]
|
||||
if current_time - t < RATE_LIMIT_WINDOW
|
||||
]
|
||||
|
||||
# Check limit
|
||||
if len(rate_limit_data[client_ip]) >= RATE_LIMIT_REQUESTS:
|
||||
return False
|
||||
|
||||
# Add current request
|
||||
rate_limit_data[client_ip].append(current_time)
|
||||
return True
|
||||
|
||||
class SecureHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
"""Secure HTTP request handler with IP whitelist and rate limiting."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, directory=DIRECTORY, **kwargs)
|
||||
|
||||
def do_GET(self):
|
||||
"""Handle GET requests with security checks."""
|
||||
client_ip = self.client_address[0]
|
||||
|
||||
# Check IP whitelist
|
||||
if not is_ip_allowed(client_ip):
|
||||
logger.warning(f"Blocked request from unauthorized IP: {client_ip}")
|
||||
self.send_error(403, "Forbidden: Access denied")
|
||||
return
|
||||
|
||||
# Check rate limit
|
||||
if not check_rate_limit(client_ip):
|
||||
logger.warning(f"Rate limit exceeded for IP: {client_ip}")
|
||||
self.send_error(429, "Too Many Requests: Rate limit exceeded")
|
||||
return
|
||||
|
||||
# Process request
|
||||
try:
|
||||
super().do_GET()
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing request from {client_ip}: {e}")
|
||||
self.send_error(500, "Internal Server Error")
|
||||
|
||||
def log_message(self, format, *args):
|
||||
"""Log only errors and important events."""
|
||||
if args[1][0] in ('4', '5'): # 4xx and 5xx errors
|
||||
logger.warning(f"{self.client_address[0]} - {format % args}")
|
||||
|
||||
def end_headers(self):
|
||||
"""Add security and cache headers."""
|
||||
# Security headers
|
||||
self.send_header('X-Content-Type-Options', 'nosniff')
|
||||
self.send_header('X-Frame-Options', 'SAMEORIGIN')
|
||||
self.send_header('X-XSS-Protection', '1; mode=block')
|
||||
|
||||
# Cache control
|
||||
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
|
||||
self.send_header('Pragma', 'no-cache')
|
||||
self.send_header('Expires', '0')
|
||||
|
||||
super().end_headers()
|
||||
|
||||
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
"""Threaded TCP server for concurrent connections."""
|
||||
allow_reuse_address = True
|
||||
daemon_threads = True
|
||||
request_queue_size = 100
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
"""Handle graceful shutdown."""
|
||||
logger.info("\nShutting down server gracefully...")
|
||||
sys.exit(0)
|
||||
|
||||
def main():
|
||||
"""Main server function."""
|
||||
# Change to script directory
|
||||
os.chdir(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
# Register signal handlers
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
# Print startup information
|
||||
protocol = "HTTPS" if USE_HTTPS else "HTTP"
|
||||
print("=" * 70)
|
||||
print("QuickBooks POS Help Documentation Server - SECURE MODE")
|
||||
print("=" * 70)
|
||||
print(f"Server running at:")
|
||||
print(f" Local: {protocol.lower()}://localhost:{PORT}/POS_Help.html")
|
||||
print(f" Network: {protocol.lower()}://192.168.10.130:{PORT}/POS_Help.html")
|
||||
print(f"\nSecurity Features:")
|
||||
print(f" ✓ IP Whitelist: {'Enabled' if ALLOWED_IPS else 'Disabled (Allow All)'}")
|
||||
print(f" ✓ Rate Limiting: {RATE_LIMIT_REQUESTS} req/min per IP")
|
||||
print(f" ✓ Security Headers: Enabled")
|
||||
print(f" ✓ HTTPS: {'Enabled' if USE_HTTPS else 'Ready (Set USE_HTTPS=True)'}")
|
||||
print(f"\nLogging:")
|
||||
print(f" ✓ Log File: {LOG_FILE}")
|
||||
print(f" ✓ Log Level: {logging.getLevelName(LOG_LEVEL)}")
|
||||
print(f"\nPress Ctrl+C to stop")
|
||||
print("=" * 70)
|
||||
|
||||
logger.info(f"Server starting on port {PORT}")
|
||||
|
||||
try:
|
||||
if USE_HTTPS:
|
||||
import ssl
|
||||
# HTTPS server (when ready)
|
||||
httpd = ThreadedTCPServer(("0.0.0.0", PORT), SecureHTTPRequestHandler)
|
||||
httpd.socket = ssl.wrap_socket(
|
||||
httpd.socket,
|
||||
certfile=SSL_CERT_FILE,
|
||||
keyfile=SSL_KEY_FILE,
|
||||
server_side=True
|
||||
)
|
||||
logger.info("HTTPS enabled")
|
||||
else:
|
||||
# HTTP server
|
||||
httpd = ThreadedTCPServer(("0.0.0.0", PORT), SecureHTTPRequestHandler)
|
||||
|
||||
logger.info("Server ready to accept connections")
|
||||
httpd.serve_forever()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Server error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user