#!/usr/bin/env python3
"""
Background worker to process crawl jobs from queue.
Polls database for queued jobs and processes them.
"""

import sys
import time
import subprocess
import json
import re
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from api.crawl.db import get_db_connection, get_job, update_job_status


def normalize_domain(domain: str) -> str:
    """Normalize domain by removing protocol, www, trailing slash, and path."""
    from urllib.parse import urlparse
    try:
        parsed = urlparse(domain)
        netloc = parsed.netloc or domain
        netloc = netloc.replace('www.', '')
        normalized = netloc.split('/')[0].rstrip('/')
        # Include Etsy shop slug if present
        if 'etsy.com' in normalized and parsed.path:
            match = re.search(r'/shop/([^/?#]+)', parsed.path, re.IGNORECASE)
            if match:
                normalized = f"{normalized}-{match.group(1).lower()}"
        return normalized
    except Exception:
        domain = domain.replace('https://', '').replace('http://', '').replace('www.', '')
        return domain.split('/')[0].rstrip('/')
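
# Illustrative examples of normalize_domain() (hypothetical inputs; outputs follow
# from the logic above):
#   normalize_domain('https://www.example.com/products')   -> 'example.com'
#   normalize_domain('www.example.com/collections/all')    -> 'example.com'
#   normalize_domain('https://www.etsy.com/shop/MyShop')   -> 'etsy.com-myshop'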


def update_exports_manifest():
    """Regenerate exports list JSON so WordPress sees latest files."""
    script_path = project_root / 'scripts' / 'update_exports_list.py'
    if not script_path.exists():
        return
    try:
        print(f"[{datetime.now()}] Updating exports manifest...")
        subprocess.run([sys.executable, str(script_path)], check=True)
        print(f"[{datetime.now()}] Exports manifest updated.")
    except Exception as e:
        print(f"[{datetime.now()}] Warning: Failed to update exports manifest: {e}")


def find_latest_export_file(domain: str, exports_dir: Path, export_slug: str = None, job_started_at: datetime = None) -> Path:
    """
    Find the latest export file for a domain.

    Args:
        domain: Domain name
        exports_dir: Directory to search for export files
        export_slug: Optional export slug override
        job_started_at: Optional datetime - only return files created after this time

    Returns:
        Path to latest export file, or None if not found
    """
    if export_slug:
        pattern = export_slug
    else:
        normalized_domain = normalize_domain(domain)
        pattern = normalized_domain.replace('.', '-')

    # Find all matching CSV files
    csv_files = list(exports_dir.glob(f"{pattern}-*-wc.csv"))

    if not csv_files:
        return None

    # Filter files created after job started (if provided)
    if job_started_at:
        filtered_files = []
        for csv_file in csv_files:
            file_mtime = datetime.fromtimestamp(csv_file.stat().st_mtime)
            # File must be created after job started (with 1 minute buffer for clock skew)
            if file_mtime >= (job_started_at - timedelta(minutes=1)):
                filtered_files.append(csv_file)
        if not filtered_files:
            return None
        csv_files = filtered_files

    # Sort by modification time, return latest
    csv_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return csv_files[0]


def count_products_in_csv(file_path: Path) -> int:
    """Count products in CSV file (excluding header)."""
    import csv
    count = 0
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader, None)  # Skip header
            for row in reader:
                if row and any(cell.strip() for cell in row):
                    count += 1
    except Exception as e:
        print(f"Error counting products: {e}")
    return count
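
# Shape of a job record as consumed by process_job() below. This is an assumed
# sketch based only on the fields read in this module (values are illustrative):
#   {
#       'job_id': '550e8400-...',                # primary key, used in status updates
#       'domain': 'https://www.example.com',     # site to crawl
#       'status': 'queued',                      # queued -> running -> completed/failed
#       'config': '{"config_path": "configs/example.yaml", "export_slug": "example-com", "crawling": {...}}',
#       'created_at': '2024-01-01T12:00:00Z',    # ISO timestamp; trailing 'Z' is accepted
#       'started_at': None,                      # assumed to be set once the job starts running
#   }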


def process_job(job: dict):
    """Process a single crawl job."""
    job_id = job['job_id']
    domain = job['domain']

    print(f"[{datetime.now()}] Processing job {job_id} for domain: {domain}")

    config_data = {}
    raw_config = job.get('config')
    if raw_config:
        if isinstance(raw_config, str):
            try:
                config_data = json.loads(raw_config)
            except json.JSONDecodeError:
                config_data = {}
        elif isinstance(raw_config, dict):
            config_data = raw_config

    config_path = config_data.get('config_path')
    if config_path:
        config_path_obj = Path(config_path)
        if not config_path_obj.is_absolute():
            config_path_obj = project_root / config_path_obj
        if config_path_obj.exists():
            config_path = str(config_path_obj)
            # Merge config payload with config file (for settings like crawl_reviews)
            if config_data.get('crawling'):
                try:
                    import yaml
                    # Read existing config
                    with open(config_path_obj, 'r', encoding='utf-8') as f:
                        yaml_config = yaml.safe_load(f) or {}
                    # Merge crawling settings from config payload
                    if 'crawling' not in yaml_config:
                        yaml_config['crawling'] = {}
                    yaml_config['crawling'].update(config_data['crawling'])
                    # Write merged config back to file
                    with open(config_path_obj, 'w', encoding='utf-8') as f:
                        yaml.dump(yaml_config, f, default_flow_style=False, allow_unicode=True)
                    print(f"[{datetime.now()}] Merged config payload with config file: {config_path}")
                except Exception as e:
                    print(f"[{datetime.now()}] Warning: Failed to merge config payload: {e}")
        else:
            print(f"[{datetime.now()}] Warning: Provided config path {config_path} does not exist. Falling back to domain detection.")
            config_path = None

    export_slug = config_data.get('export_slug')

    # Normalize domain for fallback
    normalized_domain = normalize_domain(domain)
    print(f"[{datetime.now()}] Normalized domain: {normalized_domain}")
    if export_slug:
        print(f"[{datetime.now()}] Export slug override: {export_slug}")

    # Get job start time (use created_at if started_at not available yet)
    job_started_at = None
    if job.get('started_at'):
        try:
            job_started_at = datetime.fromisoformat(job['started_at'].replace('Z', '+00:00'))
        except Exception:
            pass
    if not job_started_at and job.get('created_at'):
        try:
            job_started_at = datetime.fromisoformat(job['created_at'].replace('Z', '+00:00'))
        except Exception:
            pass
    if not job_started_at:
        job_started_at = datetime.now()

    update_job_status(job_id, 'running', current_step='Starting crawl...')

    try:
        exports_dir = project_root / 'data' / 'exports'
        exports_dir.mkdir(parents=True, exist_ok=True)

        print(f"[{datetime.now()}] Running crawl command...")
        update_job_status(job_id, 'running', current_step='Crawling website...', progress=10)

        # Prefer the project's virtualenv interpreter if present
        venv_python = project_root / 'venv' / 'bin' / 'python'
        if venv_python.exists():
            python_executable = str(venv_python)
        else:
            python_executable = sys.executable

        crawl_args = [python_executable, 'main.py', 'crawl']
        if config_path:
            crawl_args.extend(['--config', config_path])
        else:
            crawl_args.append(normalized_domain)
        crawl_args.extend([
            '--format', 'woocommerce',
            '--output', str(exports_dir),
            '--fast'
        ])
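
        # Illustrative example of the assembled command (flags mirror the args built
        # above; main.py's crawl CLI itself is assumed here, not verified):
        #   venv/bin/python main.py crawl example.com --format woocommerce \
        #       --output data/exports --fast
        # When a config file is supplied, '--config <path>' replaces the bare domain.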

        result = subprocess.run(
            crawl_args,
            cwd=str(project_root),
            capture_output=True,
            text=True,
            timeout=3600  # 1 hour timeout
        )

        # Log output for debugging
        if result.stdout:
            print(f"[{datetime.now()}] Crawl stdout (last 1000 chars):")
            print(result.stdout[-1000:])
        if result.stderr:
            print(f"[{datetime.now()}] Crawl stderr (last 1000 chars):")
            print(result.stderr[-1000:])

        # Check if crawl was interrupted or had no products
        if result.returncode == 0:
            # Check stdout for warnings about no products
            has_no_products_warning = False
            has_export_success = False
            if result.stdout:
                if "No products to export" in result.stdout or "No products to save" in result.stdout:
                    has_no_products_warning = True
                if "Exported" in result.stdout and "products to WooCommerce" in result.stdout:
                    has_export_success = True

            # If there is neither an export success message nor a no-products warning,
            # products were likely not exported
            if not has_export_success and not has_no_products_warning:
                # Check if we can find an export file - if not, likely no products
                time.sleep(1)  # Brief wait
                export_file = find_latest_export_file(normalized_domain, exports_dir, export_slug, job_started_at)
                if not export_file:
                    error_msg = (
                        "Crawl completed successfully but no export file was created. "
                        "This usually means no products were found or saved. The website "
                        "structure may not match the selectors, or the crawl may have been "
                        "interrupted before products could be exported."
                    )
                    print(f"[{datetime.now()}] {error_msg}")
                    print(f"[{datetime.now()}] Crawl stdout snippet (checking for clues):")
                    if result.stdout:
                        # Look for any product-related messages
                        lines = result.stdout.split('\n')
                        product_lines = [l for l in lines if 'product' in l.lower() and ('found' in l.lower() or 'export' in l.lower())]
                        for line in product_lines[-10:]:  # Last 10 product-related lines
                            print(f"  {line}")
                    update_job_status(
                        job_id,
                        'failed',
                        error=error_msg,
                        current_step='No products exported'
                    )
                    return
            elif has_no_products_warning:
                error_msg = (
                    "Crawl completed but no products were found or saved. This may happen "
                    "if the crawl was interrupted or the website structure doesn't match "
                    "the selectors."
                )
                print(f"[{datetime.now()}] {error_msg}")
                update_job_status(
                    job_id,
                    'failed',
                    error=error_msg,
                    current_step='No products found'
                )
                return

        if result.returncode != 0:
            error_msg = result.stderr or result.stdout or "Unknown error"
            print(f"[{datetime.now()}] Crawl failed (return code {result.returncode}): {error_msg}")
            update_job_status(
                job_id,
                'failed',
                error=error_msg[:500],  # Limit error message length
                current_step='Crawl failed'
            )
            return

        # Wait a bit for the file system to sync
        time.sleep(2)

        # Find the generated file
        print(f"[{datetime.now()}] Looking for export file in {exports_dir}...")
        pattern_preview = f"{(export_slug or normalized_domain.replace('.', '-'))}-*-wc.csv"
        print(f"[{datetime.now()}] Pattern: {pattern_preview}")

        # List all CSV files for debugging
        all_csv_files = list(exports_dir.glob("*.csv"))
        print(f"[{datetime.now()}] All CSV files in exports dir: {[f.name for f in all_csv_files]}")

        update_job_status(job_id, 'running', current_step='Finding export file...', progress=90)

        export_file = find_latest_export_file(normalized_domain, exports_dir, export_slug, job_started_at)

        if not export_file:
            # Try to find any recent CSV file (in case the domain pattern doesn't match)
            if all_csv_files:
                # Sort by modification time
                all_csv_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
                latest_file = all_csv_files[0]
                file_mtime = datetime.fromtimestamp(latest_file.stat().st_mtime)

                # Accept the latest file only if it was created within the last 5 minutes
                time_diff = (datetime.now() - file_mtime).total_seconds()
                if time_diff < 300:  # 5 minutes
                    print(f"[{datetime.now()}] Found recent CSV file (created {time_diff:.0f}s ago): {latest_file.name}")
                    export_file = latest_file
                else:
                    error_msg = f"Export file not found for domain: {domain}. Crawl completed successfully but no matching file found. Latest file: {latest_file.name} (created {time_diff:.0f}s ago)"
                    print(f"[{datetime.now()}] {error_msg}")
                    update_job_status(job_id, 'failed', error=error_msg, current_step='Export file not found')
                    return
            else:
                error_msg = f"Export file not found for domain: {domain}. Crawl completed successfully but no CSV files found in exports directory."
                print(f"[{datetime.now()}] {error_msg}")
                update_job_status(job_id, 'failed', error=error_msg, current_step='Export file not found')
                return

        # Count products
        print(f"[{datetime.now()}] Counting products in {export_file.name}...")
        products_count = count_products_in_csv(export_file)

        # Update job to completed
        update_job_status(
            job_id,
            'completed',
            file_path=str(export_file),
            products_count=products_count,
            current_step='Completed',
            progress=100
        )

        # Refresh exports manifest so WP sees the latest file & counts
        update_exports_manifest()

        print(f"[{datetime.now()}] Job {job_id} completed successfully!")
        print(f"  File: {export_file.name}")
        print(f"  Products: {products_count}")

    except subprocess.TimeoutExpired:
        error_msg = "Crawl timeout (exceeded 1 hour)"
        print(f"[{datetime.now()}] {error_msg}")
        update_job_status(job_id, 'failed', error=error_msg, current_step='Timeout')
    except Exception as e:
        error_msg = str(e)
        print(f"[{datetime.now()}] Error processing job: {error_msg}")
        update_job_status(job_id, 'failed', error=error_msg[:500], current_step='Error occurred')


def get_next_queued_job():
    """Get the next queued job from the database.

    Also checks for stuck running jobs (older than 30 minutes) and resets them.
    """
    conn = get_db_connection()
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    # First, check for stuck running jobs (created more than 30 minutes ago).
    # These might be left over from a previous worker crash/restart.
    cursor.execute("""
        SELECT * FROM jobs
        WHERE status = 'running'
        AND datetime(created_at) < datetime('now', '-30 minutes')
        ORDER BY created_at ASC
        LIMIT 1
    """)
    stuck_job = cursor.fetchone()
    if stuck_job:
        stuck_job = dict(stuck_job)
        job_id = stuck_job['job_id']
        domain = stuck_job['domain']
        print(f"[{datetime.now()}] Found stuck running job {job_id[:8]}... ({domain}), resetting to queued")
        update_job_status(job_id, 'queued', current_step='Reset from stuck running state')
        conn.close()
        return stuck_job

    # Get next queued job
    cursor.execute("""
        SELECT * FROM jobs
        WHERE status = 'queued'
        ORDER BY created_at ASC
        LIMIT 1
    """)
    job = cursor.fetchone()
    conn.close()

    if job:
        return dict(job)
    return None
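
# Note: job claiming is not atomic. The worker SELECTs a queued job here and only
# marks it 'running' later, inside process_job(). That is fine for a single worker
# process; if multiple workers ever share this database, the claim should be made
# atomic, e.g. with a single conditional UPDATE (sketch, using the columns above):
#   UPDATE jobs SET status = 'running'
#   WHERE job_id = (SELECT job_id FROM jobs WHERE status = 'queued'
#                   ORDER BY created_at ASC LIMIT 1);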


def main():
    """Main worker loop."""
    print("=" * 60)
    print("SS Crawler Background Worker")
    print("=" * 60)
    print(f"Started at: {datetime.now()}")
    print("Polling for queued jobs every 5 seconds...")
    print("Press Ctrl+C to stop")
    print("=" * 60)
    print()

    try:
        while True:
            # Get next queued job
            job = get_next_queued_job()

            if job:
                process_job(job)
            else:
                # No jobs, wait a bit
                time.sleep(5)

    except KeyboardInterrupt:
        print()
        print("=" * 60)
        print("Worker stopped by user")
        print(f"Stopped at: {datetime.now()}")
        print("=" * 60)
        sys.exit(0)
    except Exception as e:
        print(f"Fatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()
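
# The worker runs as a single long-lived foreground process: it polls the jobs
# table every 5 seconds, processes one job at a time, and exits cleanly on Ctrl+C.
# Typical invocation (script path is illustrative; internal paths are resolved
# relative to the project root, so it can be started from any directory):
#   python scripts/crawl_worker.py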