from __future__ import annotations
import csv
import json
import logging
from argparse import ArgumentParser, Namespace
from datetime import datetime
from importlib import resources
from pathlib import Path
from typing import Any, List
from urllib.parse import urlparse
from . import UrlRecord, SiteClassifier, check_urls, extract_urls_from_paths, stream_extract_from_file
from .core.extract import extract_https_urls, urls_to_csv
from .core.update_yaml import add_user_domain, enrich_domain, remove_user_domain, load_db
# Module-level logger; its level and output format are configured in main().
logger = logging.getLogger(__name__)
# File name of the baseline database that _init_local_db() reads from the
# "urlcheck_smith.resources" package.
# NOTE(review): this is spelled "ucsmith_db.yaml" while the user database below
# is "usmith_db.yaml" — confirm the difference is intentional.
PACKAGE_RESOURCE_DB = "ucsmith_db.yaml"
# File name of the local writable database; used to build the default
# `--target` of the `init` subcommand.
USER_DB_NAME = "usmith_db.yaml"
def build_parser() -> ArgumentParser:
    """
    Build the top-level ArgumentParser for the `urlcheck-smith` tool.

    Exactly one subcommand is required; the chosen name is stored in
    `args.command`. Each subcommand is registered by its own private helper,
    in the order shown below (which is also the order used in `--help`).

    Subcommands:
        scan:
            paths (one or more), -o/--output, --format {csv,jsonl} (default csv),
            --no-http, --timeout (float, default 5.0), --user-agent,
            --rules (repeatable), -v/--verbose.
        classify-url:
            url, --rules (repeatable), --format {json,text} (default json),
            --explain, --preset {japan,eu,global}, --quiet, --normalize-domain.
        classify:
            path, -o/--output, --rules (repeatable),
            --format {csv,jsonl} (default csv), --explain, --quiet,
            --normalize-domain.
        db:
            Requires a nested subcommand stored in `args.db_command`:
            update: optional positional `domain` OR -f/--file (mutually
                exclusive), plus --all and --no-api.
            add: domain, --category (default "General").
            remove: domain.
        init:
            --force, --target (default: `USER_DB_NAME` in the current working
            directory, resolved when the parser is built).
        extract-https:
            -i/--input, -o/--output (both optional paths).

    Returns:
        ArgumentParser: The fully configured parser.
    """
    parser = ArgumentParser(
        prog="urlcheck-smith",
        description="Battery-included URL extraction / classification / HTTP check pipeline.",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # Registration order is preserved from the original single-function layout.
    _add_scan_parser(sub)
    _add_classify_url_parser(sub)
    _add_classify_parser(sub)
    _add_db_parser(sub)
    _add_init_parser(sub)
    _add_extract_https_parser(sub)
    return parser


def _add_rules_argument(parser: ArgumentParser, help_text: str) -> None:
    """Add the repeatable `--rules` option (collected into a list of Path) to `parser`."""
    parser.add_argument(
        "--rules",
        type=Path,
        action="append",
        help=help_text,
    )


def _add_scan_parser(sub: Any) -> None:
    """Register the `scan` subcommand (extract + classify + optional HTTP check)."""
    scan = sub.add_parser(
        "scan",
        help="Extract URLs from text files and run classify+check pipeline.",
    )
    scan.add_argument(
        "paths",
        nargs="+",
        help="Input files to scan (treated as plain text).",
    )
    scan.add_argument(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Output file. If omitted, a timestamped filename is generated.",
    )
    scan.add_argument(
        "--format",
        choices=["csv", "jsonl"],
        default="csv",
        help="Output format: csv (default) or jsonl.",
    )
    scan.add_argument(
        "--no-http",
        action="store_true",
        help="Skip HTTP status check (extract+classify only).",
    )
    scan.add_argument(
        "--timeout",
        type=float,
        default=5.0,
        help="HTTP timeout per request in seconds (default: 5.0).",
    )
    scan.add_argument(
        "--user-agent",
        help="Custom User-Agent for HTTP requests.",
    )
    _add_rules_argument(
        scan,
        "Optional YAML rules file for classifier (merges with database rules). Can be specified multiple times.",
    )
    scan.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose logging."
    )


