#!/usr/bin/env python3
"""
FastAPI application for SS Crawler API.

Provides endpoints for checking domains and managing crawl jobs.
"""
from fastapi import FastAPI, HTTPException, Request, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
from pathlib import Path
import json

from api.crawl.check_domain import check_domain
from api.crawl.db import create_job, get_job, update_job_status, init_db
from api.logs import router as logs_router

app = FastAPI(title="SS Crawler API", version="1.0.0")

# Include logs router
app.include_router(logs_router)

# Also include logs router with /ss-crawler-api prefix for reverse proxy compatibility
ss_crawler_api_router = APIRouter(prefix="/ss-crawler-api")
ss_crawler_api_router.include_router(logs_router)
app.include_router(ss_crawler_api_router)


# Initialize database on startup
@app.on_event("startup")
async def startup_event():
    """Initialize database when API server starts."""
    init_db()


# Serve static files from the api directory
api_dir = Path(__file__).parent
project_root = api_dir.parent

# Mount exports directory
static_dir = api_dir / 'exports'
if static_dir.exists():
    app.mount("/api/exports", StaticFiles(directory=str(static_dir)), name="exports")

# Mount wordpress-plugin directory to serve the plugin ZIP
plugin_dir = project_root / 'wordpress-plugin'
if plugin_dir.exists():
    app.mount("/wordpress-plugin", StaticFiles(directory=str(plugin_dir)), name="wordpress-plugin")

# Mount data/exports directory to serve CSV export files
data_exports_dir = project_root / 'data' / 'exports'
if data_exports_dir.exists():
    app.mount("/data/exports", StaticFiles(directory=str(data_exports_dir)), name="data-exports")


# Serve plugin info.json
@app.get("/api/plugin/info.json")
async def get_plugin_info_json():
    """Serve plugin info.json file."""
    info_json_path = api_dir / 'plugin' / 'info.json'
    if info_json_path.exists():
        return FileResponse(str(info_json_path), media_type='application/json')
    else:
        raise HTTPException(status_code=404, detail="Plugin info not found")


# Serve plugin info.php (read info.json and return it with an absolute download URL)
@app.get("/api/plugin/info.php")
async def get_plugin_info_php(token: Optional[str] = None):
    """Serve plugin info via a PHP-compatible endpoint."""
    info_json_path = api_dir / 'plugin' / 'info.json'
    if not info_json_path.exists():
        raise HTTPException(status_code=404, detail="Plugin info not found")

    try:
        with open(info_json_path, 'r', encoding='utf-8') as f:
            info = json.load(f)

        # Build the absolute download URL. The service sits behind Caddy, which
        # terminates HTTPS, so the public host and scheme are fixed here rather
        # than read from the X-Forwarded-Host / Host headers.
        zip_path = info.get('zip_path', '/wordpress-plugin/ss-crawler-importer.zip')
        host = '116.103.108.134'
        protocol = 'https'
        info['download_url'] = f'{protocol}://{host}/ss-crawler-api{zip_path}'

        # Append the token if provided
        if token:
            separator = '&' if '?' in info['download_url'] else '?'
            info['download_url'] += f'{separator}token={token}'

        return JSONResponse(content=info)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading plugin info: {str(e)}")
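
# For reference, a minimal sketch of the info.json shape the two endpoints above
# assume. Only 'zip_path' is read by this module; the other keys are illustrative
# and may differ from the real file:
#
#   {
#       "name": "SS Crawler Importer",
#       "version": "1.0.0",
#       "zip_path": "/wordpress-plugin/ss-crawler-importer.zip"
#   }
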

# CORS middleware - allow requests from WordPress
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify the WordPress domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request/Response models
class CheckRequest(BaseModel):
    domain: str
    analyze: bool = False


class CrawlRequest(BaseModel):
    domain: str
    config: Optional[Dict[str, Any]] = None


@app.get("/")
def root():
    """Root endpoint."""
    return {"message": "SS Crawler API", "version": "1.0.0"}


@app.post("/api/crawl/check")
def check(request: CheckRequest):
    """
    Check a domain and return detection results.

    Args:
        request: CheckRequest with domain and optional analyze flag

    Returns:
        Detection results including platform, config, and optional analysis
    """
    try:
        result = check_domain(request.domain, analyze=request.analyze)
        if not result.get('success'):
            raise HTTPException(status_code=400, detail=result.get('error', 'Check failed'))
        return {
            "success": True,
            "data": result
        }
    except HTTPException:
        # Re-raise HTTP errors unchanged instead of wrapping them in a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/crawl/start")
def start_crawl(request: CrawlRequest):
    """
    Start a crawl job (add it to the queue).

    Args:
        request: CrawlRequest with domain and optional config

    Returns:
        Job information including job_id
    """
    try:
        config_data = dict(request.config) if request.config else {}
        needs_config = not config_data.get('config_path')

        if needs_config:
            # Auto-detect the platform and generate a config when none was supplied
            detection = check_domain(request.domain, analyze=False)
            if not detection.get('success'):
                raise HTTPException(status_code=500, detail="Failed to auto-generate config for domain")

            detected_config_path = detection.get('config_path')
            if not detected_config_path:
                raise HTTPException(status_code=500, detail="Config path missing from detection result")

            config_data['config_path'] = detected_config_path
            if detection.get('type'):
                config_data.setdefault('detected_type', detection['type'])
            if detection.get('export') and detection['export'].get('domain_slug'):
                config_data.setdefault('export_slug', detection['export']['domain_slug'])
            if detection.get('etsy'):
                config_data.setdefault('etsy', detection['etsy'])

        job_id = create_job(request.domain, config_data)
        job = get_job(job_id)

        return {
            "success": True,
            "data": {
                "job_id": job_id,
                "domain": request.domain,
                "status": "queued",
                "created_at": job['created_at']
            }
        }
    except HTTPException:
        # Re-raise HTTP errors unchanged instead of wrapping them in a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/crawl/status/{job_id}")
def get_status(job_id: str):
    """
    Get crawl job status.

    Args:
        job_id: Job ID

    Returns:
        Job status and progress information
    """
    job = get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    return {
        "success": True,
        "data": {
            "job_id": job['job_id'],
            "domain": job['domain'],
            "status": job['status'],
            "progress": job['progress'] or 0,
            "current_step": job['current_step'],
            "products_processed": job['products_processed'] or 0,
            "products_total": job['products_total'] or 0,
            "file_path": job['file_path'],
            "products_count": job['products_count'],
            "error": job['error'],
            "created_at": job['created_at'],
            "started_at": job['started_at'],
            "completed_at": job['completed_at']
        }
    }
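
# Example client flow for the endpoints above (illustrative only; the host and
# port assume the default uvicorn binding at the bottom of this module):
#
#   curl -X POST http://localhost:8001/api/crawl/start \
#        -H "Content-Type: application/json" \
#        -d '{"domain": "https://example.com"}'
#   # -> {"success": true, "data": {"job_id": "...", "status": "queued", ...}}
#
#   curl http://localhost:8001/api/crawl/status/<job_id>
#   # -> {"success": true, "data": {"status": "...", "progress": ..., ...}}
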

@app.get("/api/crawl/jobs")
def list_jobs(limit: int = 50):
    """
    Get list of crawl jobs.

    Args:
        limit: Maximum number of jobs to return (default: 50)

    Returns:
        List of jobs ordered by created_at DESC
    """
    from api.crawl.db import get_db_connection

    conn = get_db_connection()
    conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
    cursor = conn.cursor()
    cursor.execute("""
        SELECT job_id, domain, status, progress, current_step,
               products_processed, products_total, products_count,
               file_path, error, created_at, started_at, completed_at
        FROM jobs
        ORDER BY created_at DESC
        LIMIT ?
    """, (limit,))
    jobs = cursor.fetchall()
    conn.close()

    return {
        "success": True,
        "data": jobs
    }


@app.post("/api/crawl/cancel/{job_id}")
def cancel_job(job_id: str):
    """
    Cancel a crawl job.

    Args:
        job_id: Job ID

    Returns:
        Success message
    """
    job = get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job['status'] in ('completed', 'failed', 'cancelled'):
        raise HTTPException(
            status_code=400,
            detail=f"Cannot cancel job with status: {job['status']}"
        )

    # Update job status to cancelled
    update_job_status(job_id, 'cancelled', current_step='Cancelled')

    return {
        "success": True,
        "message": "Job cancelled successfully"
    }


@app.delete("/api/crawl/job/{job_id}")
@app.post("/api/crawl/job/{job_id}/delete")
def delete_job(job_id: str):
    """
    Delete a crawl job from database.

    Args:
        job_id: Job ID

    Returns:
        Success message
    """
    from api.crawl.db import get_db_connection

    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT job_id FROM jobs WHERE job_id = ?", (job_id,))
    if not cursor.fetchone():
        conn.close()
        raise HTTPException(status_code=404, detail="Job not found")

    cursor.execute("DELETE FROM jobs WHERE job_id = ?", (job_id,))
    conn.commit()
    conn.close()

    return {
        "success": True,
        "message": "Job deleted successfully"
    }
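
# For orientation, the raw SQL above assumes a `jobs` table roughly like the
# sketch below. The authoritative schema is created by init_db() in
# api/crawl/db.py; column types and exact status values here are assumptions:
#
#   CREATE TABLE jobs (
#       job_id             TEXT PRIMARY KEY,
#       domain             TEXT,
#       status             TEXT,     -- e.g. queued / completed / failed / cancelled
#       progress           INTEGER,
#       current_step       TEXT,
#       products_processed INTEGER,
#       products_total     INTEGER,
#       products_count     INTEGER,
#       file_path          TEXT,
#       error              TEXT,
#       created_at         TEXT,
#       started_at         TEXT,
#       completed_at       TEXT
#   );
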

class DeleteFileRequest(BaseModel):
    file: str


@app.post("/api/exports/delete")
@app.delete("/api/exports/delete")
async def delete_export_file(request: Request, file: Optional[str] = None):
    """
    Delete an export file.

    Args:
        request: FastAPI request object
        file: File name to delete (from query param or body)

    Returns:
        Success message
    """
    import os

    # Get the file name from form data, raw body, or query param
    if not file:
        # Try form data first (POST)
        try:
            form = await request.form()
            if 'file' in form:
                file = form['file']
        except Exception:
            try:
                body = await request.body()
                if body:
                    # Try to parse the raw body as form-encoded data
                    from urllib.parse import parse_qs
                    parsed = parse_qs(body.decode('utf-8'))
                    if 'file' in parsed:
                        file = parsed['file'][0]
            except Exception:
                pass

    # Fall back to the query param
    if not file:
        file = request.query_params.get('file')

    if not file:
        raise HTTPException(status_code=400, detail="File name is required")

    # Security: only allow CSV files
    if not file.endswith('.csv'):
        raise HTTPException(status_code=400, detail="Invalid file type")

    # Security: only allow a basename (no path traversal)
    file_name = os.path.basename(file)

    # Path to the exports directory
    project_root = Path(__file__).parent.parent.parent
    exports_dir = project_root / 'data' / 'exports'

    if not exports_dir.exists():
        raise HTTPException(status_code=404, detail="Exports directory not found")

    file_path = exports_dir / file_name

    # Security: ensure the file resolves to a path within the exports directory
    try:
        real_file_path = file_path.resolve()
        real_exports_dir = exports_dir.resolve()
        if not str(real_file_path).startswith(str(real_exports_dir)):
            raise HTTPException(status_code=403, detail="Invalid file path")
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(status_code=403, detail="Invalid file path")

    # Check that the file exists
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found")

    # Delete the file
    try:
        file_path.unlink()
        return {
            "success": True,
            "message": "File deleted successfully",
            "file": file_name
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to delete file: {str(e)}")
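
# get_result() below locates export files by name. As a sketch of the naming
# convention it expects (derived from the glob and normalization logic here,
# not from the exporter itself):
#
#   https://www.example.com/          ->  example-com-*-wc.csv
#   https://www.etsy.com/shop/MyShop  ->  etsy-com-myshop-*-wc.csv
#
# i.e. dots in the normalized domain become dashes, an Etsy shop slug is
# appended when present, and a "-wc.csv" WooCommerce export suffix is assumed.
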

@app.get("/api/crawl/result/{job_id}")
def get_result(job_id: str):
    """
    Get crawl job result (file path, product count, etc.).

    Tries to find the latest export file created after the job started,
    and falls back to the file_path stored on the job if none is found.

    Args:
        job_id: Job ID

    Returns:
        Job result information
    """
    from datetime import datetime, timedelta
    import re
    from urllib.parse import urlparse

    job = get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job['status'] != 'completed':
        raise HTTPException(
            status_code=400,
            detail=f"Job is not completed. Current status: {job['status']}"
        )

    # Try to find the latest export file created after the job started
    file_path = job.get('file_path')
    file_name = None
    file_url = None

    # Get the job start time
    job_started_at = None
    if job.get('started_at'):
        try:
            job_started_at = datetime.fromisoformat(job['started_at'].replace('Z', '+00:00'))
        except Exception:
            pass
    if not job_started_at and job.get('created_at'):
        try:
            job_started_at = datetime.fromisoformat(job['created_at'].replace('Z', '+00:00'))
        except Exception:
            pass

    # Normalize the domain to find matching files
    domain = job.get('domain', '')
    try:
        parsed = urlparse(domain)
        netloc = parsed.netloc or domain
        netloc = netloc.replace('www.', '')
        normalized_domain = netloc.split('/')[0].rstrip('/')
        # Include the Etsy shop slug if present
        if 'etsy.com' in normalized_domain and parsed.path:
            match = re.search(r'/shop/([^/?#]+)', parsed.path, re.IGNORECASE)
            if match:
                normalized_domain = f"{normalized_domain}-{match.group(1).lower()}"
    except Exception:
        normalized_domain = domain.replace('https://', '').replace('http://', '').replace('www.', '').split('/')[0].rstrip('/')

    pattern = normalized_domain.replace('.', '-')

    # Find matching CSV files
    exports_dir = Path('data/exports')
    if exports_dir.exists():
        csv_files = list(exports_dir.glob(f"{pattern}-*-wc.csv"))
        if csv_files and job_started_at:
            # Filter to files created after the job started
            # (with a 1 minute buffer for clock skew)
            filtered_files = []
            for csv_file in csv_files:
                file_mtime = datetime.fromtimestamp(csv_file.stat().st_mtime)
                if file_mtime >= (job_started_at - timedelta(minutes=1)):
                    filtered_files.append(csv_file)

            if filtered_files:
                # Sort by modification time and take the latest
                filtered_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
                file_path = str(filtered_files[0])
                file_name = filtered_files[0].name
            elif csv_files:
                # Fallback: use the latest file even if it predates the job
                csv_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
                file_path = str(csv_files[0])
                file_name = csv_files[0].name
        elif csv_files:
            # No job_started_at, use the latest file
            csv_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
            file_path = str(csv_files[0])
            file_name = csv_files[0].name

    # Fall back to the file_path stored on the job if no export file was matched
    if not file_name and file_path:
        file_name = Path(file_path).name

    # Construct the file URL
    if file_name:
        file_url = f"/data/exports/{file_name}"

    return {
        "success": True,
        "data": {
            "job_id": job['job_id'],
            "domain": job['domain'],
            "status": job['status'],
            "file": file_name,
            "file_path": file_path,
            "file_url": file_url,
            "products_count": job['products_count'],
            "completed_at": job['completed_at']
        }
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)
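
# To run this API locally (a sketch; the module path below is a placeholder for
# wherever this file lives in the package, e.g. api.<module_name>):
#
#   uvicorn api.<module_name>:app --host 0.0.0.0 --port 8001
#
# or simply execute the file directly to use the __main__ block above.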