180 lines
5.4 KiB
Python
180 lines
5.4 KiB
Python
"""Stage3 tarball download and extraction."""
|
|
|
|
import hashlib
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from urllib.request import urlopen, Request
|
|
from urllib.error import URLError
|
|
|
|
from .utils import info, success, error, fatal, run
|
|
|
|
|
|
@dataclass
|
|
class Stage3Config:
|
|
"""Configuration for stage3 download."""
|
|
init_system: str = "openrc" # or "systemd"
|
|
mirrors: list[str] = field(default_factory=lambda: [
|
|
"https://mirrors.rit.edu/gentoo/",
|
|
"https://distfiles.gentoo.org/",
|
|
"https://gentoo.osuosl.org/",
|
|
])
|
|
mount_root: Path = field(default_factory=lambda: Path("/mnt/gentoo"))
|
|
|
|
@property
|
|
def variant(self) -> str:
|
|
return f"stage3-amd64-{self.init_system}"
|
|
|
|
|
|
def _fetch_url(url: str, timeout: int = 30) -> bytes:
|
|
"""Fetch URL content."""
|
|
request = Request(url, headers={"User-Agent": "install-installer/4.0"})
|
|
with urlopen(request, timeout=timeout) as response:
|
|
return response.read()
|
|
|
|
|
|
def _find_stage3_filename(mirror: str, config: Stage3Config) -> str | None:
|
|
"""Find current stage3 filename from mirror."""
|
|
base_url = f"{mirror}releases/amd64/autobuilds/current-{config.variant}/"
|
|
latest_url = f"{base_url}latest-{config.variant}.txt"
|
|
|
|
try:
|
|
content = _fetch_url(latest_url).decode("utf-8")
|
|
except URLError as e:
|
|
error(f"Failed to fetch {latest_url}: {e}")
|
|
return None
|
|
|
|
# Parse PGP-signed content or direct listing
|
|
pattern = rf"{config.variant}-\d{{8}}T\d{{6}}Z\.tar\.xz"
|
|
match = re.search(pattern, content)
|
|
|
|
if match:
|
|
return match.group(0)
|
|
|
|
return None
|
|
|
|
|
|
def _verify_sha512(filepath: Path, expected: str) -> bool:
|
|
"""Verify SHA512 checksum of file."""
|
|
sha512 = hashlib.sha512()
|
|
with open(filepath, "rb") as f:
|
|
for chunk in iter(lambda: f.read(8192), b""):
|
|
sha512.update(chunk)
|
|
return sha512.hexdigest() == expected
|
|
|
|
|
|
def _parse_digests(content: str, filename: str) -> str | None:
|
|
"""Extract SHA512 hash from DIGESTS file."""
|
|
lines = content.split("\n")
|
|
for i, line in enumerate(lines):
|
|
if "SHA512" in line and i + 1 < len(lines) and filename in lines[i + 1]:
|
|
# Hash is on next line, first field
|
|
hash_line = lines[i + 1].strip()
|
|
return hash_line.split()[0]
|
|
|
|
# Alternative format: hash followed by filename on same line
|
|
for line in lines:
|
|
if filename in line and len(line.split()) >= 2:
|
|
parts = line.split()
|
|
if len(parts[0]) == 128: # SHA512 is 128 hex chars
|
|
return parts[0]
|
|
|
|
return None
|
|
|
|
|
|
def download_stage3(config: Stage3Config | None = None) -> Path:
|
|
"""Download and verify stage3 tarball."""
|
|
if config is None:
|
|
config = Stage3Config()
|
|
|
|
info(f"=== Downloading Stage3 ({config.init_system}) ===")
|
|
|
|
filename = None
|
|
working_mirror = None
|
|
|
|
for mirror in config.mirrors:
|
|
info(f"Trying mirror: {mirror}")
|
|
filename = _find_stage3_filename(mirror, config)
|
|
if filename:
|
|
working_mirror = mirror
|
|
success(f"Found: {filename}")
|
|
break
|
|
error(f"Mirror {mirror} failed, trying next...")
|
|
|
|
if not filename or not working_mirror:
|
|
fatal("Could not find stage3 on any mirror")
|
|
|
|
base_url = f"{working_mirror}releases/amd64/autobuilds/current-{config.variant}/"
|
|
tarball_url = f"{base_url}{filename}"
|
|
digests_url = f"{base_url}{filename}.DIGESTS"
|
|
|
|
target_dir = config.mount_root
|
|
tarball_path = target_dir / filename
|
|
digests_path = target_dir / f"{filename}.DIGESTS"
|
|
|
|
# Download tarball
|
|
info(f"Downloading {filename}...")
|
|
run("wget", "--progress=bar:force", "-O", str(tarball_path), tarball_url)
|
|
|
|
# Download digests
|
|
info("Downloading DIGESTS...")
|
|
run("wget", "-q", "-O", str(digests_path), digests_url)
|
|
|
|
# Verify checksum
|
|
info("Verifying SHA512 checksum...")
|
|
digests_content = digests_path.read_text()
|
|
expected_hash = _parse_digests(digests_content, filename)
|
|
|
|
if not expected_hash:
|
|
error("Could not parse checksum from DIGESTS file")
|
|
error("Continuing without verification (manual check recommended)")
|
|
else:
|
|
if _verify_sha512(tarball_path, expected_hash):
|
|
success("Checksum verified.")
|
|
else:
|
|
fatal("Checksum verification FAILED")
|
|
|
|
# Cleanup digests file
|
|
digests_path.unlink()
|
|
|
|
return tarball_path
|
|
|
|
|
|
def extract_stage3(tarball_path: Path, mount_root: Path | None = None) -> None:
|
|
"""Extract stage3 tarball."""
|
|
if mount_root is None:
|
|
mount_root = Path("/mnt/gentoo")
|
|
|
|
info(f"Extracting {tarball_path.name}...")
|
|
|
|
run(
|
|
"tar", "xpf", str(tarball_path),
|
|
"--xattrs-include=*.*",
|
|
"--numeric-owner",
|
|
"--skip-old-files", # Resume capability - skip files that already exist
|
|
"-C", str(mount_root),
|
|
)
|
|
|
|
success("Stage3 extracted.")
|
|
|
|
# Cleanup tarball
|
|
tarball_path.unlink()
|
|
success("Cleaned up tarball.")
|
|
|
|
|
|
def fetch_stage3(config: Stage3Config | None = None) -> None:
|
|
"""Full stage3 workflow: download, verify, extract."""
|
|
if config is None:
|
|
config = Stage3Config()
|
|
|
|
# Check if already extracted (idempotency check)
|
|
if (config.mount_root / "etc/portage").exists():
|
|
info("Stage3 already extracted (skipping)")
|
|
return
|
|
|
|
tarball = download_stage3(config)
|
|
extract_stage3(tarball, config.mount_root)
|
|
|
|
print()
|
|
success("=== Stage3 installation complete ===")
|