def _add_classify_url_parser(sub: Any) -> None:
    """Register the `classify-url` subcommand (classify a single URL)."""
    classify = sub.add_parser(
        "classify-url",
        help="Classify a single URL using suffix rules.",
    )
    classify.add_argument(
        "url",
        help="URL to classify.",
    )
    _add_rules_argument(
        classify,
        "Optional YAML rules file for classifier (merges with database rules). Can be specified multiple times.",
    )
    classify.add_argument(
        "--format",
        choices=["json", "text"],
        default="json",
        help="Output format: json (default) or text.",
    )
    classify.add_argument("--explain", action="store_true")
    classify.add_argument("--preset", choices=["japan", "eu", "global"])
    classify.add_argument("--quiet", action="store_true")
    classify.add_argument("--normalize-domain", action="store_true")


def _add_classify_parser(sub: Any) -> None:
    """Register the `classify` subcommand (batch classification, no HTTP check)."""
    batch = sub.add_parser(
        "classify",
        help="Classify URLs from a file (one URL per line). No HTTP check.",
    )
    batch.add_argument("path", type=Path, help="File with one URL per line.")
    batch.add_argument("-o", "--output", type=Path, default=None)
    _add_rules_argument(
        batch,
        "Custom YAML rules. Merges with database rules. Can be specified multiple times.",
    )
    batch.add_argument("--format", choices=["csv", "jsonl"], default="csv")
    batch.add_argument("--explain", action="store_true")
    batch.add_argument("--quiet", action="store_true")
    batch.add_argument("--normalize-domain", action="store_true")


def _add_db_parser(sub: Any) -> None:
    """Register the `db` subcommand and its nested update/add/remove commands."""
    db_parser = sub.add_parser(
        "db",
        help="Manage the UC Smith credibility database (usmith_db.yaml).",
    )
    db_sub = db_parser.add_subparsers(dest="db_command", required=True)

    db_update = db_sub.add_parser(
        "update",
        help="Enrich/Update a domain in the database.",
    )
    # A single domain and a --file source cannot be combined; neither is
    # required at parse time because --all is a third way to select targets.
    db_update_group = db_update.add_mutually_exclusive_group(required=False)
    db_update_group.add_argument(
        "domain", nargs="?", help="Domain to enrich (e.g., example.com)."
    )
    db_update_group.add_argument(
        "--file",
        "-f",
        type=Path,
        help="File containing a list of URLs or domains to enrich.",
    )
    db_update.add_argument(
        "--all",
        action="store_true",
        help="Update all domains currently in the discovered cache.",
    )
    db_update.add_argument(
        "--no-api",
        action="store_true",
        help="Disable Google Fact Check API usage even when the API key is available.",
    )

    db_add = db_sub.add_parser("add", help="Add a trusted domain to user_defined.")
    db_add.add_argument("domain", help="Domain to add.")
    db_add.add_argument("--category", default="General", help="Category for the domain.")

    db_remove = db_sub.add_parser("remove", help="Remove a domain from user_defined.")
    db_remove.add_argument("domain", help="Domain to remove.")


def _add_init_parser(sub: Any) -> None:
    """Register the top-level `init` subcommand (create a local database file)."""
    init = sub.add_parser(
        "init",
        help="Create a local writable usmith_db.yaml from the packaged baseline.",
    )
    init.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing local database file.",
    )
    init.add_argument(
        "--target",
        type=Path,
        default=Path.cwd() / USER_DB_NAME,
        help="Target path for the initialized database (default: ./usmith_db.yaml).",
    )


def _add_extract_https_parser(sub: Any) -> None:
    """Register the `extract-https` subcommand (HTTPS URL extraction to CSV)."""
    extract_https = sub.add_parser(
        "extract-https",
        help="Interactively extract unique HTTPS URLs from a file and save them to CSV.",
    )
    extract_https.add_argument(
        "--input",
        "-i",
        type=Path,
        default=None,
        help="Source text file. If omitted, you will be prompted.",
    )
    extract_https.add_argument(
        "--output",
        "-o",
        type=Path,
        default=None,
        help="Output CSV path. If omitted, you will be prompted (blank uses a timestamped default).",
    )
