
add tests for postgresql

main
Nicolas Massé · 2 months ago · commit 9bdd00b193
21 changed files:

 1. .gitignore (2)
 2. README.md (7)
 3. TESTS.md (235)
 4. common.mk (7)
 5. conftest.py (23)
 6. postgresql/postgresql-backup.container (2)
 7. postgresql/postgresql-init.container (2)
 8. postgresql/postgresql-pgautoupgrade.image (13)
 9. postgresql/postgresql-server.container (2)
10. postgresql/postgresql-upgrade.container (2)
11. postgresql/postgresql.image (13)
12. postgresql/tests/__init__.py (0)
13. postgresql/tests/conftest.py (167)
14. postgresql/tests/helpers.py (39)
15. postgresql/tests/test_backup.py (119)
16. postgresql/tests/test_install.py (149)
17. postgresql/tests/test_recovery.py (154)
18. postgresql/tests/test_upgrade.py (163)
19. pyproject.toml (24)
20. tests/__init__.py (0)
21. tests/vm.py (384)

.gitignore (2)

@@ -3,3 +3,5 @@
 !fcos.bu
 !overlay.bu
 */butane.blocklist
+__pycache__/
+.pytest_cache/

README.md (7)

@@ -53,6 +53,13 @@ This repository gathers all the recipes (hence the name "Cookbook") to deploy Op
 - Fedora / CentOS Stream / RHEL or derivative operating system.
 - Systemd
+
+## End-to-end testing
+
+```
+pip install -e .
+pytest postgresql/tests/
+```
+
 ## Development
 To develop Podman Quadlets, it is advised to create a Fedora Virtual Machine dedicated to this task.

TESTS.md (235)

@@ -0,0 +1,235 @@
# Testing Guide
This project uses **pytest** with the **pytest-testinfra** plugin to run
end-to-end integration tests against real Fedora CoreOS virtual machines.
## Dependencies
Declared in `pyproject.toml`:
| Package | Purpose |
|---------|---------|
| `pytest>=8.0` | Test runner and framework |
| `pytest-testinfra>=10.1` | Infrastructure testing (services, files, sockets, ...) |
| `paramiko>=3.4` | SSH transport used by testinfra |
## Core pytest concepts
### Test discovery
pytest automatically finds tests by scanning for files named `test_*.py` and
collecting functions named `test_*` inside them. No registration is needed.
The `pyproject.toml` configuration:
```toml
[tool.pytest.ini_options]
log_cli = true
log_cli_level = "INFO"
addopts = "-v"
```
No `testpaths` is set, so pytest discovers tests in all sub-directories.
To run a specific cookbook's tests:
```bash
pytest postgresql/tests/
```
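For illustration, any function whose name starts with `test_` inside a
`test_*.py` file is collected with no registration step (hypothetical file,
not part of the suite):

```python
# postgresql/tests/test_example.py — picked up automatically by discovery
def test_arithmetic():
    assert 2 + 2 == 4
```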
### Fixtures
A **fixture** is a function decorated with `@pytest.fixture` that prepares a
resource for a test. Fixtures are injected by naming them as test function
parameters:
```python
@pytest.fixture(scope="module")
def pg_host(...):
    return testinfra.get_host(f"ssh://root@{vm.ip}", ...)

def test_port_listening(pg_host):  # ← pg_host is injected automatically
    assert pg_host.socket("tcp://127.0.0.1:5432").is_listening
```
pytest resolves the full dependency graph: if fixture A depends on fixture B,
B is created first.
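A minimal sketch of this resolution, with hypothetical fixtures `a` and `b`:

```python
import pytest

@pytest.fixture
def b():
    return "db-url"

@pytest.fixture
def a(b):  # a depends on b, so pytest creates b first
    return f"connection({b})"

def test_chain(a):  # requesting a pulls in the whole chain
    assert "db-url" in a
```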
### Fixture scopes
The `scope` parameter controls how long a fixture lives:
| Scope | Lifetime |
|-------|----------|
| `"function"` (default) | Recreated for every single test |
| `"module"` | One instance per `.py` file |
| `"session"` | One instance for the entire pytest run |
In this project:
- `test_ssh_key` / `test_ssh_pubkey` are **session-scoped** — a single SSH
key pair is generated once and shared across all tests.
- `postgresql_vm` / `pg_host` are **module-scoped** — each test file gets its
own VM that is destroyed after the last test in that file.
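A quick way to see the effect of a scope (illustrative sketch, not project
code): a session-scoped fixture runs its setup exactly once, so every test
observes the same instance.

```python
import itertools
import pytest

_counter = itertools.count()

@pytest.fixture(scope="session")
def shared():
    return next(_counter)  # executed once per pytest run

def test_one(shared):
    assert shared == 0

def test_two(shared):
    assert shared == 0  # same object — setup did not run again
```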
### `yield` fixtures (setup + teardown)
When a fixture uses `yield`, the code before `yield` is setup and the code
after is teardown. Teardown always runs, even if a test fails.
```python
@pytest.fixture(scope="module")
def postgresql_vm(...):
    vm = FCOSVirtualMachine(...)
    vm.create()       # ← setup
    vm.wait_ssh(...)
    yield vm          # ← value passed to the test
    vm.destroy()      # ← teardown (always runs)
```
### `conftest.py` — shared fixtures
`conftest.py` files are loaded automatically by pytest. Every fixture defined
in a `conftest.py` is available to all tests in the same directory and its
sub-directories.
This project has two:
| File | Scope | Contents |
|------|-------|----------|
| `conftest.py` (root) | Global | SSH key pair generation |
| `postgresql/tests/conftest.py` | PostgreSQL tests | VM creation, testinfra host, upgrade VM |
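The visibility rule, reduced to a hypothetical example: a fixture declared in
the root `conftest.py` is usable from any test file below it, with no import.

```python
# conftest.py (repository root)
import pytest

@pytest.fixture(scope="session")
def answer():
    return 42

# postgresql/tests/test_something.py — no import of conftest needed
def test_answer(answer):
    assert answer == 42
```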
## pytest-testinfra
**testinfra** is a pytest plugin that provides a high-level Python API to
inspect the state of a remote server over SSH. A connection is established
via `testinfra.get_host()` and the resulting object exposes modules to
inspect:
| Module | Example | What it checks |
|--------|---------|----------------|
| `service` | `host.service("postgresql.target").is_running` | systemd unit state |
| `socket` | `host.socket("tcp://127.0.0.1:5432").is_listening` | open ports |
| `file` | `host.file("/etc/config").exists` | file existence, permissions, ownership |
| `mount_point` | `host.mount_point("/data").filesystem` | mounted filesystems |
| `run` | `host.run("systemctl is-active ...")` | arbitrary commands (returns `.stdout`, `.rc`) |
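A hedged sketch combining several of these modules (the address and the
exact assertions are illustrative, not taken from the test suite):

```python
import testinfra

host = testinfra.get_host("ssh://root@192.0.2.10")  # example address

assert host.service("postgresql.target").is_running
assert host.socket("tcp://127.0.0.1:5432").is_listening

cfg = host.file("/etc/quadlets/postgresql/config.env")
assert cfg.exists

result = host.run("systemctl is-active postgresql-server.service")
assert result.stdout.strip() == "active"
```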
## Project test architecture
```
conftest.py (root)     → SSH key pair (session-scoped)
tests/
└── vm.py              → FCOSVirtualMachine class (create/destroy/ssh)
postgresql/tests/
├── conftest.py        → VM + pg_host fixtures (module-scoped)
├── helpers.py         → constants (PG_MAJOR_DEFAULT, credentials) + run_sql()
├── test_install.py    → fresh install: services, ports, filesystem, connectivity
├── test_backup.py     → trigger backup, verify artefacts, retention policy
├── test_recovery.py   → container crash and reboot recovery
└── test_upgrade.py    → major version upgrade (uses a separate VM)
```
`FCOSVirtualMachine` (in `tests/vm.py`) is a plain Python class — not a
fixture. It manages the full lifecycle of a KVM virtual machine: QCOW2 disk
creation, `virt-install`, SSH readiness polling, remote command execution via
SSH, and `virsh destroy` cleanup. Fixtures in `conftest.py` wrap this class.
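The wrapping pattern, reduced to its essentials (the real fixtures in
`postgresql/tests/conftest.py` also build the ignition overlay and wait for
`postgresql.target`; the fixture name `vm` and the paths below are
illustrative):

```python
from pathlib import Path

import pytest
from vm import FCOSVirtualMachine

@pytest.fixture(scope="module")
def vm(test_ssh_key):
    machine = FCOSVirtualMachine(
        name="demo",  # keep unique across parallel runs
        ignition_file=Path("/tmp/fcos-test.ign"),
        virtiofs_dir=Path("/srv/fcos-test-demo"),
    )
    machine.create()
    machine.wait_ssh(ssh_key=test_ssh_key, timeout=300)
    yield machine
    machine.destroy()  # teardown runs even when a test fails
```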
## Test execution flow
Taking `test_postgresql_port_listening` as an example:
1. pytest discovers `test_install.py`.
2. It sees `test_postgresql_port_listening(pg_host)` and resolves the fixture
   chain: `pg_host` → `postgresql_vm` + `test_ssh_key`.
3. `test_ssh_key` (session-scoped) generates an Ed25519 key pair — once for
the entire run.
4. `postgresql_vm` (module-scoped):
- Compiles the Fedora CoreOS ignition via `make butane`.
- Creates a KVM VM with `virt-install`.
- Polls until SSH is reachable.
- Waits for `postgresql.target` to become active.
5. `pg_host` connects testinfra to the VM via SSH.
6. The test runs: `pg_host.socket("tcp://127.0.0.1:5432").is_listening`.
7. After **all** tests in the module complete, `vm.destroy()` tears down the
VM.
## Test ordering
### Module (file) order
Modules are executed in **alphabetical order** by path:
1. `test_backup.py`
2. `test_install.py`
3. `test_recovery.py`
4. `test_upgrade.py`
Since each module gets its own VM (module-scoped fixtures), there are **no
dependencies between modules**.
### Test (function) order within a module
Within a file, tests run in **source order** (top to bottom). This is
pytest's default behavior — no plugin needed.
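To inspect the collection order without executing anything, pytest's
built-in `--collect-only` flag can be combined with `-q`:

```bash
pytest postgresql/tests/test_backup.py --collect-only -q
```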
This matters when tests have side effects. For example in `test_backup.py`:
1. `test_trigger_backup` — triggers the backup service.
2. `test_backup_completes_successfully` — waits for the service to finish.
3. `test_backup_directory_exists_in_virtiofs` — checks files created by step 1.
4. ...and so on.
Later tests depend on artefacts created by earlier ones. The ordering relies
on the declaration order in the source file.
## Pausing tests for manual inspection
### `breakpoint()` + `--pdb`
Add `breakpoint()` at any point in a test. Run with `--pdb` and `-x` (stop
at first failure):
```bash
pytest postgresql/tests/test_install.py --pdb -x
```
`--pdb` drops into the Python debugger on failure. `breakpoint()` drops into
it unconditionally. Type `c` to continue.
### `input()` + `-s`
The simplest approach — add a manual pause:
```python
def test_postgresql_port_listening(pg_host):
    assert pg_host.socket("tcp://127.0.0.1:5432").is_listening
    input("VM is running. Press Enter to continue.")
```
Run with `-s` so pytest does not capture stdin/stdout:
```bash
pytest postgresql/tests/test_install.py -s -k test_postgresql_port_listening
```
### Scope-aware pausing
The VM is destroyed after the **last** test in a module. If you pause in the
last test, the VM will be destroyed as soon as you resume. To inspect after
all tests, add a sentinel test at the end of the file:
```python
def test_zz_pause_for_inspection(postgresql_vm, test_ssh_key):
    print(f"\nVM accessible: ssh -i {test_ssh_key} root@{postgresql_vm.ip}")
    input("Inspecting... Press Enter to destroy the VM.")
```
### `-k` to target a specific test
Combine with any of the above to skip unrelated tests:
```bash
pytest postgresql/tests/test_install.py -s -k test_postgresql_port_listening
```

common.mk (7)

@@ -60,12 +60,13 @@ endif
 PROJECT_NAME := $(shell basename "$${PWD}")
 # Quadlets files and their corresponding systemd unit names
-QUADLETS_FILES = $(wildcard *.container *.volume *.network *.pod *.build)
+QUADLETS_FILES = $(wildcard *.container *.volume *.network *.pod *.build *.image)
 QUADLET_UNIT_NAMES := $(patsubst %.container, %.service, $(wildcard *.container)) \
     $(patsubst %.volume, %-volume.service, $(wildcard *.volume)) \
     $(patsubst %.network, %-network.service, $(wildcard *.network)) \
     $(patsubst %.pod, %-pod.service, $(wildcard *.pod)) \
-    $(patsubst %.build, %-build.service, $(wildcard *.build))
+    $(patsubst %.build, %-build.service, $(wildcard *.build)) \
+    $(patsubst %.image, %-image.service, $(wildcard *.image))
 # Wellknown systemd unit file types
 SYSTEMD_FILES = $(wildcard *.service *.target *.timer *.mount)

@@ -133,7 +134,7 @@ pre-requisites::
 		exit 1; \
 	fi
 	@set -Eeuo pipefail; \
-	for tool in install systemctl systemd-analyze systemd-tmpfiles sysctl virt-install virsh qemu-img journalctl coreos-installer resize butane yq podlet; do \
+	for tool in install systemctl systemd-analyze systemd-tmpfiles sysctl virt-install virsh qemu-img journalctl coreos-installer resize butane yq podlet pip3; do \
 		if ! which $$tool &>/dev/null ; then \
 			echo "$$tool is not installed. Please install it first." >&2; \
 			exit 1; \

conftest.py (23)

@@ -0,0 +1,23 @@
import subprocess
from pathlib import Path

import pytest

@pytest.fixture(scope="session")
def test_ssh_key(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Generate a temporary SSH key pair (no passphrase) for VM access."""
    key_dir = tmp_path_factory.mktemp("ssh-key")
    key_path = key_dir / "id_ed25519"
    subprocess.run(
        ["ssh-keygen", "-t", "ed25519", "-N", "", "-f", str(key_path)],
        check=True,
        capture_output=True,
    )
    return key_path

@pytest.fixture(scope="session")
def test_ssh_pubkey(test_ssh_key: Path) -> str:
    """Public key string corresponding to test_ssh_key."""
    return test_ssh_key.with_suffix(".pub").read_text().strip()

postgresql/postgresql-backup.container (2)

@@ -9,7 +9,7 @@ PartOf=postgresql.target
 [Container]
 ContainerName=postgresql-backup-job
-Image=docker.io/library/postgres:${PG_MAJOR}-alpine
+Image=postgresql.image
 # Network configuration
 Network=host

postgresql/postgresql-init.container (2)

@@ -15,7 +15,7 @@ PartOf=postgresql.target
 [Container]
 ContainerName=postgresql-init-job
-Image=docker.io/library/postgres:${PG_MAJOR}-alpine
+Image=postgresql.image
 # Network configuration
 Network=host

postgresql/postgresql-pgautoupgrade.image (13)

@@ -0,0 +1,13 @@
[Unit]
Description=podman pull docker.io/pgautoupgrade/pgautoupgrade
Documentation=https://hub.docker.com/r/pgautoupgrade/pgautoupgrade

# Only start if PostgreSQL has been configured
ConditionPathExists=/etc/quadlets/postgresql/config.env

[Image]
Image=docker.io/pgautoupgrade/pgautoupgrade:${PG_MAJOR}-alpine

[Service]
# These environment variables are sourced to be used by systemd in the Exec* commands
EnvironmentFile=/etc/quadlets/postgresql/config.env

postgresql/postgresql-server.container (2)

@@ -17,7 +17,7 @@ PartOf=postgresql.target
 [Container]
 ContainerName=postgresql-server
-Image=docker.io/library/postgres:${PG_MAJOR}-alpine
+Image=postgresql.image
 AutoUpdate=registry
 # Network configuration

postgresql/postgresql-upgrade.container (2)

@@ -17,7 +17,7 @@ PartOf=postgresql.target
 [Container]
 ContainerName=postgresql-upgrade-to-${PG_MAJOR}-job
-Image=docker.io/pgautoupgrade/pgautoupgrade:${PG_MAJOR}-alpine
+Image=postgresql-pgautoupgrade.image
 # Network configuration
 Network=host

postgresql/postgresql.image (13)

@@ -0,0 +1,13 @@
[Unit]
Description=podman pull docker.io/library/postgres
Documentation=https://hub.docker.com/_/postgres/

# Only start if PostgreSQL has been configured
ConditionPathExists=/etc/quadlets/postgresql/config.env

[Image]
Image=docker.io/library/postgres:${PG_MAJOR}-alpine

[Service]
# These environment variables are sourced to be used by systemd in the Exec* commands
EnvironmentFile=/etc/quadlets/postgresql/config.env

postgresql/tests/__init__.py (0)

postgresql/tests/conftest.py (167)

@@ -0,0 +1,167 @@
"""Pytest fixtures for the PostgreSQL cookbook end-to-end tests.

Prerequisites:
- Must run as root (KVM/libvirt access).
- The Fedora CoreOS base QCOW2 image must be present at
  /var/lib/libvirt/images/library/fedora-coreos.qcow2.
  Run ``coreos-installer download -p qemu -f qcow2.xz -d
  -C /var/lib/libvirt/images/library/`` to fetch it.
- fcos.ign for the postgresql cookbook is built on demand by
  ``make -C postgresql butane`` if it is missing. This requires
  local.bu (SSH keys, user setup) to be present at the repository root.
"""

import os
import shutil
import subprocess
import sys
from pathlib import Path

import pytest
import testinfra

REPO_ROOT = Path(__file__).parent.parent.parent
POSTGRESQL_DIR = REPO_ROOT / "postgresql"

# Add directories to the path so we can import local helpers and shared vm.py.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(REPO_ROOT / "tests"))

from vm import FCOSVirtualMachine, build_test_ignition, ensure_fcos_ign  # noqa: E402
from helpers import (
    PG_DB,
    PG_MAJOR_DEFAULT,
    PG_MAJOR_UPGRADE_FROM,
    PG_MAJOR_UPGRADE_TO,
    PG_PASSWORD,
    PG_USER,
    run_sql,
)

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _default_config_env(pg_major: str) -> dict[str, str]:
    """Return the full default config.env content as a dict for the given PG major."""
    return {
        "PG_MAJOR": pg_major,
        "POSTGRES_USER": PG_USER,
        "POSTGRES_PASSWORD": PG_PASSWORD,
        "POSTGRES_DB": PG_DB,
        "POSTGRES_HOST_AUTH_METHOD": "scram-sha-256",
        "POSTGRES_INITDB_ARGS": "--auth-host=scram-sha-256",
        "POSTGRES_ARGS": "-h 127.0.0.1",
        "PGPORT": "5432",
        "POSTGRES_BACKUP_RETENTION": "7",
    }

# ---------------------------------------------------------------------------
# Shared fixtures (module-scoped → one VM per test module)
# ---------------------------------------------------------------------------

@pytest.fixture(scope="module")
def virtiofs_dir() -> Path:
    """Unique VirtioFS host directory for the default test VM."""
    d = Path("/srv") / f"fcos-test-postgresql-{os.getpid()}"
    d.mkdir(parents=True, exist_ok=True)
    yield d
    if d.exists():
        shutil.rmtree(d)

@pytest.fixture(scope="module")
def postgresql_vm(
    test_ssh_key: Path,
    test_ssh_pubkey: str,
    virtiofs_dir: Path,
    tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
    """Running CoreOS VM with PostgreSQL installed at the default PG version.

    The VM is created once per test module and destroyed in teardown.
    All tests in the same module share this VM instance.
    """
    fcos_ign = ensure_fcos_ign(POSTGRESQL_DIR)
    test_ign = tmp_path_factory.mktemp("ign") / "fcos-test.ign"
    build_test_ignition(
        base_ignition=fcos_ign,
        ssh_pubkey=test_ssh_pubkey,
        output=test_ign,
    )
    vm = FCOSVirtualMachine(
        name=f"postgresql-{os.getpid()}",
        ignition_file=test_ign,
        virtiofs_dir=virtiofs_dir,
    )
    vm.create()
    vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
    vm.wait_for_service("postgresql.target", ssh_key=test_ssh_key, timeout=300)
    yield vm
    vm.destroy()

@pytest.fixture(scope="module")
def pg_host(postgresql_vm: FCOSVirtualMachine, test_ssh_key: Path):
    """testinfra SSH host connected to the default PostgreSQL VM."""
    return testinfra.get_host(
        f"ssh://root@{postgresql_vm.ip}",
        ssh_extra_args=(
            f"-i {test_ssh_key}"
            " -o StrictHostKeyChecking=no"
            " -o UserKnownHostsFile=/dev/null"
        ),
    )

@pytest.fixture(scope="module")
def upgrade_virtiofs_dir() -> Path:
    """Unique VirtioFS host directory for the upgrade test VM."""
    d = Path("/srv") / f"fcos-test-pg-upgrade-{os.getpid()}"
    d.mkdir(parents=True, exist_ok=True)
    yield d
    if d.exists():
        shutil.rmtree(d)

@pytest.fixture(scope="module")
def upgrade_vm(
    test_ssh_key: Path,
    test_ssh_pubkey: str,
    upgrade_virtiofs_dir: Path,
    tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
    """Running CoreOS VM with PostgreSQL installed at PG_MAJOR_UPGRADE_FROM.

    Used exclusively by test_upgrade.py to verify the major version upgrade path.
    The config.env is overridden via the ignition overlay so the VM boots
    directly with PG_MAJOR_UPGRADE_FROM, regardless of the cookbook's default.
    """
    fcos_ign = ensure_fcos_ign(POSTGRESQL_DIR)
    test_ign = tmp_path_factory.mktemp("ign-upgrade") / "fcos-upgrade.ign"
    build_test_ignition(
        base_ignition=fcos_ign,
        ssh_pubkey=test_ssh_pubkey,
        output=test_ign,
        config_env_overrides=_default_config_env(PG_MAJOR_UPGRADE_FROM),
    )
    vm = FCOSVirtualMachine(
        name=f"pg-upgrade-{os.getpid()}",
        ignition_file=test_ign,
        virtiofs_dir=upgrade_virtiofs_dir,
    )
    vm.create()
    vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
    vm.wait_for_service("postgresql.target", ssh_key=test_ssh_key, timeout=300)
    yield vm
    vm.destroy()

postgresql/tests/helpers.py (39)

@@ -0,0 +1,39 @@
"""Shared constants and helper functions for PostgreSQL integration tests.

These are extracted from conftest.py so that test modules can import them
without conflicting with pytest's conftest discovery mechanism.
"""

from pathlib import Path

# Default version shipped in the example config.env.
PG_MAJOR_DEFAULT = "14"

# Version to start from in the major-upgrade scenario.
PG_MAJOR_UPGRADE_FROM = "14"

# Version to upgrade to in the major-upgrade scenario.
PG_MAJOR_UPGRADE_TO = "17"

# Default credentials from config/examples/config.env.
PG_USER = "postgres"
PG_PASSWORD = "postgres"
PG_DB = "postgres"

def run_sql(vm, ssh_key: Path, sql: str) -> str:
    """Execute *sql* via ``podman exec`` on the running postgresql-server container.

    Uses the Unix socket at /var/run/postgresql inside the container (mapped
    from /run/quadlets/postgresql on the host). The pg_hba.conf generated by
    the official postgres image grants trust access on local sockets, so no
    password is required.

    Returns:
        Stripped stdout of the psql command.
    """
    result = vm.ssh_run(
        f"podman exec postgresql-server psql -U {PG_USER} -t -c \"{sql}\"",
        ssh_key,
    )
    return result.stdout.strip()

postgresql/tests/test_backup.py (119)

@@ -0,0 +1,119 @@
"""Test PostgreSQL backup creation and VirtioFS storage.

These tests verify that:
- The backup oneshot service can be triggered manually and runs to completion.
- The expected backup artefacts land in the VirtioFS share (accessible from
  the test runner's host filesystem without SSH).
- The backup retention policy removes stale backups.

Note: tests within a module share a single VM (module-scoped fixture), so
the order of test execution matters here: the backup files checked in later
tests are created by the earlier trigger test.
"""

import time
from pathlib import Path

# ---------------------------------------------------------------------------
# Trigger and completion
# ---------------------------------------------------------------------------

def test_create_database_and_table(postgresql_vm, test_ssh_key):
    """Create a test database and table with some data to ensure the backup has
    something to capture."""
    postgresql_vm.ssh_run(
        "podman exec postgresql-server psql -U postgres -c \"CREATE DATABASE test;\"",
        test_ssh_key,
    )
    postgresql_vm.ssh_run(
        "podman exec postgresql-server psql -U postgres -d test -c \"CREATE TABLE witness (id SERIAL PRIMARY KEY, version VARCHAR); INSERT INTO witness (version) SELECT version();\"",
        test_ssh_key,
    )

def test_trigger_backup(postgresql_vm, test_ssh_key):
    """Starting postgresql-backup.service must succeed (no immediate error)."""
    postgresql_vm.ssh_run(
        "systemctl start postgresql-backup.service",
        test_ssh_key,
    )

def test_backup_completes_successfully(postgresql_vm, test_ssh_key):
    """postgresql-backup.service must finish in ``inactive`` state (not ``failed``)."""
    state = postgresql_vm.wait_for_unit_done(
        "postgresql-backup.service", test_ssh_key, timeout=120
    )
    assert state == "inactive", (
        f"Backup service ended in unexpected state {state!r}. "
        "Run: systemctl status postgresql-backup.service --no-pager"
    )

# ---------------------------------------------------------------------------
# VirtioFS artefacts (verified from the host — no SSH required)
# ---------------------------------------------------------------------------

def test_backup_directory_exists_in_virtiofs(virtiofs_dir: Path):
    """The postgresql/backup sub-directory must exist in the VirtioFS share."""
    backup_root = virtiofs_dir / "postgresql" / "backup"
    assert backup_root.is_dir(), f"Backup directory not found on host: {backup_root}"

def test_at_least_one_backup_present(virtiofs_dir: Path):
    """At least one timestamped backup sub-directory must exist."""
    backup_root = virtiofs_dir / "postgresql" / "backup"
    backups = sorted(backup_root.iterdir())
    assert backups, f"No backup sub-directories found under {backup_root}"

def test_backup_manifest_present(virtiofs_dir: Path):
    """The latest backup must contain a ``backup_manifest`` file (pg_basebackup)."""
    backup_root = virtiofs_dir / "postgresql" / "backup"
    latest = sorted(backup_root.iterdir())[-1]
    assert (latest / "backup_manifest").exists(), (
        f"backup_manifest missing in {latest}"
    )

def test_backup_base_tar_present(virtiofs_dir: Path):
    """The latest backup must contain a ``base.tar`` cluster archive."""
    backup_root = virtiofs_dir / "postgresql" / "backup"
    latest = sorted(backup_root.iterdir())[-1]
    assert (latest / "base.tar").exists(), f"base.tar missing in {latest}"

def test_database_dump_present(virtiofs_dir: Path):
    """At least one ``dump-test.sql.gz`` file must exist alongside the cluster backup."""
    backup_root = virtiofs_dir / "postgresql" / "backup"
    latest = sorted(backup_root.iterdir())[-1]
    dumps = list(latest.glob("dump-test.sql.gz"))
    assert dumps, f"No dump-test.sql.gz files found in {latest}"

# ---------------------------------------------------------------------------
# Retention policy
# ---------------------------------------------------------------------------

def test_backup_retention_enforced(postgresql_vm, test_ssh_key, virtiofs_dir: Path):
    """After triggering several extra backups the count must stay within the
    configured retention limit (POSTGRES_BACKUP_RETENTION=7)."""
    retention = 7

    # Trigger ten additional backups so the rotation code has something to do.
    for _ in range(10):
        postgresql_vm.ssh_run(
            "systemctl start postgresql-backup.service", test_ssh_key
        )
        state = postgresql_vm.wait_for_unit_done(
            "postgresql-backup.service", test_ssh_key, timeout=120
        )
        assert state == "inactive"
        time.sleep(1)  # ensure distinct timestamp directories

    backup_root = virtiofs_dir / "postgresql" / "backup"
    count = len(list(backup_root.iterdir()))
    assert count <= retention, (
        f"Retention policy failed: {count} backups present, expected ≤ {retention}"
    )

postgresql/tests/test_install.py (149)

@@ -0,0 +1,149 @@
"""Test that a fresh PostgreSQL installation is healthy.

These tests run against a brand-new VM booted from the cookbook's default
ignition (PG_MAJOR=14, example credentials). They verify:
- All expected systemd units are in the correct state.
- The PostgreSQL server is listening and accepts queries.
- VirtioFS is mounted and the expected directories exist.
"""

from pathlib import Path

from helpers import PG_MAJOR_DEFAULT, run_sql

# ---------------------------------------------------------------------------
# Systemd unit state
# ---------------------------------------------------------------------------

def test_postgresql_target_active(pg_host):
    """postgresql.target must be active once the full startup chain completes."""
    assert pg_host.service("postgresql.target").is_running

def test_postgresql_server_running(pg_host):
    """The long-running PostgreSQL server container must be active."""
    assert pg_host.service("postgresql-server.service").is_running

def test_set_major_oneshot_completed(pg_host):
    """postgresql-set-major.service (oneshot) must have finished — not still running."""
    result = pg_host.run("systemctl is-active postgresql-set-major.service")
    assert result.stdout.strip() == "inactive"

def test_init_oneshot_completed(pg_host):
    """postgresql-init.service (oneshot) must have finished after initialization."""
    result = pg_host.run("systemctl is-active postgresql-init.service")
    assert result.stdout.strip() == "inactive"

def test_upgrade_oneshot_completed(pg_host):
    """postgresql-upgrade.service (oneshot) must have finished — no upgrade needed
    on a fresh install."""
    result = pg_host.run("systemctl is-active postgresql-upgrade.service")
    assert result.stdout.strip() == "inactive"

def test_backup_timer_scheduled(pg_host):
    """The daily backup timer must be active (scheduled)."""
    assert pg_host.service("postgresql-backup.timer").is_running

# ---------------------------------------------------------------------------
# Network / socket
# ---------------------------------------------------------------------------

def test_postgresql_port_listening(pg_host):
    """PostgreSQL must be listening on 127.0.0.1:5432 (POSTGRES_ARGS=-h 127.0.0.1)."""
    assert pg_host.socket("tcp://127.0.0.1:5432").is_listening

# ---------------------------------------------------------------------------
# Filesystem layout
# ---------------------------------------------------------------------------

def test_virtiofs_mounted(pg_host):
    """The VirtioFS share must be mounted at /var/lib/virtiofs/data."""
    mount = pg_host.mount_point("/var/lib/virtiofs/data")
    assert mount.exists
    assert mount.filesystem == "virtiofs"

def test_virtiofs_postgresql_dir(pg_host):
    """/var/lib/virtiofs/data/postgresql must be created by tmpfiles.d."""
    assert pg_host.file("/var/lib/virtiofs/data/postgresql").is_directory

def test_virtiofs_backup_dir(pg_host):
    """/var/lib/virtiofs/data/postgresql/backup must be created by tmpfiles.d."""
    assert pg_host.file("/var/lib/virtiofs/data/postgresql/backup").is_directory

def test_data_dir_exists(pg_host):
    """/var/lib/quadlets/postgresql must exist with the correct ownership."""
    f = pg_host.file("/var/lib/quadlets/postgresql")
    assert f.is_directory
    assert f.user == "postgresql"

def test_latest_symlink_exists(pg_host):
    """The 'latest' symlink must point to the active major-version directory."""
    link = pg_host.file("/var/lib/quadlets/postgresql/latest")
    assert link.exists
    assert link.is_symlink

def test_version_dir_exists(pg_host):
    """A directory named after PG_MAJOR_DEFAULT must exist under the data dir."""
    assert pg_host.file(
        f"/var/lib/quadlets/postgresql/{PG_MAJOR_DEFAULT}"
    ).is_directory

def test_initialized_flag_exists(pg_host):
    """The .initialized sentinel file must be written after a successful init."""
    assert pg_host.file("/var/lib/quadlets/postgresql/.initialized").exists

def test_config_env_present(pg_host):
    """/etc/quadlets/postgresql/config.env must be present and not world-readable."""
    f = pg_host.file("/etc/quadlets/postgresql/config.env")
    assert f.exists
    # mode 0600 — world and group bits must be 0
    assert f.mode & 0o077 == 0

# ---------------------------------------------------------------------------
# Database connectivity
# ---------------------------------------------------------------------------

def test_postgresql_accepts_connections(postgresql_vm, test_ssh_key):
    """PostgreSQL must respond to a trivial SQL query."""
    output = run_sql(postgresql_vm, test_ssh_key, "SELECT 1 AS probe")
    assert "1" in output

def test_postgresql_version_matches_config(postgresql_vm, test_ssh_key):
    """The running PostgreSQL server must report the version from PG_MAJOR_DEFAULT."""
    output = run_sql(postgresql_vm, test_ssh_key, "SHOW server_version")
    assert PG_MAJOR_DEFAULT in output

def test_can_create_database(postgresql_vm, test_ssh_key):
    """Should be possible to create a new database."""
    run_sql(
        postgresql_vm,
        test_ssh_key,
        "CREATE DATABASE install_test_db",
    )
    output = run_sql(
        postgresql_vm,
        test_ssh_key,
        "SELECT datname FROM pg_database WHERE datname = 'install_test_db'",
    )
    assert "install_test_db" in output

postgresql/tests/test_recovery.py (154)

@@ -0,0 +1,154 @@
"""Test PostgreSQL automatic crash recovery.

Scenarios covered:
1. Container crash (SIGKILL via ``podman kill``) → systemd restarts the
   service automatically (Restart=always, RestartSec=10).
2. Hard VM reboot → all services start cleanly and data is intact.

All tests share the module-scoped ``postgresql_vm`` fixture. Because some
tests are destructive (they kill the container), they are intentionally
sequenced: create data → crash → verify recovery → create more data →
reboot → verify recovery.
"""

import time

from helpers import run_sql

# Data written before the crash that must survive each recovery scenario.
CRASH_WITNESS_TABLE = "crash_witness"
CRASH_WITNESS_VALUE = "before_crash"
REBOOT_WITNESS_TABLE = "reboot_witness"
REBOOT_WITNESS_VALUE = "before_reboot"

# ---------------------------------------------------------------------------
# Scenario 1: container crash
# ---------------------------------------------------------------------------

def test_server_running_before_crash(pg_host):
    """Precondition: postgresql-server.service must be active before we crash it."""
    assert pg_host.service("postgresql-server.service").is_running

def test_create_data_before_crash(postgresql_vm, test_ssh_key):
    """Insert a row that must survive the container crash."""
    run_sql(
        postgresql_vm,
        test_ssh_key,
        (
            f"CREATE TABLE IF NOT EXISTS {CRASH_WITNESS_TABLE} "
            f"(id SERIAL PRIMARY KEY, message TEXT NOT NULL); "
            f"INSERT INTO {CRASH_WITNESS_TABLE} (message) "
            f"VALUES ('{CRASH_WITNESS_VALUE}');"
        ),
    )

def test_kill_postgresql_container(postgresql_vm, test_ssh_key):
    """Simulate a process crash by sending SIGKILL to the container.

    ``podman kill`` delivers SIGKILL to the container's PID 1. Systemd will
    detect the exit and restart the service after RestartSec=10 seconds.
    """
    postgresql_vm.ssh_run(
        "podman kill --signal SIGKILL postgresql-server",
        test_ssh_key,
    )

def test_service_restarts_automatically(postgresql_vm, test_ssh_key):
    """postgresql-server.service must be active again after the crash.

    Allow up to 120 seconds: systemd waits RestartSec=10 s before restarting,
    then the container start-up and health check take additional time.
    """
    # Brief pause to let systemd register the exit before we start polling.
    time.sleep(5)
    postgresql_vm.wait_for_service(
        "postgresql-server.service", test_ssh_key, timeout=120
    )

def test_data_intact_after_crash_recovery(postgresql_vm, test_ssh_key):
    """Rows written before the crash must be present after automatic recovery."""
    output = run_sql(
        postgresql_vm,
        test_ssh_key,
        f"SELECT message FROM {CRASH_WITNESS_TABLE} "
        f"WHERE message = '{CRASH_WITNESS_VALUE}'",
    )
    assert CRASH_WITNESS_VALUE in output, (
        f"Crash witness row not found after recovery. Query returned: {output!r}"
    )

def test_target_still_active_after_crash(pg_host):
    """postgresql.target must remain active after the container recovery."""
    assert pg_host.service("postgresql.target").is_running

# ---------------------------------------------------------------------------
# Scenario 2: hard reboot
# ---------------------------------------------------------------------------

def test_create_data_before_reboot(postgresql_vm, test_ssh_key):
    """Insert a row that must survive a full VM reboot."""
    run_sql(
        postgresql_vm,
        test_ssh_key,
        (
            f"CREATE TABLE IF NOT EXISTS {REBOOT_WITNESS_TABLE} "
            f"(id SERIAL PRIMARY KEY, message TEXT NOT NULL); "
            f"INSERT INTO {REBOOT_WITNESS_TABLE} (message) "
            f"VALUES ('{REBOOT_WITNESS_VALUE}');"
        ),
    )

def test_reboot_vm(postgresql_vm, test_ssh_key):
    """Trigger a graceful OS reboot. SSH will temporarily drop."""
    postgresql_vm.ssh_run("systemctl reboot", test_ssh_key, check=False)
    # Wait for the VM to go down before polling for SSH again.
    time.sleep(15)

def test_ssh_available_after_reboot(postgresql_vm, test_ssh_key):
    """SSH must become available again within 5 minutes of the reboot."""
    # Reset the cached IP so wait_ssh re-probes it.
    postgresql_vm._ip = None
    postgresql_vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)

def test_postgresql_target_active_after_reboot(postgresql_vm, test_ssh_key):
    """postgresql.target must come up automatically on reboot (enabled in ignition)."""
    postgresql_vm.wait_for_service(
        "postgresql.target", ssh_key=test_ssh_key, timeout=300
    )

def test_data_intact_after_reboot(postgresql_vm, test_ssh_key):
    """Rows written before the reboot must still be present after boot."""
    output = run_sql(
        postgresql_vm,
        test_ssh_key,
        f"SELECT message FROM {REBOOT_WITNESS_TABLE} "
        f"WHERE message = '{REBOOT_WITNESS_VALUE}'",
    )
    assert REBOOT_WITNESS_VALUE in output, (
        f"Reboot witness row not found. Query returned: {output!r}"
    )

def test_crash_witness_also_intact_after_reboot(postgresql_vm, test_ssh_key):
    """Data written before the crash must also survive the subsequent reboot."""
    output = run_sql(
        postgresql_vm,
        test_ssh_key,
        f"SELECT message FROM {CRASH_WITNESS_TABLE} "
        f"WHERE message = '{CRASH_WITNESS_VALUE}'",
    )
    assert CRASH_WITNESS_VALUE in output

postgresql/tests/test_upgrade.py (163)

@@ -0,0 +1,163 @@
"""Test the PostgreSQL major version upgrade path: PG 14 → PG 17.

The upgrade mechanism works as follows:
1. postgresql-set-major.service updates the ``latest`` symlink to point at
   the new PG_MAJOR directory (e.g. /var/lib/quadlets/postgresql/17/).
2. postgresql-upgrade.service detects that ``latest/docker/PG_VERSION``
   does not exist (the 17/ directory is empty) and triggers pgautoupgrade.
3. pg_upgrade migrates data from the old directory to the new one.
4. postgresql-server.service starts against the upgraded data.

All tests in this module share a single ``upgrade_vm`` fixture that starts
with PG_MAJOR_UPGRADE_FROM (14). Tests are intentionally ordered to form a
sequential scenario: create data → trigger upgrade → verify outcome.
"""

from pathlib import Path

from helpers import PG_MAJOR_UPGRADE_FROM, PG_MAJOR_UPGRADE_TO, run_sql

# Sentinel table and row used to verify data survives the upgrade.
WITNESS_TABLE = "upgrade_witness"
WITNESS_VALUE = "before_upgrade"

# ---------------------------------------------------------------------------
# Pre-upgrade baseline
# ---------------------------------------------------------------------------

def test_initial_version_is_upgrade_from(upgrade_vm, test_ssh_key):
    """Precondition: the VM must be running PG_MAJOR_UPGRADE_FROM."""
    output = run_sql(upgrade_vm, test_ssh_key, "SHOW server_version")
    assert PG_MAJOR_UPGRADE_FROM in output, (
        f"Expected PG {PG_MAJOR_UPGRADE_FROM}, got: {output!r}"
    )

def test_create_witness_data(upgrade_vm, test_ssh_key):
    """Insert a row that must survive the major version upgrade."""
    run_sql(
        upgrade_vm,
        test_ssh_key,
        (
            f"CREATE TABLE IF NOT EXISTS {WITNESS_TABLE} "
            f"(id SERIAL PRIMARY KEY, message TEXT NOT NULL); "
            f"INSERT INTO {WITNESS_TABLE} (message) VALUES ('{WITNESS_VALUE}');"
        ),
    )
    output = run_sql(
        upgrade_vm,
        test_ssh_key,
        f"SELECT message FROM {WITNESS_TABLE} WHERE message = '{WITNESS_VALUE}'",
    )
    assert WITNESS_VALUE in output

# ---------------------------------------------------------------------------
# Trigger the upgrade
# ---------------------------------------------------------------------------

def test_bump_pg_major_in_config(upgrade_vm, test_ssh_key):
    """Change PG_MAJOR in config.env from UPGRADE_FROM to UPGRADE_TO."""
    upgrade_vm.ssh_run(
        f"sed -i 's/^PG_MAJOR={PG_MAJOR_UPGRADE_FROM}$/PG_MAJOR={PG_MAJOR_UPGRADE_TO}/' "
        "/etc/quadlets/postgresql/config.env",
        test_ssh_key,
    )
    # Verify the substitution worked.
    result = upgrade_vm.ssh_run(
        "grep ^PG_MAJOR= /etc/quadlets/postgresql/config.env",
        test_ssh_key,
    )
    assert f"PG_MAJOR={PG_MAJOR_UPGRADE_TO}" in result.stdout

def test_restart_postgresql_target(upgrade_vm, test_ssh_key):
    """Restart postgresql.target to kick off the upgrade chain."""
    upgrade_vm.ssh_run("systemctl restart postgresql.target", test_ssh_key)

def test_upgrade_service_completes(upgrade_vm, test_ssh_key):
    """postgresql-upgrade.service must finish in ``inactive`` state (not ``failed``).

    pgautoupgrade can take several minutes for large databases; allow up to
    10 minutes.
    """
    state = upgrade_vm.wait_for_unit_done(
        "postgresql-upgrade.service", test_ssh_key, timeout=600
    )
    assert state == "inactive", (
        f"Upgrade service ended in state {state!r}. "
        "Inspect with: systemctl status postgresql-upgrade.service --no-pager "
        "and: journalctl -u postgresql-upgrade.service"
    )

def test_server_active_after_upgrade(upgrade_vm, test_ssh_key):
    """postgresql-server.service must be active after the upgrade."""
    upgrade_vm.wait_for_service(
        "postgresql-server.service", test_ssh_key, timeout=120
    )

# ---------------------------------------------------------------------------
# Post-upgrade verification
# ---------------------------------------------------------------------------

def test_new_version_is_running(upgrade_vm, test_ssh_key):
    """PostgreSQL must now report PG_MAJOR_UPGRADE_TO as the server version."""
    output = run_sql(upgrade_vm, test_ssh_key, "SHOW server_version")
    assert PG_MAJOR_UPGRADE_TO in output, (
        f"Expected PG {PG_MAJOR_UPGRADE_TO} after upgrade, got: {output!r}"
    )

def test_witness_data_preserved(upgrade_vm, test_ssh_key):
    """The row inserted before the upgrade must still be present and correct."""
    output = run_sql(
        upgrade_vm,
        test_ssh_key,
        f"SELECT message FROM {WITNESS_TABLE} WHERE message = '{WITNESS_VALUE}'",
    )
    assert WITNESS_VALUE in output, (
        f"Witness row '{WITNESS_VALUE}' not found after upgrade. "
        f"Query returned: {output!r}"
    )

def test_old_data_dir_removed(upgrade_vm, test_ssh_key):
    """pgautoupgrade must remove the source data directory after a clean upgrade."""
    result = upgrade_vm.ssh_run(
        f"test -d /var/lib/quadlets/postgresql/{PG_MAJOR_UPGRADE_FROM}/docker",
        test_ssh_key,
        check=False,
    )
    assert result.returncode != 0, (
        f"Old data directory for PG {PG_MAJOR_UPGRADE_FROM} still exists — "
        "upgrade may not have cleaned up properly"
    )

def test_latest_symlink_points_to_new_version(upgrade_vm, test_ssh_key):
    """The ``latest`` symlink must now point at the PG_MAJOR_UPGRADE_TO directory."""
    result = upgrade_vm.ssh_run(
        "readlink /var/lib/quadlets/postgresql/latest",
        test_ssh_key,
    )
    assert PG_MAJOR_UPGRADE_TO in result.stdout, (
        f"latest symlink does not point at PG {PG_MAJOR_UPGRADE_TO}: "
        f"{result.stdout.strip()!r}"
    )

def test_new_data_dir_has_pg_version_file(upgrade_vm, test_ssh_key):
    """PG_VERSION file must exist in the new data directory (server is healthy)."""
    result = upgrade_vm.ssh_run(
        f"cat /var/lib/quadlets/postgresql/{PG_MAJOR_UPGRADE_TO}/docker/PG_VERSION",
        test_ssh_key,
    )
    assert PG_MAJOR_UPGRADE_TO in result.stdout

pyproject.toml (24)

@@ -0,0 +1,24 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"

[project]
name = "podman-quadlet-cookbook-tests"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "pytest>=8.0",
    "pytest-testinfra>=10.1",
    "paramiko>=3.4",
]

[tool.pytest.ini_options]
# No testpaths set: pytest discovers tests in all */tests/ directories.
# Run a specific cookbook: pytest postgresql/tests/
log_cli = true
log_cli_level = "INFO"
addopts = "-v"

[tool.setuptools]
# This repo is not a Python package — suppress automatic package discovery.
packages = []

tests/__init__.py (0)

tests/vm.py (384)

@@ -0,0 +1,384 @@
"""Fedora CoreOS VM lifecycle helpers for end-to-end testing.

Requires running as root (virt-install, virsh, qemu-img need root privileges).

Typical usage:

    vm = FCOSVirtualMachine(
        name="postgresql-abc123",
        ignition_file=Path("/tmp/fcos-test.ign"),
        virtiofs_dir=Path("/srv/fcos-test-postgresql-abc123"),
    )
    vm.create()
    vm.wait_ssh(ssh_key=key_path)
    vm.wait_for_service("postgresql.target", ssh_key=key_path)
    # ... run tests ...
    vm.destroy()
"""

import base64
import re
import shutil
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path

LIBVIRT_IMAGES_DIR = Path("/var/lib/libvirt/images")
FCOS_BASE_IMAGE = LIBVIRT_IMAGES_DIR / "library" / "fedora-coreos.qcow2"

# Butane spec version — must match the project convention.
BUTANE_VERSION = "1.4.0"

def ensure_fcos_ign(cookbook_dir: Path) -> Path:
    """Return the path to fcos.ign, building it via ``make butane`` if absent."""
    fcos_ign = cookbook_dir / "fcos.ign"
    if not fcos_ign.exists():
        subprocess.run(
            ["make", "-C", str(cookbook_dir), "butane"],
            check=True,
        )
    return fcos_ign

def build_test_ignition(
    base_ignition: Path,
    ssh_pubkey: str,
    output: Path,
    config_env_overrides: dict[str, str] | None = None,
    extra_files: dict[str, tuple[str, int]] | None = None,
) -> Path:
    """Build a test ignition file by overlaying the cookbook's fcos.ign.

    The overlay:
    - Merges the base cookbook ignition (fcos.ign).
    - Adds the test SSH public key to the root user so the test runner can
      SSH in (FCOS allows root login with keys via PermitRootLogin
      prohibit-password).
    - Optionally patches /etc/quadlets/postgresql/config.env via
      ``config_env_overrides`` (merged on top of whatever the base ignition
      already sets).
    - Optionally injects arbitrary extra files via ``extra_files``:
      ``{"/path/on/vm": ("file content", 0o644)}``.

    Args:
        base_ignition: Path to the pre-built fcos.ign for the cookbook.
        ssh_pubkey: Ed25519 public key string to inject for root.
        output: Destination path for the compiled test ignition.
        config_env_overrides: Key/value pairs to override in config.env.
            The full config.env is re-written with these values merged on
            top of the defaults from the base ignition.
        extra_files: Additional files to inject into the VM image.

    Returns:
        ``output`` path.
    """
    with tempfile.TemporaryDirectory() as _tmpdir:
        d = Path(_tmpdir)
        # butane resolves "local:" references relative to the directory passed
        # via -d; copy the base ignition there.
        shutil.copy(base_ignition, d / "base.ign")

        # Build the storage.files section of the overlay.
        storage_section = _build_storage_section(config_env_overrides, extra_files)

        overlay_bu = textwrap.dedent(f"""\
            variant: fcos
            version: {BUTANE_VERSION}
            ignition:
              config:
                merge:
                  - local: base.ign
            passwd:
              users:
                - name: root
                  ssh_authorized_keys:
                    - {ssh_pubkey}
            systemd:
              units:
                # Disable & mask zincati to avoid reboots during testing.
                - name: zincati.service
                  enabled: false
                  mask: true
            """)
        if storage_section:
            overlay_bu += storage_section

        overlay_bu_path = d / "test-overlay.bu"
        overlay_bu_path.write_text(overlay_bu)

        subprocess.run(
            [
                "butane",
                "--strict",
                "-d", str(d),
                "-o", str(output),
                str(overlay_bu_path),
            ],
            check=True,
        )
    return output

def _build_storage_section(
    config_env_overrides: dict[str, str] | None,
    extra_files: dict[str, tuple[str, int]] | None,
) -> str:
    """Return a Butane ``storage:`` YAML block (or empty string if nothing to inject)."""
    files = []
    if config_env_overrides:
        content = "\n".join(f"{k}={v}" for k, v in config_env_overrides.items()) + "\n"
        files.append(
            _butane_file("/etc/quadlets/postgresql/config.env", content, 0o600)
        )
    if extra_files:
        for path, (content, mode) in extra_files.items():
            files.append(_butane_file(path, content, mode))
    if not files:
        return ""
    joined = "\n".join(files)
    return f"storage:\n  files:\n{joined}\n"

def _butane_file(path: str, content: str, mode: int) -> str:
    """Return a Butane file entry using a base64 data URI (avoids YAML quoting)."""
    b64 = base64.b64encode(content.encode()).decode()
    return (
        f"    - path: {path}\n"
        f"      mode: {mode}\n"
        f"      contents:\n"
        f'        source: "data:text/plain;base64,{b64}"\n'
    )

class FCOSVirtualMachine:
    """Manages a Fedora CoreOS KVM virtual machine for end-to-end testing.

    All public methods are synchronous and raise on failure. The caller is
    responsible for calling ``destroy()`` (typically from a pytest fixture
    teardown).
    """

    def __init__(self, name: str, ignition_file: Path, virtiofs_dir: Path) -> None:
        """
        Args:
            name: Short identifier appended to "fcos-test-" to form the
                libvirt domain name. Keep it unique across parallel tests.
            ignition_file: Path to the compiled Ignition (.ign) file.
            virtiofs_dir: Host directory that will be exposed inside the VM
                at /var/lib/virtiofs/data via VirtioFS.
        """
        self.name = name
        self.vm_name = f"fcos-test-{name}"
        self.ignition_file = Path(ignition_file)
        self.virtiofs_dir = Path(virtiofs_dir)
        self._images_dir = LIBVIRT_IMAGES_DIR / self.vm_name
        self._ip: str | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def create(self) -> None:
        """Create disk images and start the VM via virt-install."""
        self._images_dir.mkdir(parents=True, exist_ok=True)
        self.virtiofs_dir.mkdir(parents=True, exist_ok=True)

        ign_dest = self._images_dir / "fcos.ign"
        shutil.copy(self.ignition_file, ign_dest)
        ign_dest.chmod(0o644)

        # Root OS disk: copy from the shared base QCOW2 image.
        root_qcow2 = self._images_dir / "root.qcow2"
        shutil.copy(FCOS_BASE_IMAGE, root_qcow2)

        # Secondary disk for /var (keeps OS and data separate, matches common.mk).
        var_qcow2 = self._images_dir / "var.qcow2"
        subprocess.run(
            ["qemu-img", "create", "-f", "qcow2", str(var_qcow2), "100G"],
            check=True,
        )

        subprocess.run(
            [
                "virt-install",
                f"--name={self.vm_name}",
                "--import",
                "--noautoconsole",
                "--ram=4096",
                "--vcpus=2",
                "--os-variant=fedora-coreos-stable",
                f"--disk=path={root_qcow2},format=qcow2,size=50",
                f"--disk=path={var_qcow2},format=qcow2",
                f"--qemu-commandline=-fw_cfg name=opt/com.coreos/config,file={ign_dest}",
                "--network=network=default,model=virtio",
                "--console=pty,target.type=virtio",
                "--serial=pty",
                "--graphics=none",
                "--boot=uefi",
                "--memorybacking=access.mode=shared,source.type=memfd",
                (
                    f"--filesystem=type=mount,accessmode=passthrough,"
                    f"driver.type=virtiofs,driver.queue=1024,"
                    f"source.dir={self.virtiofs_dir},target.dir=data"
                ),
            ],
            check=True,
        )

    def destroy(self) -> None:
        """Forcefully stop and delete the VM and all associated disk images."""
        subprocess.run(["virsh", "destroy", self.vm_name], capture_output=True)
        subprocess.run(
            ["virsh", "undefine", self.vm_name, "--nvram"],
            capture_output=True,
        )
        if self._images_dir.exists():
            shutil.rmtree(self._images_dir)
        if self.virtiofs_dir.exists():
            shutil.rmtree(self.virtiofs_dir)

    # ------------------------------------------------------------------
    # Readiness polling
    # ------------------------------------------------------------------

    def get_ip(self) -> str | None:
        """Return the VM's primary IPv4 address reported by virsh, or None."""
        result = subprocess.run(
            ["virsh", "domifaddr", self.vm_name],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return None
        match = re.search(r"(\d+\.\d+\.\d+\.\d+)", result.stdout)
        return match.group(1) if match else None

    @property
    def ip(self) -> str:
        if self._ip is None:
            self._ip = self.get_ip()
        if self._ip is None:
            raise RuntimeError(f"VM {self.vm_name!r} has no IP address yet")
        return self._ip

    def wait_ssh(self, ssh_key: Path, timeout: int = 300) -> str:
        """Block until SSH is reachable. Returns the IP address.

        Polls every 5 seconds until ``timeout`` seconds have elapsed.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ip = self.get_ip()
            if ip:
                try:
                    result = subprocess.run(
                        [
                            "ssh",
                            "-i", str(ssh_key),
                            "-o", "StrictHostKeyChecking=no",
                            "-o", "UserKnownHostsFile=/dev/null",
                            "-o", "ConnectTimeout=5",
                            "-o", "BatchMode=yes",
                            f"root@{ip}",
                            "true",
                        ],
                        capture_output=True,
                        timeout=10,
                    )
                    if result.returncode == 0:
                        self._ip = ip
                        return ip
                except subprocess.TimeoutExpired:
                    pass
            time.sleep(5)
        raise TimeoutError(
            f"VM {self.vm_name!r} did not become SSH-ready within {timeout}s"
        )

    def wait_for_service(
        self, service: str, ssh_key: Path, timeout: int = 120
    ) -> None:
        """Block until *service* reaches the ``active`` state."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = self.ssh_run(
                f"systemctl is-active {service}", ssh_key, check=False
            )
            if result.stdout.strip() == "active":
                return
            time.sleep(5)
        status = self.ssh_run(
            f"systemctl status {service} --no-pager", ssh_key, check=False
        )
        raise TimeoutError(
            f"Service {service!r} not active after {timeout}s:\n{status.stdout}"
        )

    def wait_for_unit_done(
        self, service: str, ssh_key: Path, timeout: int = 120
    ) -> str:
        """Block until a oneshot service finishes (``inactive`` or ``failed``).

        Returns:
            The final state string: ``"inactive"`` on success, ``"failed"``
            on failure.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = self.ssh_run(
                f"systemctl is-active {service}", ssh_key, check=False
            )
            state = result.stdout.strip()
            if state in ("inactive", "failed"):
                return state
            time.sleep(5)
        raise TimeoutError(
            f"Service {service!r} did not finish within {timeout}s"
        )

    # ------------------------------------------------------------------
    # Remote execution
    # ------------------------------------------------------------------

    def ssh_run(
        self,
        command: str,
        ssh_key: Path,
        check: bool = True,
    ) -> subprocess.CompletedProcess:
        """Run a shell command in the VM via SSH.

        Args:
            command: Shell command string passed to the remote bash.
            ssh_key: Path to the private key used for authentication.
            check: If True (default), raise RuntimeError on non-zero exit.

        Returns:
            CompletedProcess with stdout/stderr as text.
        """
        result = subprocess.run(
            [
                "ssh",
                "-i", str(ssh_key),
                "-o", "StrictHostKeyChecking=no",
                "-o", "UserKnownHostsFile=/dev/null",
                f"root@{self.ip}",
                command,
            ],
            capture_output=True,
            text=True,
        )
        if check and result.returncode != 0:
            raise RuntimeError(
                f"SSH command failed (exit {result.returncode}): {command!r}\n"
                f"stdout: {result.stdout}\nstderr: {result.stderr}"
            )
        return result