385 lines
11 KiB
Python
385 lines
11 KiB
Python
"""Disk partitioning, LUKS encryption, and btrfs setup."""
|
|
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from .utils import (
|
|
info,
|
|
success,
|
|
error,
|
|
fatal,
|
|
prompt,
|
|
confirm,
|
|
run,
|
|
run_quiet,
|
|
is_block_device,
|
|
is_mounted,
|
|
)
|
|
|
|
|
|
@dataclass
class DiskConfig:
    """Tunable settings for partitioning, encryption, and mounting.

    Every field has a default, so ``DiskConfig()`` is a complete
    configuration on its own.
    """

    # Partition sizes; interpolated into sgdisk "+<n>G" size arguments.
    efi_size_gb: int = 1
    swap_size_gb: int = 24
    # Label given to the LUKS container, label given to the btrfs
    # filesystem, and the device-mapper name used when opening LUKS.
    luks_label: str = "legion"
    btrfs_label: str = "legion"
    mapper_name: str = "legion"
    # Mount options shared by every btrfs subvolume mount.
    btrfs_opts: str = "noatime,compress=zstd"
    # Directory under which the new system is mounted.
    mount_root: Path = field(default_factory=lambda: Path("/mnt/gentoo"))

    @property
    def mapper_path(self) -> Path:
        """Return the ``/dev/mapper`` node that belongs to ``mapper_name``."""
        node = "/dev/mapper/" + self.mapper_name
        return Path(node)
|
|
|
|
|
|
@dataclass
class DiskLayout:
    """Resolved disk layout after disk selection.

    Holds the whole-disk device plus the three partition device paths
    (EFI, swap, root) that the rest of the module operates on.
    """

    device: Path
    part_efi: Path
    part_swap: Path
    part_root: Path

    @classmethod
    def from_device(cls, device: Path) -> "DiskLayout":
        """Create a layout with correct partition naming for the device.

        Linux names partitions ``<disk>p<N>`` when the disk name ends in a
        digit (``nvme0n1p1``, ``mmcblk0p1``, ``loop0p1``, ``nbd0p1``) and
        ``<disk><N>`` otherwise (``sda1``, ``vdb2``).

        Args:
            device: Path of the whole-disk device, e.g. ``/dev/sda``.

        Returns:
            A layout whose partitions 1, 2 and 3 are EFI, swap and root.
        """
        dev_str = str(device)
        # Keying on the trailing digit (rather than on "nvme"/"mmcblk"
        # substrings) covers every disk family that needs the separator.
        separator = "p" if dev_str[-1:].isdigit() else ""
        prefix = f"{dev_str}{separator}"

        return cls(
            device=device,
            part_efi=Path(f"{prefix}1"),
            part_swap=Path(f"{prefix}2"),
            part_root=Path(f"{prefix}3"),
        )
|
|
|
|
|
|
def _is_luks_device(device: Path) -> bool:
    """Report whether ``cryptsetup isLuks`` succeeds for *device*."""
    # check=False: a failing exit status is the answer here, not an error.
    return run_quiet("cryptsetup", "isLuks", str(device), check=False).ok
|
|
|
|
|
|
def _is_luks_open(mapper_name: str) -> bool:
    """Report whether ``/dev/mapper/<mapper_name>`` currently exists."""
    mapper_node = Path(f"/dev/mapper/{mapper_name}")
    return mapper_node.exists()