def _timestamped_output(prefix: str, suffix: str) -> Path:
"""
Generates a timestamped filepath with the given prefix and suffix.
The function appends the current timestamp, formatted as "YYYYMMDD_HHMMSS",
between the provided prefix and suffix to create a unique output filename.
Args:
prefix (str): The prefix for the output filename.
suffix (str): The suffix for the output filename.
Returns:
Path: A `Path` object representing the generated timestamped filepath.
"""
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return Path(f"{prefix}_{stamp}{suffix}")
def _init_local_db(target: Path, force: bool = False) -> int:
    """
    Copy the packaged baseline database to `target`.

    The baseline is the `PACKAGE_RESOURCE_DB` file inside the
    `urlcheck_smith.resources` package; it is read and written as UTF-8 text.
    Missing parent directories of `target` are created.

    Args:
        target (Path): Destination file for the local database.
        force (bool): When True, an existing `target` is overwritten.
            Defaults to False.

    Returns:
        int: 1 when `target` already exists and `force` is False (a warning is
        logged and nothing is written); 0 after the file has been written.
    """
    already_present = target.exists()
    if already_present and not force:
        logger.warning(f"Database already exists: {target}")
        return 1

    target.parent.mkdir(parents=True, exist_ok=True)

    package_files = resources.files("urlcheck_smith.resources")
    baseline = package_files.joinpath(PACKAGE_RESOURCE_DB)
    baseline_text = baseline.read_text(encoding="utf-8")
    target.write_text(baseline_text, encoding="utf-8")

    logger.info(f"Initialized local database at {target}")
    return 0
def run_check(args: Namespace) -> int:
    """
    Run the `scan` pipeline: extract URLs, classify them, optionally check them
    over HTTP, and write the records to a CSV or JSONL file.

    Args:
        args (Namespace): Parsed `scan` arguments. Attributes read here:
            - paths: Input file paths; each is converted to `Path`.
            - rules: Forwarded unchanged to `SiteClassifier(rules_path=...)`.
            - verbose: Forwarded as the classifier's `explain` flag.
            - no_http: When true, the HTTP check step is skipped.
            - timeout: Forwarded to `check_urls`.
            - user_agent: Forwarded to `check_urls`.
            - output: Destination path, or None to generate
              `urlcheck_results_<timestamp>` with the matching extension.
            - format: "csv" selects CSV output; any other value selects JSONL.

    Returns:
        int: Always 0 when the function completes.
    """
    input_paths = list(map(Path, args.paths))
    logger.info(f"Extracting URLs from {len(input_paths)} path(s)...")
    found: List[UrlRecord] = extract_urls_from_paths(input_paths)
    logger.info(f"Found {len(found)} unique URLs.")

    logger.info("Classifying URLs...")
    site_classifier = SiteClassifier(rules_path=args.rules, explain=args.verbose)
    found = site_classifier.classify(found)

    http_enabled = not args.no_http
    if http_enabled:
        logger.info(f"Running HTTP checks (timeout={args.timeout}s)...")
        found = check_urls(found, timeout=args.timeout, user_agent=args.user_agent)

    as_csv = args.format == "csv"
    destination = args.output
    if destination is None:
        extension = ".csv" if as_csv else ".jsonl"
        destination = _timestamped_output("urlcheck_results", extension)

    logger.info(f"Writing results to {destination}...")
    if as_csv:
        write_csv(destination, found)
    else:
        write_jsonl(destination, found)
    logger.info("Done.")
    return 0
def run_classify_url(args: Namespace) -> int:
    """
    Classify a single URL and print the result to stdout.

    A `UrlRecord` is built from `args.url`, passed through `SiteClassifier`,
    and the first returned record is printed either as one JSON object
    (`args.format == "json"`) or as `key=value` lines (any other format).
    The printed fields are `url`, `base_url`, `category`, `trust_tier`, plus
    `explain` when the record's explanation is truthy.

    Args:
        args (Namespace): Parsed `classify-url` arguments. Attributes read
            here: `url`, `rules`, `explain`, `normalize_domain`, `format`.

    Returns:
        int: Always 0 when the function completes.
    """
    # NOTE(review): the parser also accepts --quiet and --preset for this
    # subcommand, but neither is read here — confirm whether that is intended.
    site_classifier = SiteClassifier(
        rules_path=args.rules,
        explain=args.explain,
        normalize_domain=args.normalize_domain,
    )
    result = site_classifier.classify([UrlRecord(url=args.url)])[0]

    # Insertion order of this dict is the output order for both formats.
    fields = {
        "url": result.url,
        "base_url": result.base_url,
        "category": result.category,
        "trust_tier": result.trust_tier,
    }
    if result.explain:
        fields["explain"] = result.explain

    if args.format == "json":
        print(json.dumps(fields, ensure_ascii=False))
        return 0

    for key, value in fields.items():
        print(f"{key}={value}")
    return 0
