2026-02-05 18:05:06 -05:00

385 lines
11 KiB
Python

"""Disk partitioning, LUKS encryption, and btrfs setup."""
import time
from dataclasses import dataclass, field
from pathlib import Path
from .utils import (
info,
success,
error,
fatal,
prompt,
confirm,
run,
run_quiet,
is_block_device,
is_mounted,
)
@dataclass
class DiskConfig:
    """Tunable settings for the disk preparation workflow.

    Every field has a default, so ``DiskConfig()`` yields a complete
    configuration.
    """

    # Partition sizes, in GiB as passed to sgdisk ("+<n>G").
    efi_size_gb: int = 1
    swap_size_gb: int = 24
    # Label given to the LUKS2 container at format time.
    luks_label: str = "legion"
    # Label given to the btrfs filesystem at mkfs time.
    btrfs_label: str = "legion"
    # Device-mapper name used when opening/closing the LUKS container.
    mapper_name: str = "legion"
    # Mount options shared by every btrfs subvolume mount.
    btrfs_opts: str = "noatime,compress=zstd"
    # Directory under which the target system is mounted.
    mount_root: Path = field(default_factory=lambda: Path("/mnt/gentoo"))

    @property
    def mapper_path(self) -> Path:
        """Return the ``/dev/mapper`` node that corresponds to ``mapper_name``."""
        return Path("/dev/mapper/{}".format(self.mapper_name))
@dataclass
class DiskLayout:
    """Resolved disk layout after disk selection.

    Partition numbering is fixed: 1 is the EFI partition, 2 is swap and
    3 is the root partition.
    """

    # Whole-disk device node, e.g. /dev/sda or /dev/nvme0n1.
    device: Path
    # Partition 1.
    part_efi: Path
    # Partition 2.
    part_swap: Path
    # Partition 3.
    part_root: Path

    @classmethod
    def from_device(cls, device: Path) -> "DiskLayout":
        """Create a layout with the correct partition naming for *device*.

        Linux inserts a ``p`` between the disk name and the partition
        number whenever the disk name itself ends in a digit
        (``nvme0n1`` -> ``nvme0n1p1``, ``mmcblk0`` -> ``mmcblk0p1``,
        ``loop0`` -> ``loop0p1``); otherwise the number is appended
        directly (``sda`` -> ``sda1``).

        Args:
            device: Path of the whole-disk device node.

        Returns:
            A ``DiskLayout`` whose partition paths are derived from *device*.
        """
        base = str(device)
        # Decide on the trailing character rather than on a substring such as
        # "nvme": that covers every digit-terminated disk name and cannot be
        # fooled by "nvme" appearing elsewhere in the path.
        separator = "p" if base[-1:].isdigit() else ""
        return cls(
            device=device,
            part_efi=Path(f"{base}{separator}1"),
            part_swap=Path(f"{base}{separator}2"),
            part_root=Path(f"{base}{separator}3"),
        )
def _is_luks_device(device: Path) -> bool:
    """Report whether ``cryptsetup isLuks`` succeeds for *device*.

    The probe goes through ``run_quiet`` with ``check=False`` and the
    ``ok`` attribute of its result is returned unchanged.
    """
    return run_quiet("cryptsetup", "isLuks", str(device), check=False).ok
def _is_luks_open(mapper_name: str) -> bool:
    """Return True when ``/dev/mapper/<mapper_name>`` exists."""
    mapper_node = Path("/dev/mapper/{}".format(mapper_name))
    return mapper_node.exists()
def list_disks() -> list[dict]:
    """List whole disks reported by ``lsblk`` together with basic metadata.

    ``lsblk`` is asked for the columns NAME, SIZE (in bytes), MODEL and
    TYPE. MODEL may be empty or contain spaces, so each row is parsed
    from both ends: NAME and SIZE are the first two tokens, TYPE is the
    last one, and whatever lies in between is the model.

    Returns:
        One dict per row whose TYPE is ``disk``, with the keys ``name``,
        ``path`` (``/dev/<name>``), ``size_gb`` (size in whole GiB) and
        ``model`` (``"Unknown"`` when lsblk reports none).
    """
    result = run_quiet("lsblk", "-d", "-b", "-n", "-o", "NAME,SIZE,MODEL,TYPE")
    disks = []
    for line in result.stdout.splitlines():
        tokens = line.split()
        # A usable row has at least NAME, SIZE and TYPE.
        if len(tokens) < 3 or tokens[-1] != "disk":
            continue
        name, size_field = tokens[0], tokens[1]
        if not size_field.isdigit():
            # Not a row in the expected layout; ignore it rather than crash.
            continue
        # Everything between SIZE and TYPE belongs to the model string.
        model = " ".join(tokens[2:-1]) or "Unknown"
        disks.append({
            "name": name,
            "path": Path(f"/dev/{name}"),
            "size_gb": int(size_field) // (1024**3),
            "model": model,
        })
    return disks
