"""Database utilities for crawl queue."""
import json
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

DB_PATH = Path(__file__).parent.parent.parent / 'data' / 'crawl_queue.db'

# Optional columns that update_job_status() copies verbatim from its keyword
# arguments. Column names are interpolated into the SQL text, so they must
# come from this fixed whitelist and never from caller-supplied keys.
_UPDATABLE_FIELDS = (
    'progress',
    'current_step',
    'file_path',
    'products_count',
    'products_processed',
    'products_total',
    'error',
)

# DDL for the jobs table and its indexes; every statement is idempotent.
_SCHEMA_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS jobs (
        job_id TEXT PRIMARY KEY,
        domain TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'queued',
        progress INTEGER DEFAULT 0,
        current_step TEXT,
        products_processed INTEGER DEFAULT 0,
        products_total INTEGER DEFAULT 0,
        file_path TEXT,
        products_count INTEGER,
        error TEXT,
        config TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        started_at TIMESTAMP,
        completed_at TIMESTAMP
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)",
    "CREATE INDEX IF NOT EXISTS idx_domain ON jobs(domain)",
    "CREATE INDEX IF NOT EXISTS idx_created_at ON jobs(created_at)",
)


def _open_connection() -> sqlite3.Connection:
    """Open a raw connection to DB_PATH, creating its parent directory first."""
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    return sqlite3.connect(DB_PATH)


def _create_schema(conn: sqlite3.Connection) -> None:
    """Create the jobs table and its indexes on *conn* if they are missing."""
    cursor = conn.cursor()
    for statement in _SCHEMA_STATEMENTS:
        cursor.execute(statement)
    conn.commit()


def init_db():
    """Initialize database and create tables if they don't exist."""
    with closing(_open_connection()) as conn:
        _create_schema(conn)


def get_db_connection():
    """Return a new connection to an initialized database.

    The schema is (idempotently) ensured on the returned connection itself,
    so no second connection is opened. The caller owns the connection and
    must close it.
    """
    conn = _open_connection()
    try:
        _create_schema(conn)
    except Exception:
        # Don't leak the handle if schema creation fails.
        conn.close()
        raise
    return conn


def create_job(domain: str, config: Optional[Dict] = None) -> str:
    """Insert a new job with status 'queued' and return its generated job_id.

    Args:
        domain: Domain to crawl (stored in the ``domain`` column).
        config: Optional settings, stored as JSON text. A falsy value
            (``None`` or an empty dict) is stored as NULL.

    Returns:
        The new job's UUID4 string.
    """
    job_id = str(uuid.uuid4())
    config_json = json.dumps(config) if config else None

    with closing(get_db_connection()) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                """
                INSERT INTO jobs (job_id, domain, status, config, created_at)
                VALUES (?, ?, 'queued', ?, CURRENT_TIMESTAMP)
                """,
                (job_id, domain, config_json),
            )
    return job_id


def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Return the job row for *job_id* as a column->value dict, or None if absent."""
    with closing(get_db_connection()) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
        ).fetchone()
        return dict(row) if row is not None else None


def update_job_status(job_id: str, status: str, **kwargs):
    """Update a job's status and any optional fields.

    Args:
        job_id: Job to update. Unknown ids update nothing.
        status: New value for the ``status`` column.
        **kwargs: Optional column values. Recognised keys are the names in
            ``_UPDATABLE_FIELDS`` plus ``started_at`` and ``completed_at``.
            Other keys are ignored.

    Timestamps:
        ``started_at``: an explicit value is written as given (``None``
        clears it). Otherwise, when status is 'running', it is set to the
        current time only if not already set. Repeated progress updates
        therefore keep the original start time.

        ``completed_at``: an explicit value is written as given. Otherwise,
        when status is 'completed' or 'failed', it is set to the current
        time.
    """
    assignments = ['status = ?']
    values: list = [status]

    for field in _UPDATABLE_FIELDS:
        if field in kwargs:
            assignments.append(f'{field} = ?')
            values.append(kwargs[field])

    if 'started_at' in kwargs:
        assignments.append('started_at = ?')
        values.append(kwargs['started_at'])
    elif status == 'running':
        # COALESCE keeps the first start time across repeated 'running' updates.
        assignments.append('started_at = COALESCE(started_at, CURRENT_TIMESTAMP)')

    if 'completed_at' in kwargs:
        assignments.append('completed_at = ?')
        values.append(kwargs['completed_at'])
    elif status in ('completed', 'failed'):
        assignments.append('completed_at = CURRENT_TIMESTAMP')

    values.append(job_id)
    query = f"UPDATE jobs SET {', '.join(assignments)} WHERE job_id = ?"

    with closing(get_db_connection()) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(query, values)