|
|
|
|
|
|
def list_disks() -> list[dict]:
|
|
"""List available disks with metadata."""
|
|
result = run_quiet("lsblk", "-d", "-b", "-n", "-o", "NAME,SIZE,MODEL,TYPE")
|
|
disks = []
|
|
|
|
for line in result.stdout.strip().split("\n"):
|
|
parts = line.split(None, 3)
|
|
if len(parts) >= 2 and parts[-1] == "disk":
|
|
name = parts[0]
|
|
size_bytes = int(parts[1])
|
|
model = parts[2] if len(parts) > 2 else "Unknown"
|
|
disks.append({
|
|
"name": name,
|
|
"path": Path(f"/dev/{name}"),
|
|
"size_gb": size_bytes // (1024**3),
|
|
"model": model,
|
|
})
|
|
|
|
return disks
|
|
|
|
|
|
def select_disk() -> DiskLayout:
    """Interactively choose the target disk and confirm its destruction.

    Lists the disks reported by ``list_disks()``, asks for a 1-based
    number, then asks for confirmation before returning.

    Returns:
        The ``DiskLayout`` derived from the chosen disk's device path.

    Aborts through ``fatal`` when no disks are available, when the entered
    number is not a valid choice, or when the user declines the
    confirmation (``fatal`` is assumed not to return).
    """
    info("Available disks:")
    disks = list_disks()

    if not disks:
        # With nothing to choose from, prompting could only ever end in a
        # misleading "Invalid selection".
        fatal("No disks found")

    for number, disk in enumerate(disks, 1):
        print(f" {number}) {disk['path']} - {disk['size_gb']} GB ({disk['model']})")

    print()
    choice = prompt("Enter disk number: ")

    try:
        index = int(choice) - 1
    except ValueError:
        index = None
    if index is None or not 0 <= index < len(disks):
        fatal("Invalid selection")

    selected = disks[index]
    layout = DiskLayout.from_device(selected["path"])

    print()
    error(f"WARNING: This will DESTROY ALL DATA on {layout.device}")
    if not confirm("Proceed?"):
        fatal("Aborted by user")

    return layout
|
|
|
|
|
|
def wipe_disk(layout: DiskLayout, config: DiskConfig) -> None:
    """Release anything using the target disk, then erase its signatures.

    Every command except the final ``sync`` runs with ``check=False``, so a
    step that has nothing to do (nothing mounted, mapper not open, swap not
    active) does not stop the sequence.
    """
    info("Wiping disk signatures...")

    target = str(layout.device)
    best_effort_steps = (
        # Unmount everything below the installation mount point.
        ("umount", "-R", str(config.mount_root)),
        # Close the LUKS mapping if it is open.
        ("cryptsetup", "close", config.mapper_name),
        # Deactivate swap on the swap partition.
        ("swapoff", str(layout.part_swap)),
        # Erase signatures on the whole-disk device.
        ("wipefs", "-af", target),
        # Zero the first 10 MB of the disk.
        ("dd", "if=/dev/zero", f"of={target}",
         "bs=1M", "count=10", "status=none"),
        # Have the kernel re-read the (now empty) partition table.
        ("partprobe", target),
    )
    for command in best_effort_steps:
        run(*command, check=False)

    run("sync")

    success("Disk wiped.")
|
|
|
|
|
|
def create_partitions(layout: DiskLayout, config: DiskConfig) -> None:
    """Write a fresh GPT with EFI, swap, and root (LUKS) partitions.

    Partition 1 and 2 take their sizes from ``config``; partition 3 uses
    the remaining space. After partitioning, waits for all three partition
    device nodes to appear and prints the resulting table.
    """
    info("Creating GPT partition table...")

    disk = str(layout.device)

    # Start from an empty GPT.
    run("sgdisk", "--clear", disk)
    # NOTE(review): this is a standalone sgdisk invocation; confirm whether
    # the alignment setting carries over to the partition-creating calls.
    run("sgdisk", "--set-alignment=2048", disk)

    # (new-partition spec, type code, partition name), in creation order.
    partition_specs = (
        (f"1:0:+{config.efi_size_gb}G", "1:EF00", "1:EFI"),    # EFI
        (f"2:0:+{config.swap_size_gb}G", "2:8200", "2:Swap"),  # swap
        ("3:0:0", "3:8309", "3:LUKS"),                         # root (LUKS)
    )
    for new_spec, type_code, part_name in partition_specs:
        run("sgdisk", "-n", new_spec, "-t", type_code, "-c", part_name, disk)

    run("partprobe", disk)
    time.sleep(2)

    # Make sure every partition node exists before anything formats it.
    for part in (layout.part_efi, layout.part_swap, layout.part_root):
        _wait_for_partition(part, layout.device)

    success("Partitions created.")
    run("sgdisk", "-p", disk)