def select_disk() -> DiskLayout:
    """Interactively pick the target disk and confirm its destruction.

    Prints a numbered list of the disks returned by ``list_disks``, asks
    for a number, builds the matching ``DiskLayout`` and asks for a
    final confirmation.

    Returns:
        The layout for the chosen disk.

    Note:
        ``fatal`` is called when no disk is available, when the selection
        is not a valid list number, or when the user declines. It is
        assumed not to return (presumably it exits) - confirm in utils.
    """
    info("Available disks:")
    disks = list_disks()
    if not disks:
        # Without this, the user would be prompted against an empty list and
        # then told that their selection was invalid.
        fatal("No disks found")
    for number, disk in enumerate(disks, 1):
        print(f" {number}) {disk['path']} - {disk['size_gb']} GB ({disk['model']})")
    print()
    choice = prompt("Enter disk number: ")
    try:
        index = int(choice) - 1
        if not 0 <= index < len(disks):
            raise ValueError()
    except ValueError:
        fatal("Invalid selection")
    selected = disks[index]
    layout = DiskLayout.from_device(selected["path"])
    print()
    error(f"WARNING: This will DESTROY ALL DATA on {layout.device}")
    if not confirm("Proceed?"):
        fatal("Aborted by user")
    return layout
def wipe_disk(layout: DiskLayout, config: DiskConfig) -> None:
    """Release anything using the disk, then erase its signatures.

    The commands below run in order with ``check=False`` (so a failing
    step presumably does not abort the sequence - confirm in utils),
    followed by a ``sync``.
    """
    info("Wiping disk signatures...")
    target = str(layout.device)
    steps = (
        # Unmount everything below the install root.
        ("umount", "-R", str(config.mount_root)),
        # Close the LUKS mapping if it is open.
        ("cryptsetup", "close", config.mapper_name),
        # Deactivate swap on the swap partition.
        ("swapoff", str(layout.part_swap)),
        # Wipe signatures on the whole-disk device.
        ("wipefs", "-af", target),
        # Zero the first 10 MiB of the disk.
        ("dd", "if=/dev/zero", f"of={target}", "bs=1M", "count=10", "status=none"),
        # Ask the kernel to re-read the partition table.
        ("partprobe", target),
    )
    for argv in steps:
        run(*argv, check=False)
    run("sync")
    success("Disk wiped.")
def create_partitions(layout: DiskLayout, config: DiskConfig) -> None:
    """Create a GPT partition table with EFI, swap, and root partitions.

    Layout written to ``layout.device``:
        1. EFI  - ``config.efi_size_gb`` GiB, type EF00, name "EFI"
        2. Swap - ``config.swap_size_gb`` GiB, type 8200, name "Swap"
        3. Root - remaining space, type 8309, name "LUKS"

    After writing the table the kernel is asked to re-read it and the
    function waits for all three partition nodes to appear before
    printing the resulting table.
    """
    info("Creating GPT partition table...")
    device = str(layout.device)
    # sgdisk does not store the alignment between invocations, so the option
    # has to accompany each command that actually creates a partition; issued
    # on its own it has no effect.
    alignment = "--set-alignment=2048"

    # Clear and create GPT
    run("sgdisk", "--clear", device)

    # (partition number, end specifier, type code, partition name)
    partitions = (
        (1, f"+{config.efi_size_gb}G", "EF00", "EFI"),
        (2, f"+{config.swap_size_gb}G", "8200", "Swap"),
        (3, "0", "8309", "LUKS"),  # end "0" = use all remaining space
    )
    for number, end, type_code, name in partitions:
        run(
            "sgdisk",
            alignment,
            "-n", f"{number}:0:{end}",
            "-t", f"{number}:{type_code}",
            "-c", f"{number}:{name}",
            device,
        )

    run("partprobe", device)
    time.sleep(2)

    # Verify partitions exist
    for part in (layout.part_efi, layout.part_swap, layout.part_root):
        _wait_for_partition(part, layout.device)

    success("Partitions created.")
    run("sgdisk", "-p", device)
