#!/usr/bin/env python3 """ View WordPress plugin logs from server. """ import json import sys from pathlib import Path from datetime import datetime, timedelta from typing import Optional, List, Dict, Any import argparse def get_logs_dir() -> Path: """Get logs directory.""" logs_dir = Path(__file__).parent.parent / "logs" / "wordpress" return logs_dir def load_logs(days: int = 7, session_id: Optional[str] = None) -> List[Dict[str, Any]]: """Load logs from files.""" logs_dir = get_logs_dir() all_logs = [] if not logs_dir.exists(): print(f"Logs directory not found: {logs_dir}") return [] # Read logs from last N days for i in range(days): date = datetime.now() - timedelta(days=i) log_file = logs_dir / f"wp-logs-{date.strftime('%Y-%m-%d')}.jsonl" if log_file.exists(): with open(log_file, "r", encoding="utf-8") as f: for line in f: if line.strip(): try: log_data = json.loads(line) # Filter by session_id if provided if not session_id or log_data.get("session_id") == session_id: all_logs.append(log_data) except json.JSONDecodeError: continue # Sort by timestamp (newest first) all_logs.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return all_logs def format_log_entry(log: Dict[str, Any]) -> str: """Format a log entry for display.""" timestamp = log.get("timestamp", "") level = log.get("level", "info").upper() message = log.get("message", "") session_id = log.get("session_id", "") site_url = log.get("site_url", "") context = log.get("context", {}) # Format timestamp try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) timestamp_str = dt.strftime("%Y-%m-%d %H:%M:%S") except: timestamp_str = timestamp # Build output output = f"[{timestamp_str}] [{level}]" if session_id: output += f" [Session: {session_id[:8]}...]" if site_url: output += f" [Site: {site_url}]" output += f" {message}" # Add context if present if context: context_str = json.dumps(context, ensure_ascii=False, indent=2) output += f"\n Context: {context_str}" return output def filter_logs(logs: List[Dict[str, Any]], level: Optional[str] = None, search: Optional[str] = None) -> List[Dict[str, Any]]: """Filter logs by level or search term.""" filtered = logs if level: filtered = [log for log in filtered if log.get("level", "").lower() == level.lower()] if search: search_lower = search.lower() filtered = [log for log in filtered if search_lower in log.get("message", "").lower() or search_lower in json.dumps(log.get("context", {})).lower()] return filtered def main(): parser = argparse.ArgumentParser(description="View WordPress plugin logs from server") parser.add_argument("--days", type=int, default=7, help="Number of days to look back (default: 7)") parser.add_argument("--session", type=str, help="Filter by session ID") parser.add_argument("--level", type=str, choices=["info", "warning", "error", "debug"], help="Filter by log level") parser.add_argument("--search", type=str, help="Search in log messages") parser.add_argument("--limit", type=int, help="Limit number of logs to display") parser.add_argument("--json", action="store_true", help="Output as JSON") args = parser.parse_args() # Load logs logs = load_logs(days=args.days, session_id=args.session) # Filter logs if args.level or args.search: logs = filter_logs(logs, level=args.level, search=args.search) # Limit results if args.limit: logs = logs[:args.limit] # Output if args.json: print(json.dumps(logs, indent=2, ensure_ascii=False)) else: if not logs: print("No logs found.") return print(f"Found {len(logs)} log entries\n") print("=" * 80) for log in logs: print(format_log_entry(log)) print("-" * 80) if __name__ == "__main__": main()