import hashlib import logging import os import aioredis import uuid import redis import json from fastapi import FastAPI, Query, HTTPException, WebSocket, WebSocketDisconnect, Request, Depends from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from fastapi_limiter import FastAPILimiter from fastapi_limiter.depends import RateLimiter from playwright.async_api import async_playwright app = FastAPI() # Secret API Key SECRET_PHRASE = os.getenv("API_SECRET", "my_secret_key") # Use the Redis password from the environment variable REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") @app.on_event("startup") async def startup(): """Initialize FastAPI Limiter with async Redis connection.""" redis = await aioredis.from_url(REDIS_URL, decode_responses=True) await FastAPILimiter.init(redis) # Setup Static Files for Documentation app.mount("/", StaticFiles(directory="app/docs", html=True), name="docs") # WebSocket Connections Dictionary active_connections = {} # Device Profiles for Emulation DEVICE_PROFILES = { "mobile": {"width": 375, "height": 667, "user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)"}, "tablet": {"width": 768, "height": 1024, "user_agent": "Mozilla/5.0 (iPad; CPU OS 14_0 like Mac OS X)"}, "desktop": {"width": 1280, "height": 720, "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, } def generate_md5_hash(url: str, secret: str) -> str: """Generate MD5 hash for authentication.""" return hashlib.md5(f"{url}{secret}".encode()).hexdigest() async def take_screenshot(websocket: WebSocket, url: str, width: int, height: int, full_height: bool, format: str, click_selectors: list, delay: int, device: str): """Capture a screenshot using Playwright and return the file path.""" file_ext = format.lower() screenshot_path = f"app/docs/screenshots/{uuid.uuid4()}.{file_ext}" async with async_playwright() as p: browser = await p.chromium.launch(headless=True) await websocket.send_text("Browser launched...") viewport = {"width": width, "height": height} user_agent = None if device in DEVICE_PROFILES: viewport = {"width": DEVICE_PROFILES[device]["width"], "height": DEVICE_PROFILES[device]["height"]} user_agent = DEVICE_PROFILES[device]["user_agent"] context = await browser.new_context(viewport=viewport, user_agent=user_agent) page = await context.new_page() await page.goto(url, wait_until="load") await websocket.send_text("Page loaded...") for selector in click_selectors: try: await page.click(selector, timeout=2000) await websocket.send_text(f"Clicked element: {selector}") except Exception: pass # Ignore if element is not found if delay > 0: await websocket.send_text(f"Waiting for {delay}ms...") await page.wait_for_timeout(delay) if full_height: height = await page.evaluate("() => document.body.scrollHeight") await page.set_viewport_size({"width": viewport["width"], "height": height}) await page.screenshot(path=screenshot_path, full_page=full_height, type=file_ext) await websocket.send_text("Screenshot captured.") await browser.close() return screenshot_path @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket Endpoint for Real-Time Updates.""" await websocket.accept() active_connections[websocket] = True try: while True: data = await websocket.receive_text() params = json.loads(data) screenshot_path = await take_screenshot(websocket, **params) await websocket.send_text(json.dumps({"screenshot_url": f"/screenshots/{os.path.basename(screenshot_path)}"})) except WebSocketDisconnect: active_connections.pop(websocket, None)