def _wait_for_partition(part: Path, device: Path, timeout: int = 10) -> None:
    """Wait for the partition node *part* to appear.

    Polls up to *timeout* times; after each miss it sleeps one second
    and re-runs ``partprobe`` on *device*. Calls ``fatal`` if the node
    is still missing after the last probe.

    Args:
        part: Partition device node to wait for.
        device: Whole-disk device handed to ``partprobe``.
        timeout: Number of one-second polling rounds.
    """
    for _ in range(timeout):
        if is_block_device(part):
            return
        time.sleep(1)
        run_quiet("partprobe", str(device), check=False)
    # The final partprobe above may have been the one that made the node
    # appear, so look once more before giving up.
    if is_block_device(part):
        return
    fatal(f"Partition {part} not found after {timeout}s")
def format_efi(layout: DiskLayout) -> None:
    """Run ``mkfs.vfat -F 32`` on the layout's EFI partition."""
    efi_partition = layout.part_efi
    info(f"Formatting EFI partition {efi_partition}...")
    mkfs_argv = ("mkfs.vfat", "-F", "32", str(efi_partition))
    run(*mkfs_argv)
    success("EFI formatted.")
def setup_luks(layout: DiskLayout, config: DiskConfig) -> None:
    """Format the root partition as LUKS2 and open it.

    Two ``cryptsetup`` commands are run: ``luksFormat`` (type ``luks2``,
    labelled ``config.luks_label``) and then ``open``, which maps the
    container under ``config.mapper_name``. A passphrase prompt banner
    is printed before each one.
    """
    root_partition = str(layout.part_root)

    info(f"Setting up LUKS2 encryption on {root_partition}...")
    success(">>> Enter your LUKS passphrase (OnlyKey) <<<")
    format_argv = [
        "cryptsetup", "luksFormat",
        "--type", "luks2",
        "--label", config.luks_label,
        root_partition,
    ]
    run(*format_argv)

    info("Opening LUKS container...")
    success(">>> Enter passphrase again to open <<<")
    open_argv = ["cryptsetup", "open", root_partition, config.mapper_name]
    run(*open_argv)

    success(f"LUKS container opened at {config.mapper_path}")
def create_btrfs(config: DiskConfig) -> None:
    """Run ``mkfs.btrfs`` on the mapper device, labelled ``config.btrfs_label``."""
    label = config.btrfs_label
    target = str(config.mapper_path)
    info("Creating btrfs filesystem...")
    run("mkfs.btrfs", "-L", label, target)
    success(f"Btrfs created with label '{label}'.")
def create_subvolumes(config: DiskConfig) -> None:
    """Create the btrfs subvolumes on the opened container.

    The filesystem on ``config.mapper_path`` is mounted temporarily at
    ``config.mount_root`` (no subvolume option), the subvolumes ``@``,
    ``@home``, ``@var``, ``@log`` and ``@snapshots`` are created unless
    a path of that name already exists, and the filesystem is unmounted
    again - also when one of the creations fails.
    """
    info("Creating btrfs subvolumes...")
    mount_root = config.mount_root
    mapper = str(config.mapper_path)
    mount_root.mkdir(parents=True, exist_ok=True)
    run("mount", mapper, str(mount_root))

    subvolumes = ("@", "@home", "@var", "@log", "@snapshots")
    created = []
    skipped = []
    try:
        for subvol in subvolumes:
            subvol_path = mount_root / subvol
            if subvol_path.exists():
                skipped.append(subvol)
            else:
                run("btrfs", "subvolume", "create", str(subvol_path))
                created.append(subvol)
    finally:
        # Never leave the temporary mount behind, even if a create step
        # raised or exited.
        run("umount", str(mount_root))

    if created:
        success(f"Subvolumes created: {', '.join(created)}")
    if skipped:
        info(f"Subvolumes already exist: {', '.join(skipped)}")