def run_classify(args: Namespace) -> int:
    """
    Run the `classify` subcommand: load URLs from one file, classify them, and
    either print the categories or write the records to a file.

    Args:
        args (Namespace): Parsed `classify` arguments. Attributes read here:
            - path: Input file handed to `extract_urls_from_paths`.
            - rules: Forwarded unchanged to `SiteClassifier(rules_path=...)`.
            - explain: Forwarded as the classifier's `explain` flag.
            - normalize_domain: Forwarded to the classifier.
            - quiet: When true, each record's category is printed to stdout
              and no output file is written.
            - output: Destination path, or None to generate
              `classified_<timestamp>` with the matching extension.
            - format: "csv" selects CSV output; any other value selects JSONL.

    Returns:
        int: Always 0 when the function completes.
    """
    logger.info(f"Reading URLs from {args.path}...")
    loaded = extract_urls_from_paths([args.path])
    logger.info(f"Loaded {len(loaded)} URLs.")

    logger.info("Classifying...")
    site_classifier = SiteClassifier(
        rules_path=args.rules,
        explain=args.explain,
        normalize_domain=args.normalize_domain,
    )
    loaded = site_classifier.classify(loaded)

    if args.quiet:
        # Quiet mode: categories only, one per line, nothing saved.
        for item in loaded:
            print(item.category)
        return 0

    as_csv = args.format == "csv"
    destination = args.output
    if destination is None:
        extension = ".csv" if as_csv else ".jsonl"
        destination = _timestamped_output("classified", extension)

    logger.info(f"Writing results to {destination}...")
    if as_csv:
        write_csv(destination, loaded)
    else:
        write_jsonl(destination, loaded)
    logger.info("Done.")
    return 0
def run_db(args: Namespace) -> int:
    """
    Dispatch the `db` subcommand selected by `args.db_command`.

    - "add": calls `add_user_domain(args.domain, args.category)`.
    - "remove": calls `remove_user_domain(args.domain)`.
    - "update": enriches domains via `enrich_domain`, choosing targets in this
      priority order: `--all` (every entry with a "name" in the database's
      "discovered_cache"), then `--file` (each distinct host extracted from
      the file), then the positional `domain`.

    Optional attributes (`no_api`, `all`, `file`, `domain`) are looked up
    through `vars(args)` so a missing attribute is treated as unset.

    Args:
        args (Namespace): Parsed `db` arguments.

    Returns:
        int: 1 when `--file` does not exist or when `update` was given no
        target at all; 0 otherwise.
    """
    options = vars(args)
    command = args.db_command

    if command == "add":
        logger.info(f"Adding user domain: {args.domain} ({args.category})")
        add_user_domain(args.domain, args.category)
        return 0

    if command == "remove":
        logger.info(f"Removing user domain: {args.domain}")
        remove_user_domain(args.domain)
        return 0

    if command != "update":
        # Any other command is a no-op that reports success.
        return 0

    api_enabled = not options.get("no_api", False)

    if options.get("all", False):
        cached = load_db().get("discovered_cache", [])
        if not cached:
            logger.info("No domains in discovered cache to update.")
            return 0
        logger.info(f"Updating all {len(cached)} domains in cache...")
        for item in cached:
            name = item.get("name")
            if not name:
                continue
            logger.info(f"Enriching domain: {name}")
            enrich_domain(name, use_api=api_enabled)
        return 0

    if options.get("file") is not None:
        source = args.file
        if not source.exists():
            logger.error(f"File not found: {source}")
            return 1
        logger.info(f"Bulk enriching domains from {source}...")
        handled = set()
        for record in stream_extract_from_file(source):
            # Best effort: a failure on one entry is logged and the loop goes on.
            try:
                host = urlparse(record.url).netloc or record.url
                host = host.lower().strip()
                if not host or host in handled:
                    continue
                logger.info(f"Enriching domain: {host}")
                enrich_domain(host, use_api=api_enabled)
                # Marked as handled only after a successful enrichment.
                handled.add(host)
            except Exception as exc:
                logger.error(f"Error processing {record.url}: {exc}")
        return 0

    if options.get("domain"):
        logger.info(f"Enriching domain: {args.domain}")
        enrich_domain(args.domain, use_api=api_enabled)
        return 0

    logger.error("Please specify a domain, a --file, or use --all.")
    return 1
def run_init(args: Namespace) -> int:
    """
    Run the `init` subcommand by delegating to `_init_local_db`.

    Args:
        args (Namespace): Parsed `init` arguments. Attributes read here:
            - target: Destination path of the local database file.
            - force: Whether an existing file may be overwritten.

    Returns:
        int: The status code returned by `_init_local_db`.
    """
    destination = args.target
    overwrite = args.force
    return _init_local_db(destination, force=overwrite)
