"""Scrapy pipelines for data processing.""" import json import pandas as pd from pathlib import Path from datetime import datetime from typing import List, Dict from scrapy import Item from scrapy.exceptions import DropItem from scrapy_project.items import ProductItem, Product from urllib.parse import urlparse class ValidationPipeline: """Validate items using Pydantic models.""" def process_item(self, item: ProductItem, spider) -> ProductItem: """Validate item using Pydantic model.""" # Skip validation for variable/variation products (they're handled as raw dicts) attrs = item.get('attributes', {}) product_type = attrs.get('type', 'simple') if isinstance(attrs, dict) else 'simple' if product_type in ['variable', 'variation']: # Don't validate variable/variation products - they're stored as raw dicts return item try: # Convert to Pydantic model for validation product = Product.from_scrapy_item(item) # Update item with validated data validated_dict = product.model_dump(exclude_none=True) for key, value in validated_dict.items(): item[key] = value return item except Exception as e: spider.logger.error(f"Validation error for item {item.get('name', 'Unknown')}: {str(e)}") # Return item anyway, but mark as invalid item['_validation_error'] = str(e) return item class DeduplicationPipeline: """Remove duplicate items based on normalized URL or SKU.""" def __init__(self): self.ids_seen = set() def _normalize_product_url(self, url: str) -> str: """Normalize product URL to extract unique identifier (same logic as analyzer).""" if not url: return None # Skip invalid URLs if url.startswith(('javascript:', 'data:', 'mailto:')) or not url.startswith('http'): return None # Extract product slug from URL (e.g., /products/dalat-arabica) import re product_slug_match = re.search(r'/products/([^/?#]+)', url) if product_slug_match: product_slug = product_slug_match.group(1).lower() parsed = urlparse(url) normalized_url = f"{parsed.scheme}://{parsed.netloc}/products/{product_slug}" return normalized_url # Fallback: normalize full URL (remove query params, fragments) parsed = urlparse(url) normalized_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip('/') # Skip if it's not a product URL (collections, images, etc.) if '/products/' not in normalized_url: return None return normalized_url def process_item(self, item: ProductItem, spider) -> ProductItem: """Check for duplicates using normalized URL.""" # For variations, use URL + attribute values as identifier attrs = item.get('attributes', {}) is_variation = attrs.get('type') == 'variation' if isinstance(attrs, dict) else False if is_variation: # Variations: Use SKU if available, otherwise URL + attribute values if item.get('sku'): identifier = f"sku:{item.get('sku')}" else: # Build identifier from URL + attribute values product_url = item.get('product_url', '') normalized_url = self._normalize_product_url(product_url) if product_url else None if normalized_url: # Collect attribute values attr_values = [] for i in range(1, 4): attr_value = attrs.get(f'attribute_{i}_value', '') if attr_value: attr_values.append(str(attr_value)) if attr_values: identifier = f"{normalized_url}#{'|'.join(attr_values)}" else: # Fallback: use URL + price as identifier price = item.get('price', '') identifier = f"{normalized_url}#price:{price}" else: # Last resort: use name + attribute values name = item.get('name', '') attr_values = [] for i in range(1, 4): attr_value = attrs.get(f'attribute_{i}_value', '') if attr_value: attr_values.append(str(attr_value)) if attr_values: identifier = f"name:{name}#{'|'.join(attr_values)}" else: spider.logger.warning(f"Variation has no identifier: {item}") raise DropItem(f"Variation has no identifier") else: # For simple/variable products, use URL or SKU or name product_url = item.get('product_url', '') normalized_url = self._normalize_product_url(product_url) if product_url else None # Use normalized URL, SKU, or name as identifier if normalized_url: identifier = normalized_url elif item.get('sku'): identifier = f"sku:{item.get('sku')}" elif item.get('name'): # Skip common category names that are not products category_keywords = ['các sản phẩm khác', 'other products', 'view all', 'see all', 'shop all'] name_lower = item.get('name', '').lower() if any(keyword in name_lower for keyword in category_keywords): spider.logger.debug(f"Skipping category name: {item.get('name')}") raise DropItem(f"Skipping category name: {item.get('name')}") identifier = f"name:{item.get('name')}" else: spider.logger.warning(f"Item has no identifier: {item}") raise DropItem(f"Item has no identifier") if identifier in self.ids_seen: spider.logger.debug(f"Duplicate item found: {identifier} (original URL: {product_url})") raise DropItem(f"Duplicate item found: {identifier}") self.ids_seen.add(identifier) return item class StoragePipeline: """Store items to JSON file.""" def __init__(self, output_dir: str = 'data'): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.products: List[Dict] = [] @classmethod def from_crawler(cls, crawler): """Create pipeline instance from crawler settings.""" output_dir = crawler.settings.get('DATA_OUTPUT_DIR', 'data') return cls(output_dir=output_dir) def open_spider(self, spider): """Called when spider is opened.""" self.products = [] spider.logger.info(f"Storage pipeline initialized. Output directory: {self.output_dir}") def process_item(self, item: ProductItem, spider) -> ProductItem: """Process item and store in memory.""" # Convert to dict, excluding None values product_dict = {k: v for k, v in item.items() if v is not None} # Convert datetime to string for JSON serialization if 'crawled_at' in product_dict and isinstance(product_dict['crawled_at'], datetime): product_dict['crawled_at'] = product_dict['crawled_at'].isoformat() self.products.append(product_dict) return item def _get_domain_from_spider(self, spider) -> str: """Extract domain from spider's base_url.""" custom_slug = getattr(spider, 'export_domain', None) if custom_slug: return custom_slug base_url = getattr(spider, 'base_url', '') if base_url: try: parsed = urlparse(base_url) domain = parsed.netloc.replace('www.', '').replace('.', '-') return domain except: pass return 'unknown' def close_spider(self, spider): """Called when spider is closed. Save all products to JSON.""" if not self.products: spider.logger.warning("No products to save") return # Generate filename with domain and timestamp domain = self._get_domain_from_spider(spider) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"{domain}-{timestamp}.json" filepath = self.output_dir / filename # Save to JSON with open(filepath, 'w', encoding='utf-8') as f: json.dump(self.products, f, ensure_ascii=False, indent=2) spider.logger.info(f"Saved {len(self.products)} products to {filepath}") # Store filepath in spider stats for export pipeline spider.crawler.stats.set_value('products_json_file', str(filepath)) spider.crawler.stats.set_value('products_count', len(self.products)) class ExportPipeline: """Export products to various formats (WooCommerce, Shopify, Excel).""" def __init__(self, export_dir: str = 'data/exports', export_format: str = 'woocommerce'): self.export_dir = Path(export_dir) self.export_dir.mkdir(parents=True, exist_ok=True) self.export_format = export_format self.products: List[Product] = [] @classmethod def from_crawler(cls, crawler): """Create pipeline instance from crawler settings.""" export_dir = crawler.settings.get('EXPORT_DIR', 'data/exports') export_format = crawler.settings.get('EXPORT_FORMAT', 'woocommerce') return cls(export_dir=export_dir, export_format=export_format) def open_spider(self, spider): """Called when spider is opened.""" self.products = [] spider.logger.info(f"Export pipeline initialized. Format: {self.export_format}, Directory: {self.export_dir}") def process_item(self, item: ProductItem, spider) -> ProductItem: """Convert item to Pydantic model or store raw dict for variable products.""" try: # Check if this is a variable product or variation item_dict = dict(item) attrs = item_dict.get('attributes', {}) product_type = attrs.get('type', 'simple') if isinstance(attrs, dict) else 'simple' if product_type in ['variable', 'variation']: # Store as raw dict for variable products to preserve structure self.products.append(item_dict) spider.logger.debug(f"Added {product_type} product to export: {item_dict.get('name', 'N/A')}") else: # Convert to Pydantic model for simple products product = Product.from_scrapy_item(item) self.products.append(product) spider.logger.debug(f"Added product to export: {product.name} ({product.product_url})") except Exception as e: spider.logger.error(f"Error processing item: {str(e)}") spider.logger.debug(f"Item data: {dict(item)}") return item def _get_domain_from_spider(self, spider) -> str: """Extract domain slug from spider (prefers custom export slug).""" custom_slug = getattr(spider, 'export_domain', None) if custom_slug: return custom_slug base_url = getattr(spider, 'base_url', '') if base_url: try: parsed = urlparse(base_url) domain = parsed.netloc.replace('www.', '').replace('.', '-') return domain except: pass return 'unknown' def close_spider(self, spider): """Export products when spider closes.""" if not self.products: spider.logger.warning("No products to export") return # Generate domain and timestamp for filename domain = self._get_domain_from_spider(spider) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") # Export based on format if self.export_format == 'woocommerce': self._export_woocommerce(spider, domain, timestamp) elif self.export_format == 'shopify': self._export_shopify(spider, domain, timestamp) elif self.export_format == 'excel': self._export_excel(spider, domain, timestamp) elif self.export_format == 'json': self._export_json(spider, domain, timestamp) elif self.export_format == 'all': self._export_woocommerce(spider, domain, timestamp) self._export_shopify(spider, domain, timestamp) self._export_excel(spider, domain, timestamp) self._export_json(spider, domain, timestamp) def _export_woocommerce(self, spider, domain: str, timestamp: str): """Export to WooCommerce CSV format with variable product support.""" rows = [] for product in self.products: # Handle both dict and Product objects if isinstance(product, dict): # Raw dict from variable product attrs = product.get('attributes', {}) product_type = attrs.get('type', 'simple') if isinstance(attrs, dict) else 'simple' name = product.get('name', '') sku = product.get('sku', '') price = product.get('price') original_price = product.get('original_price') description = product.get('description', '') images = product.get('images', []) category = product.get('category', '') stock_status = product.get('stock_status', 'instock') else: # Product Pydantic model attrs = product.attributes if hasattr(product, 'attributes') and product.attributes else {} product_type = attrs.get('type', 'simple') if isinstance(attrs, dict) else 'simple' name = product.name sku = product.sku or '' price = product.price original_price = product.original_price description = product.description or '' images = product.images or [] category = product.category or '' stock_status = product.stock_status or 'instock' if product_type == 'variable': # Parent product row # Get reviews from product reviews = product.get('reviews', []) if isinstance(product, dict) else [] reviews_json = json.dumps(reviews) if reviews else '' row = { 'Type': 'variable', 'SKU': sku, 'Name': name, 'Published': 1, 'Is featured?': 0, 'Visibility in catalog': 'visible', 'Short description': description[:200] if description else '', 'Description': description, 'Date sale price starts': '', 'Date sale price ends': '', 'Tax status': 'taxable', 'Tax class': '', 'In stock?': 1, 'Stock': '', 'Low stock amount': '', 'Backorders allowed?': 0, 'Sold individually?': 0, 'Weight (kg)': '', 'Length (cm)': '', 'Width (cm)': '', 'Height (cm)': '', 'Allow customer reviews?': 1, 'Purchase note': '', 'Sale price': '', 'Regular price': '', 'Categories': category, 'Tags': '', 'Shipping class': '', 'Images': '|'.join(images) if images else '', 'Download limit': '', 'Download expiry days': '', 'Parent': '', 'Grouped products': '', 'Upsells': '', 'Cross-sells': '', 'External URL': '', 'Button text': '', 'Position': 0, 'Reviews': reviews_json } # Add attribute columns for i in range(1, 4): attr_name_key = f'attribute_{i}_name' attr_values_key = f'attribute_{i}_values' attr_visible_key = f'attribute_{i}_visible' if attr_name_key in attrs: row[f'Attribute {i} name'] = attrs[attr_name_key] row[f'Attribute {i} value(s)'] = attrs[attr_values_key] row[f'Attribute {i} visible'] = attrs.get(attr_visible_key, '1') row[f'Attribute {i} default'] = '' else: row[f'Attribute {i} name'] = '' row[f'Attribute {i} value(s)'] = '' row[f'Attribute {i} visible'] = '' row[f'Attribute {i} default'] = '' rows.append(row) elif product_type == 'variation': # Variation row row = { 'Type': 'variation', 'SKU': sku, 'Name': name, 'Published': 1, 'Is featured?': 0, 'Visibility in catalog': 'visible', 'Short description': '', 'Description': '', 'Date sale price starts': '', 'Date sale price ends': '', 'Tax status': 'taxable', 'Tax class': '', 'In stock?': 1 if stock_status == 'in_stock' else 0, 'Stock': '', 'Low stock amount': '', 'Backorders allowed?': 0, 'Sold individually?': 0, 'Weight (kg)': attrs.get('weight_grams', '') if isinstance(attrs, dict) else '', 'Length (cm)': '', 'Width (cm)': '', 'Height (cm)': '', 'Allow customer reviews?': 1, 'Purchase note': '', 'Sale price': price if original_price else '', 'Regular price': original_price if original_price else price or '', 'Categories': '', 'Tags': '', 'Shipping class': '', 'Images': '', # Variations typically don't have separate images 'Download limit': '', 'Download expiry days': '', 'Parent': '', # WooCommerce will match by name 'Grouped products': '', 'Upsells': '', 'Cross-sells': '', 'External URL': '', 'Button text': '', 'Position': 0 } # Add attribute columns for variations for i in range(1, 4): attr_name_key = f'attribute_{i}_name' attr_value_key = f'attribute_{i}_value' if attr_name_key in attrs: row[f'Attribute {i} name'] = attrs[attr_name_key] row[f'Attribute {i} value(s)'] = attrs.get(attr_value_key, '') row[f'Attribute {i} visible'] = '' row[f'Attribute {i} default'] = '' else: row[f'Attribute {i} name'] = '' row[f'Attribute {i} value(s)'] = '' row[f'Attribute {i} visible'] = '' row[f'Attribute {i} default'] = '' rows.append(row) else: # Simple product row if isinstance(product, dict): # Get reviews from product reviews = product.get('reviews', []) reviews_json = json.dumps(reviews) if reviews else '' # Convert dict to simple CSV row row = { 'Type': 'simple', 'SKU': sku, 'Name': name, 'Published': 1, 'Is featured?': 0, 'Visibility in catalog': 'visible', 'Short description': description[:200] if description else '', 'Description': description, 'Date sale price starts': '', 'Date sale price ends': '', 'Tax status': 'taxable', 'Tax class': '', 'In stock?': 1 if stock_status == 'in_stock' else 0, 'Stock': '', 'Low stock amount': '', 'Backorders allowed?': 0, 'Sold individually?': 0, 'Weight (kg)': '', 'Length (cm)': '', 'Width (cm)': '', 'Height (cm)': '', 'Allow customer reviews?': 1, 'Purchase note': '', 'Sale price': price if original_price else '', 'Regular price': original_price if original_price else price or '', 'Categories': category, 'Tags': '', 'Shipping class': '', 'Images': '|'.join(images) if images else '', 'Download limit': '', 'Download expiry days': '', 'Parent': '', 'Grouped products': '', 'Upsells': '', 'Cross-sells': '', 'External URL': '', 'Button text': '', 'Position': 0, 'Reviews': reviews_json } else: # Use Pydantic model's method row = product.to_woocommerce_csv_row() # Add reviews if available reviews = product.reviews if hasattr(product, 'reviews') and product.reviews else [] row['Reviews'] = json.dumps(reviews) if reviews else '' # Add empty attribute columns for consistency for i in range(1, 4): row[f'Attribute {i} name'] = '' row[f'Attribute {i} value(s)'] = '' row[f'Attribute {i} visible'] = '' row[f'Attribute {i} default'] = '' rows.append(row) df = pd.DataFrame(rows) filename = f"{domain}-{timestamp}-wc.csv" filepath = self.export_dir / filename df.to_csv(filepath, index=False, encoding='utf-8-sig') spider.logger.info(f"Exported {len(rows)} rows ({len(self.products)} products) to WooCommerce format: {filepath}") def _export_shopify(self, spider, domain: str, timestamp: str): """Export to Shopify CSV format with multiple rows per product (one per image).""" # Use to_shopify_csv_rows() to get multiple rows per product (one per image) all_rows = [] for product in self.products: product_rows = product.to_shopify_csv_rows() all_rows.extend(product_rows) df = pd.DataFrame(all_rows) filename = f"{domain}-{timestamp}-shopify.csv" filepath = self.export_dir / filename df.to_csv(filepath, index=False, encoding='utf-8-sig') spider.logger.info( f"Exported {len(self.products)} products ({len(all_rows)} rows) to Shopify format: {filepath}" ) def _export_excel(self, spider, domain: str, timestamp: str): """Export to Excel format.""" rows = [product.model_dump() for product in self.products] df = pd.DataFrame(rows) filename = f"{domain}-{timestamp}.xlsx" filepath = self.export_dir / filename df.to_excel(filepath, index=False, engine='openpyxl') spider.logger.info(f"Exported {len(self.products)} products to Excel format: {filepath}") def _export_json(self, spider, domain: str, timestamp: str): """Export to JSON format.""" data = [product.model_dump() for product in self.products] filename = f"{domain}-{timestamp}.json" filepath = self.export_dir / filename with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2, default=str) spider.logger.info(f"Exported {len(self.products)} products to JSON format: {filepath}")