Files
screenshot-api/app/main.py
2025-02-06 15:44:16 +00:00

104 lines
3.9 KiB
Python

import hashlib
import logging
import os
import uuid
import redis
import json
from fastapi import FastAPI, Query, HTTPException, WebSocket, WebSocketDisconnect, Request, Depends
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
from playwright.async_api import async_playwright
# ASGI application object; the startup hook, the static-file mount and the
# WebSocket route defined in this module all register themselves on it.
app = FastAPI()
# Shared secret for API authentication, read from API_SECRET.
# NOTE(review): a hard-coded fallback is used when API_SECRET is unset --
# confirm that is acceptable outside local development.
SECRET_PHRASE = os.environ.get("API_SECRET", "my_secret_key")

# Redis connection URL, read from REDIS_URL (any credentials are expected to
# be embedded in the URL itself).
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379/0")
@app.on_event("startup")
async def startup():
    """Initialize FastAPI Limiter with an async Redis connection.

    Runs once when the application starts. The client is created from
    ``REDIS_URL`` with ``decode_responses=True`` and handed to
    ``FastAPILimiter.init``.
    """
    # The module imports `redis` but never defined `aioredis`, so the original
    # call raised NameError at startup. redis-py ships its asyncio client as
    # `redis.asyncio`; import it locally under the name the code expects.
    from redis import asyncio as aioredis

    # `from_url` is a plain factory (no await needed); connections are opened
    # lazily on first use. The local is not called `redis` so it does not
    # shadow the imported module.
    redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)
    await FastAPILimiter.init(redis_client)
# Setup Static Files for Documentation
@app.on_event("startup")
def _mount_static_docs():
    """Mount the static documentation site at "/" once all routes exist.

    Routes are matched in registration order and a mount at "/" matches every
    path (HTTP and WebSocket alike). Mounting it at import time, ahead of the
    route declarations further down this module, would let the static handler
    intercept requests meant for them (e.g. the "/ws" WebSocket). Registering
    the mount from a startup hook guarantees it is appended after every route
    declared at import time, so it only serves paths nothing else claimed.
    """
    app.mount("/", StaticFiles(directory="app/docs", html=True), name="docs")
# Registry of accepted WebSocket connections: the WebSocket object is the key
# and the value is always True.
active_connections = dict()

# Emulation presets: viewport size (pixels) and user-agent string per device
# name. A capture request selects one by its key.
DEVICE_PROFILES = {
    "mobile": dict(
        width=375,
        height=667,
        user_agent="Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)",
    ),
    "tablet": dict(
        width=768,
        height=1024,
        user_agent="Mozilla/5.0 (iPad; CPU OS 14_0 like Mac OS X)",
    ),
    "desktop": dict(
        width=1280,
        height=720,
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    ),
}
def generate_md5_hash(url: str, secret: str) -> str:
    """Generate the MD5 hex digest used for request authentication.

    The digest is computed over ``url`` immediately followed by ``secret``
    (no separator), encoded as UTF-8.

    Args:
        url: The URL the request refers to.
        secret: The shared secret appended to the URL before hashing.

    Returns:
        The 32-character lowercase hexadecimal MD5 digest.
    """
    payload = f"{url}{secret}".encode()
    return hashlib.md5(payload).hexdigest()
