"""Stage3 tarball download and extraction.""" import hashlib import re from dataclasses import dataclass, field from pathlib import Path from urllib.request import urlopen, Request from urllib.error import URLError from .utils import info, success, error, fatal, run @dataclass class Stage3Config: """Configuration for stage3 download.""" init_system: str = "openrc" # or "systemd" mirrors: list[str] = field(default_factory=lambda: [ "https://mirrors.rit.edu/gentoo/", "https://distfiles.gentoo.org/", "https://gentoo.osuosl.org/", ]) mount_root: Path = field(default_factory=lambda: Path("/mnt/gentoo")) @property def variant(self) -> str: return f"stage3-amd64-{self.init_system}" def _fetch_url(url: str, timeout: int = 30) -> bytes: """Fetch URL content.""" request = Request(url, headers={"User-Agent": "install-installer/4.0"}) with urlopen(request, timeout=timeout) as response: return response.read() def _find_stage3_filename(mirror: str, config: Stage3Config) -> str | None: """Find current stage3 filename from mirror.""" base_url = f"{mirror}releases/amd64/autobuilds/current-{config.variant}/" latest_url = f"{base_url}latest-{config.variant}.txt" try: content = _fetch_url(latest_url).decode("utf-8") except URLError as e: error(f"Failed to fetch {latest_url}: {e}") return None # Parse PGP-signed content or direct listing pattern = rf"{config.variant}-\d{{8}}T\d{{6}}Z\.tar\.xz" match = re.search(pattern, content) if match: return match.group(0) return None def _verify_sha512(filepath: Path, expected: str) -> bool: """Verify SHA512 checksum of file.""" sha512 = hashlib.sha512() with open(filepath, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): sha512.update(chunk) return sha512.hexdigest() == expected def _parse_digests(content: str, filename: str) -> str | None: """Extract SHA512 hash from DIGESTS file.""" lines = content.split("\n") for i, line in enumerate(lines): if "SHA512" in line and i + 1 < len(lines) and filename in lines[i + 1]: # Hash is on next line, first field hash_line = lines[i + 1].strip() return hash_line.split()[0] # Alternative format: hash followed by filename on same line for line in lines: if filename in line and len(line.split()) >= 2: parts = line.split() if len(parts[0]) == 128: # SHA512 is 128 hex chars return parts[0] return None def download_stage3(config: Stage3Config | None = None) -> Path: """Download and verify stage3 tarball.""" if config is None: config = Stage3Config() info(f"=== Downloading Stage3 ({config.init_system}) ===") filename = None working_mirror = None for mirror in config.mirrors: info(f"Trying mirror: {mirror}") filename = _find_stage3_filename(mirror, config) if filename: working_mirror = mirror success(f"Found: {filename}") break error(f"Mirror {mirror} failed, trying next...") if not filename or not working_mirror: fatal("Could not find stage3 on any mirror") base_url = f"{working_mirror}releases/amd64/autobuilds/current-{config.variant}/" tarball_url = f"{base_url}{filename}" digests_url = f"{base_url}{filename}.DIGESTS" target_dir = config.mount_root tarball_path = target_dir / filename digests_path = target_dir / f"{filename}.DIGESTS" # Download tarball info(f"Downloading {filename}...") run("wget", "--progress=bar:force", "-O", str(tarball_path), tarball_url) # Download digests info("Downloading DIGESTS...") run("wget", "-q", "-O", str(digests_path), digests_url) # Verify checksum info("Verifying SHA512 checksum...") digests_content = digests_path.read_text() expected_hash = _parse_digests(digests_content, filename) if not expected_hash: error("Could not parse checksum from DIGESTS file") error("Continuing without verification (manual check recommended)") else: if _verify_sha512(tarball_path, expected_hash): success("Checksum verified.") else: fatal("Checksum verification FAILED") # Cleanup digests file digests_path.unlink() return tarball_path def extract_stage3(tarball_path: Path, mount_root: Path | None = None) -> None: """Extract stage3 tarball.""" if mount_root is None: mount_root = Path("/mnt/gentoo") info(f"Extracting {tarball_path.name}...") run( "tar", "xpf", str(tarball_path), "--xattrs-include=*.*", "--numeric-owner", "--skip-old-files", # Resume capability - skip files that already exist "-C", str(mount_root), ) success("Stage3 extracted.") # Cleanup tarball tarball_path.unlink() success("Cleaned up tarball.") def fetch_stage3(config: Stage3Config | None = None) -> None: """Full stage3 workflow: download, verify, extract.""" if config is None: config = Stage3Config() # Check if already extracted (idempotency check) if (config.mount_root / "etc/portage").exists(): info("Stage3 already extracted (skipping)") return tarball = download_stage3(config) extract_stage3(tarball, config.mount_root) print() success("=== Stage3 installation complete ===")