|
|
|
|
|
|
def _wait_for_partition(part: Path, device: Path, timeout: int = 10) -> None:
    """Wait for a partition's block device node to appear.

    Args:
        part: Partition device path to wait for.
        device: Whole-disk device that is re-probed between attempts.
        timeout: Number of polling rounds; each round sleeps one second
            and then re-runs ``partprobe`` on ``device``.

    Returns once ``is_block_device(part)`` is true; otherwise reports the
    failure through ``fatal``.
    """
    for _ in range(timeout):
        if is_block_device(part):
            return
        time.sleep(1)
        run_quiet("partprobe", str(device), check=False)

    # Check once more: the last partprobe above may have been the one that
    # made the node appear, and with timeout=0 the loop never checks at all.
    if is_block_device(part):
        return

    fatal(f"Partition {part} not found after {timeout}s")
|
|
|
|
|
|
def format_efi(layout: DiskLayout) -> None:
    """Create a FAT32 filesystem on the layout's EFI partition."""
    efi_part = layout.part_efi
    info(f"Formatting EFI partition {efi_part}...")
    run("mkfs.vfat", "-F", "32", str(efi_part))
    success("EFI formatted.")
|
|
|
|
|
|
def setup_luks(layout: DiskLayout, config: DiskConfig) -> None:
    """Format the root partition as LUKS2 and open it under the mapper name.

    Both ``cryptsetup`` calls are run without passing a key, and a banner
    is printed before each one to tell the user a passphrase is expected.
    """
    root_part = str(layout.part_root)

    info(f"Setting up LUKS2 encryption on {root_part}...")
    success(">>> Enter your LUKS passphrase (OnlyKey) <<<")

    format_cmd = [
        "cryptsetup", "luksFormat",
        "--type", "luks2",
        "--label", config.luks_label,
        root_part,
    ]
    run(*format_cmd)

    info("Opening LUKS container...")
    success(">>> Enter passphrase again to open <<<")

    run("cryptsetup", "open", root_part, config.mapper_name)

    success(f"LUKS container opened at {config.mapper_path}")
|
|
|
|
|
|
def create_btrfs(config: DiskConfig) -> None:
    """Make a btrfs filesystem, labelled per ``config``, on the mapper device."""
    label = config.btrfs_label
    info("Creating btrfs filesystem...")
    run("mkfs.btrfs", "-L", label, str(config.mapper_path))
    success(f"Btrfs created with label '{label}'.")
|
|
|
|
|
|
def create_subvolumes(config: DiskConfig) -> None:
    """Create the btrfs subvolumes used by the installation.

    Temporarily mounts the filesystem on ``config.mapper_path`` (without a
    ``subvol`` option) at ``config.mount_root``, creates each of ``@``,
    ``@home``, ``@var``, ``@log`` and ``@snapshots`` that does not exist
    yet, and unmounts again. Existing subvolumes are left alone and
    reported as skipped.
    """
    info("Creating btrfs subvolumes...")

    mount_root = config.mount_root
    mapper = str(config.mapper_path)

    mount_root.mkdir(parents=True, exist_ok=True)
    run("mount", mapper, str(mount_root))

    created = []
    skipped = []

    try:
        for subvol in ("@", "@home", "@var", "@log", "@snapshots"):
            subvol_path = mount_root / subvol
            if subvol_path.exists():
                skipped.append(subvol)
                continue
            run("btrfs", "subvolume", "create", str(subvol_path))
            created.append(subvol)
    finally:
        # Always release the temporary mount: if a create step fails
        # (assuming `run` signals failure by raising), a leftover mount
        # would make a later mount_filesystems() see mount_root as
        # "already mounted" and skip the real subvolume mounts.
        run("umount", str(mount_root))

    if created:
        success(f"Subvolumes created: {', '.join(created)}")
    if skipped:
        info(f"Subvolumes already exist: {', '.join(skipped)}")