async def take_screenshot(
    websocket: WebSocket,
    url: str,
    width: int,
    height: int,
    full_height: bool,
    format: str,
    click_selectors: list,
    delay: int,
    device: str,
) -> str:
    """Capture a screenshot using Playwright and return the file path.

    Progress is reported to the client as plain-text messages on
    ``websocket`` while the capture runs.

    Args:
        websocket: Connection that receives the progress messages.
        url: Page to load. Only ``http`` and ``https`` URLs are accepted.
        width: Viewport width in pixels; ignored when ``device`` names a
            profile in ``DEVICE_PROFILES``.
        height: Viewport height in pixels; ignored when ``device`` names a
            profile in ``DEVICE_PROFILES``.
        full_height: When true, the viewport is stretched to the document
            height and a full-page screenshot is taken.
        format: Image format, case-insensitive: ``"png"``, ``"jpeg"`` or
            ``"jpg"``. Also used as the file extension.
        click_selectors: Selectors clicked in order before capturing. A click
            that fails is logged and skipped. ``None`` means no clicks.
        delay: Extra wait in milliseconds before capturing; skipped unless
            greater than zero.
        device: Key into ``DEVICE_PROFILES``. Unknown values fall back to
            ``width``/``height`` with the browser's default user agent.

    Returns:
        Path of the written image under ``app/docs/screenshots/``.

    Raises:
        ValueError: If ``format`` is not a supported image type or ``url`` is
            not an http(s) URL.
    """
    from urllib.parse import urlsplit

    log = logging.getLogger(__name__)

    # Validate before launching a browser. Playwright only understands the
    # types "png" and "jpeg", and `format` also ends up in the file name, so
    # anything outside this allow-list must never reach the path.
    file_ext = format.lower()
    image_type = {"png": "png", "jpeg": "jpeg", "jpg": "jpeg"}.get(file_ext)
    if image_type is None:
        raise ValueError(f"Unsupported screenshot format: {format!r}")

    # The URL comes from the client; refusing non-web schemes keeps requests
    # such as file:// from rendering local files into a public screenshot.
    if urlsplit(url).scheme not in ("http", "https"):
        raise ValueError("Only http and https URLs can be captured")

    screenshot_path = f"app/docs/screenshots/{uuid.uuid4()}.{file_ext}"

    viewport = {"width": width, "height": height}
    user_agent = None
    if device in DEVICE_PROFILES:
        profile = DEVICE_PROFILES[device]
        viewport = {"width": profile["width"], "height": profile["height"]}
        user_agent = profile["user_agent"]

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            await websocket.send_text("Browser launched...")

            context = await browser.new_context(viewport=viewport, user_agent=user_agent)
            page = await context.new_page()
            await page.goto(url, wait_until="load")
            await websocket.send_text("Page loaded...")

            for selector in click_selectors or []:
                try:
                    await page.click(selector, timeout=2000)
                    await websocket.send_text(f"Clicked element: {selector}")
                except Exception:
                    # Clicks are best-effort (the element may simply be
                    # absent), but leave a trace instead of failing silently.
                    log.warning("Click on %r failed; skipping", selector, exc_info=True)

            if delay > 0:
                await websocket.send_text(f"Waiting for {delay}ms...")
                await page.wait_for_timeout(delay)

            if full_height:
                # Stretch the viewport to the document height before the
                # full-page capture.
                page_height = await page.evaluate("() => document.body.scrollHeight")
                await page.set_viewport_size({"width": viewport["width"], "height": page_height})

            await page.screenshot(path=screenshot_path, full_page=full_height, type=image_type)
            await websocket.send_text("Screenshot captured.")
        finally:
            # Close the browser even when navigation or capture fails.
            await browser.close()

    return screenshot_path
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint that takes screenshots on request.

    Each incoming text message must be a JSON object whose keys are the
    keyword arguments of ``take_screenshot`` (everything except
    ``websocket``). On success the reply is
    ``{"screenshot_url": "/screenshots/<file>"}``; on a bad request or a
    failed capture the reply is ``{"error": "<message>"}`` and the connection
    stays open for the next request.

    NOTE(review): no authentication is performed here even though this module
    defines SECRET_PHRASE and generate_md5_hash -- confirm that is intended.
    """
    await websocket.accept()
    active_connections[websocket] = True
    try:
        while True:
            data = await websocket.receive_text()

            try:
                params = json.loads(data)
            except json.JSONDecodeError:
                await websocket.send_text(json.dumps({"error": "Invalid JSON payload"}))
                continue
            if not isinstance(params, dict):
                await websocket.send_text(json.dumps({"error": "Payload must be a JSON object"}))
                continue

            try:
                screenshot_path = await take_screenshot(websocket, **params)
            except WebSocketDisconnect:
                # The client left mid-capture; let the outer handler deal with it.
                raise
            except (TypeError, ValueError) as exc:
                # Missing, unexpected or invalid parameters supplied by the client.
                await websocket.send_text(json.dumps({"error": f"Invalid parameters: {exc}"}))
                continue
            except Exception:
                # Keep internals out of the reply; the details go to the log.
                logging.getLogger(__name__).exception("Screenshot capture failed")
                await websocket.send_text(json.dumps({"error": "Screenshot failed"}))
                continue

            await websocket.send_text(
                json.dumps({"screenshot_url": f"/screenshots/{os.path.basename(screenshot_path)}"})
            )
    except WebSocketDisconnect:
        pass
    finally:
        # Always drop the registry entry, whatever ended the loop; otherwise
        # any non-disconnect error would leave a dead connection behind.
        active_connections.pop(websocket, None)