def _record_to_dict(r: UrlRecord) -> dict[str, Any]:
"""
Converts a UrlRecord object into a dictionary representation.
This function takes a UrlRecord instance and extracts its relevant
attributes, converting them into a dictionary format for further use or
serialization. Attributes with default values or optional fields are
handled appropriately to ensure a consistent structure in the resulting
dictionary.
Args:
r (UrlRecord): The UrlRecord object to be converted into a dictionary.
Returns:
dict[str, Any]: A dictionary containing the extracted attributes from
the UrlRecord object.
"""
d: dict[str, Any] = {
"url": r.url,
"base_url": r.base_url or "",
"category": r.category or "",
"http_status": r.http_status if r.http_status is not None else None,
"redirected_url": r.redirected_url or "",
"error": r.error or "",
"human_check_suspected": bool(r.human_check_suspected),
"soft_404_detected": bool(r.soft_404_detected),
"trust_tier": r.trust_tier or "TIER_3_GENERAL",
}
if getattr(r, "explain", None):
d["explain"] = r.explain
return d
def write_csv(path: Path, records: List[UrlRecord]) -> None:
"""
Writes a list of URL records to a CSV file with specified field names.
This function creates the parent directory of the given file path if it
does not already exist, then writes the list of `UrlRecord` instances
into a CSV file. The CSV file includes predefined field names, and any
missing `http_status` values are replaced with empty strings before
writing to the file.
Args:
path (Path): The file path where the CSV file will be written.
records (List[UrlRecord]): A list of URL records to be written to the CSV file.
"""
path.parent.mkdir(parents=True, exist_ok=True)
fieldnames = [
"url",
"base_url",
"category",
"http_status",
"redirected_url",
"error",
"human_check_suspected",
"soft_404_detected",
"trust_tier",
]
with path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for r in records:
row = _record_to_dict(r)
if row["http_status"] is None:
row["http_status"] = ""
writer.writerow(row)
def write_jsonl(path: Path, records: List[UrlRecord]) -> None:
    """
    Write URL records to a UTF-8 JSON Lines file, one JSON object per line.

    Missing parent directories of `path` are created. Each record is converted
    with `_record_to_dict` and serialized with `ensure_ascii=False`, so
    non-ASCII characters are written as-is rather than escaped.

    Args:
        path (Path): Destination file; overwritten if it exists.
        records (List[UrlRecord]): Records to write, in order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(_record_to_dict(rec), ensure_ascii=False) + "\n"
            for rec in records
        )
def main(argv: list[str] | None = None) -> int:
    """
    Command-line entry point: parse arguments, configure logging, and run the
    selected subcommand.

    Logging is set to DEBUG when the parsed arguments have a truthy `verbose`
    attribute and to INFO otherwise, with the format
    "<LEVEL>: <message>".

    Args:
        argv (list[str] | None): Arguments to parse. None is passed through
            to `ArgumentParser.parse_args`, which then reads `sys.argv`.

    Returns:
        int: The exit code of the subcommand handler, or 1 (after printing
        help) when the command name matches no handler.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    level = logging.INFO
    if getattr(args, "verbose", False):
        level = logging.DEBUG
    logging.basicConfig(level=level, format="%(levelname)s: %(message)s")

    # Handlers are looked up only in the branch that is taken.
    # NOTE(review): run_extract_https is not defined in the part of the file
    # reviewed here — confirm it exists elsewhere in this module.
    command = args.command
    if command == "scan":
        exit_code = run_check(args)
    elif command == "classify-url":
        exit_code = run_classify_url(args)
    elif command == "classify":
        exit_code = run_classify(args)
    elif command == "db":
        exit_code = run_db(args)
    elif command == "init":
        exit_code = run_init(args)
    elif command == "extract-https":
        exit_code = run_extract_https(args)
    else:
        parser.print_help()
        exit_code = 1
    return exit_code
def extract_https_cli() -> int:
    """
    Console-script entry point for HTTPS URL extraction.

    Prepends "extract-https" to the process's command-line arguments and hands
    them to `main()`, so parsing and logging behave exactly like the
    `extract-https` subcommand.

    Returns:
        int: The exit code returned by `main()`.
    """
    import sys

    forwarded = ["extract-https"]
    forwarded.extend(sys.argv[1:])
    return main(forwarded)