|
|
|
|
|
|
def mount_filesystems(layout: DiskLayout, config: DiskConfig) -> None:
    """Mount the subvolumes and the EFI partition under ``config.mount_root``.

    If ``mount_root`` is already a mount point, nothing is mounted: the
    current mount tree is printed and the function returns. Otherwise
    ``@`` is mounted at ``mount_root``, the remaining subvolumes at their
    directories below it, and the EFI partition at ``boot``.
    """
    info("Mounting filesystems...")

    root = config.mount_root
    mapper = str(config.mapper_path)
    base_opts = config.btrfs_opts

    # Already set up: show what is there and leave it alone.
    if is_mounted(root):
        info(f"{root} already mounted")
        run("findmnt", "-R", str(root))
        return

    # The @ subvolume must come first; every other target lives inside it.
    root.mkdir(parents=True, exist_ok=True)
    run("mount", "-o", f"{base_opts},subvol=@", mapper, str(root))

    # (subvolume, directory relative to the mount root), in mount order.
    subvol_targets = (
        ("@home", "home"),
        ("@var", "var"),
        ("@log", "var/log"),
        ("@snapshots", ".snapshots"),
    )

    # Create all mount points, including the one for the EFI partition.
    for relative_dir in [rel for _, rel in subvol_targets] + ["boot"]:
        (root / relative_dir).mkdir(parents=True, exist_ok=True)

    # Mount the remaining subvolumes, skipping any already-mounted target.
    for subvol, relative_dir in subvol_targets:
        target = root / relative_dir
        if is_mounted(target):
            continue
        run("mount", "-o", f"{base_opts},subvol={subvol}", mapper, str(target))

    # EFI partition goes on boot.
    boot_dir = root / "boot"
    if not is_mounted(boot_dir):
        run("mount", str(layout.part_efi), str(boot_dir))

    success("All filesystems mounted.")
    run("findmnt", "-R", str(root))
|
|
|
|
|
|
def prepare_disk(config: DiskConfig | None = None) -> tuple[DiskLayout, DiskConfig]:
    """Run the complete disk preparation workflow.

    Args:
        config: Settings to use; a default ``DiskConfig()`` when ``None``.

    Returns:
        The selected ``DiskLayout`` and the ``DiskConfig`` that was used.

    When the chosen disk's root partition already holds a LUKS container,
    the user may answer "reuse" to only open and mount the existing setup,
    or "reformat" to continue with the full, destructive setup.
    """
    config = DiskConfig() if config is None else config

    print()
    info("=== Gentoo Disk Preparation ===")
    info(f"EFI: {config.efi_size_gb} GB")
    info(f"Swap: {config.swap_size_gb} GB (encrypted at boot with random key)")
    info("Root: Remaining space (LUKS2 + btrfs)")
    print()

    layout = select_disk()
    root_part = layout.part_root

    # An existing LUKS container can be reused instead of being reformatted.
    if root_part.exists() and _is_luks_device(root_part):
        print()
        info("Existing LUKS container detected on root partition.")
        answer = prompt(
            "Reformat disk (destroys data) or reuse existing? [reformat/reuse]: "
        )
        answer = answer.strip().lower()

        if answer == "reuse":
            info("Reusing existing disk setup...")
            # Only open the container when its mapper node is not there yet.
            if not _is_luks_open(config.mapper_name):
                info("Opening existing LUKS container...")
                success(">>> Enter LUKS passphrase <<<")
                run("cryptsetup", "open", str(root_part), config.mapper_name)
            mount_filesystems(layout, config)
            _print_summary(layout, config)
            return layout, config

        if answer != "reformat":
            fatal("Aborted - enter 'reformat' or 'reuse'")

    # Full setup: wipe, partition, format, encrypt, create and mount btrfs.
    wipe_disk(layout, config)
    create_partitions(layout, config)
    format_efi(layout)
    setup_luks(layout, config)
    create_btrfs(config)
    create_subvolumes(config)
    mount_filesystems(layout, config)

    _print_summary(layout, config)
    return layout, config
|
|
|
|
|
|
def _print_summary(layout: DiskLayout, config: DiskConfig) -> None:
    """Print the completion banner, the device mapping, and the subvolume map."""
    mapper = config.mapper_path

    device_rows = (
        f" EFI: {layout.part_efi} -> {config.mount_root}/boot",
        f" Swap: {layout.part_swap} (encrypted at boot)",
        f" LUKS: {layout.part_root} -> {mapper}",
        f" Btrfs: {mapper}",
    )
    subvolume_rows = (
        " @ -> /",
        " @home -> /home",
        " @var -> /var",
        " @log -> /var/log",
        " @snapshots -> /.snapshots",
    )

    print()
    success("=== Disk preparation complete ===")
    print()

    info("Summary:")
    for row in device_rows:
        print(row)
    print()

    info("Subvolumes:")
    for row in subvolume_rows:
        print(row)
    print()
|