def mount_filesystems(layout: DiskLayout, config: DiskConfig) -> None:
    """Mount all filesystems for installation.

    Mount order below ``config.mount_root``:
        ``@`` -> ``.``, ``@home`` -> ``home``, ``@var`` -> ``var``,
        ``@log`` -> ``var/log``, ``@snapshots`` -> ``.snapshots``,
        then the EFI partition -> ``boot``.

    Every btrfs mount uses ``config.btrfs_opts`` plus the matching
    ``subvol=`` option. Each target is skipped when ``is_mounted``
    already reports it as mounted, so the function can be re-run to
    complete a partially mounted tree.
    """
    info("Mounting filesystems...")
    mount_root = config.mount_root
    mapper = str(config.mapper_path)
    opts = config.btrfs_opts

    # Mount @ subvolume first. An already-mounted root is kept, but the
    # remaining mounts are still verified below instead of assuming that the
    # whole tree is in place.
    if is_mounted(mount_root):
        info(f"{mount_root} already mounted")
    else:
        mount_root.mkdir(parents=True, exist_ok=True)
        run("mount", "-o", f"{opts},subvol=@", mapper, str(mount_root))

    # Order matters: "var" must be mounted before "var/log".
    subvolume_mounts = (
        ("@home", "home"),
        ("@var", "var"),
        ("@log", "var/log"),
        ("@snapshots", ".snapshots"),
    )
    for subvol, relative in subvolume_mounts:
        target = mount_root / relative
        # Create the mount point only now, i.e. after its parent has been
        # mounted. A "var/log" directory created before @var is mounted would
        # live in @ and be hidden by the @var mount, leaving no mount point.
        target.mkdir(parents=True, exist_ok=True)
        if not is_mounted(target):
            run("mount", "-o", f"{opts},subvol={subvol}", mapper, str(target))

    # Mount EFI
    boot = mount_root / "boot"
    boot.mkdir(parents=True, exist_ok=True)
    if not is_mounted(boot):
        run("mount", str(layout.part_efi), str(boot))

    success("All filesystems mounted.")
    run("findmnt", "-R", str(mount_root))
def prepare_disk(config: DiskConfig | None = None) -> tuple[DiskLayout, DiskConfig]:
    """Run the complete disk preparation workflow.

    After the disk has been selected, an existing LUKS container on the
    root partition leads to a reformat/reuse question:

    * ``reuse`` opens the container if its mapper node is missing,
      mounts the filesystems and returns without touching the disk.
    * ``reformat`` continues with the full setup.
    * anything else calls ``fatal``.

    The full setup wipes the disk, partitions it, formats the EFI
    partition, sets up LUKS, creates btrfs and its subvolumes, and
    mounts everything.

    Args:
        config: Settings to use; a default ``DiskConfig`` when ``None``.

    Returns:
        The ``(layout, config)`` pair that was used.
    """
    config = DiskConfig() if config is None else config

    banner = (
        "=== Gentoo Disk Preparation ===",
        f"EFI: {config.efi_size_gb} GB",
        f"Swap: {config.swap_size_gb} GB (encrypted at boot with random key)",
        "Root: Remaining space (LUKS2 + btrfs)",
    )
    print()
    for text in banner:
        info(text)
    print()

    layout = select_disk()
    root_partition = layout.part_root

    # An existing LUKS container may be reused instead of reformatted.
    if root_partition.exists() and _is_luks_device(root_partition):
        print()
        info("Existing LUKS container detected on root partition.")
        answer = prompt("Reformat disk (destroys data) or reuse existing? [reformat/reuse]: ")
        answer = answer.strip().lower()

        if answer == "reuse":
            info("Reusing existing disk setup...")
            # Only open the container when its mapper node is not there yet.
            if not _is_luks_open(config.mapper_name):
                info("Opening existing LUKS container...")
                success(">>> Enter LUKS passphrase <<<")
                run("cryptsetup", "open", str(root_partition), config.mapper_name)
            mount_filesystems(layout, config)
            _print_summary(layout, config)
            return layout, config

        if answer != "reformat":
            fatal("Aborted - enter 'reformat' or 'reuse'")

    # Full disk setup, strictly in this order.
    wipe_disk(layout, config)
    create_partitions(layout, config)
    format_efi(layout)
    setup_luks(layout, config)
    create_btrfs(config)
    create_subvolumes(config)
    mount_filesystems(layout, config)

    _print_summary(layout, config)
    return layout, config
def _print_summary(layout: DiskLayout, config: DiskConfig) -> None:
    """Print the closing report: device mapping first, then subvolume targets."""
    mapper = config.mapper_path
    device_rows = (
        f" EFI: {layout.part_efi} -> {config.mount_root}/boot",
        f" Swap: {layout.part_swap} (encrypted at boot)",
        f" LUKS: {layout.part_root} -> {mapper}",
        f" Btrfs: {mapper}",
    )
    # (subvolume, location inside the installed system)
    subvolume_targets = (
        ("@", "/"),
        ("@home", "/home"),
        ("@var", "/var"),
        ("@log", "/var/log"),
        ("@snapshots", "/.snapshots"),
    )

    print()
    success("=== Disk preparation complete ===")
    print()

    info("Summary:")
    for row in device_rows:
        print(row)
    print()

    info("Subvolumes:")
    for subvol, target in subvolume_targets:
        print(f" {subvol} -> {target}")
    print()