Browse Source

WiP postgresql tests

main
Nicolas Massé 1 month ago
parent
commit
f26d2fc371
  1. 46
      conftest.py
  2. 2
      cookbooks/postgresql/postgresql-init.container
  3. 4
      cookbooks/postgresql/postgresql.image
  4. 173
      cookbooks/postgresql/tests/conftest.py
  5. 118
      cookbooks/postgresql/tests/helpers.py
  6. 204
      cookbooks/postgresql/tests/test_01_install_upgrade_backup.py
  7. 99
      cookbooks/postgresql/tests/test_02_restore.py
  8. 4
      cookbooks/postgresql/tests/test_install.py
  9. 59
      cookbooks/postgresql/tests/test_security.py
  10. 2
      scripts/common.mk
  11. 350
      tests/fcos_vm.py
  12. 291
      tests/test_quadlet.py
  13. 384
      tests/vm.py

46
conftest.py

@ -3,10 +3,52 @@ from pathlib import Path
import pytest import pytest
# Persistent directory used when --keep-vm is active.
_KEEP_VM_CACHE_DIR = Path.home() / ".cache" / "podman-quadlet-cookbook-tests"
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption(
"--keep-vm",
action="store_true",
default=False,
help=(
"Keep the test VM alive after the test run and reuse it on the next run. "
"Speeds up iteration: the VM is created once and never destroyed. "
"The SSH key is stored persistently in "
f"{_KEEP_VM_CACHE_DIR}."
),
)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def test_ssh_key(tmp_path_factory: pytest.TempPathFactory) -> Path: def keep_vm(request: pytest.FixtureRequest) -> bool:
"""Generate a temporary SSH key pair (no passphrase) for VM access.""" """True when --keep-vm was passed on the command line."""
return request.config.getoption("--keep-vm")
@pytest.fixture(scope="session")
def test_ssh_key(
keep_vm: bool,
tmp_path_factory: pytest.TempPathFactory,
) -> Path:
"""SSH key pair for VM access.
When --keep-vm is set the key is stored persistently so that subsequent
runs can re-use the same VM without re-injecting a new key.
"""
if keep_vm:
key_dir = _KEEP_VM_CACHE_DIR
key_dir.mkdir(parents=True, exist_ok=True)
key_path = key_dir / "id_ed25519"
if not key_path.exists():
subprocess.run(
["ssh-keygen", "-t", "ed25519", "-N", "", "-f", str(key_path)],
check=True,
capture_output=True,
)
return key_path
key_dir = tmp_path_factory.mktemp("ssh-key") key_dir = tmp_path_factory.mktemp("ssh-key")
key_path = key_dir / "id_ed25519" key_path = key_dir / "id_ed25519"
subprocess.run( subprocess.run(

2
cookbooks/postgresql/postgresql-init.container

@ -53,7 +53,7 @@ Volume=/etc/quadlets/postgresql/init.d:/docker-entrypoint-initdb.d:z,ro
[Service] [Service]
Restart=no Restart=no
TimeoutStartSec=30 TimeoutStartSec=60
# These environment variables are sourced to be used by systemd in the Exec* commands # These environment variables are sourced to be used by systemd in the Exec* commands
EnvironmentFile=/etc/quadlets/postgresql/config.env EnvironmentFile=/etc/quadlets/postgresql/config.env

4
cookbooks/postgresql/postgresql.image

@ -1,6 +1,6 @@
[Unit] [Unit]
Description=podman pull docker.io/pgautoupgrade/pgautoupgrade Description=podman pull docker.io/library/postgres
Documentation=https://hub.docker.com/r/pgautoupgrade/pgautoupgrade Documentation=https://hub.docker.com/_/postgres/
# Only start if PostgreSQL has been configured # Only start if PostgreSQL has been configured
ConditionPathExists=/etc/quadlets/postgresql/config.env ConditionPathExists=/etc/quadlets/postgresql/config.env

173
cookbooks/postgresql/tests/conftest.py

@ -2,166 +2,57 @@
Prerequisites: Prerequisites:
- Must run as root (KVM/libvirt access). - Must run as root (KVM/libvirt access).
- The Fedora CoreOS base QCOW2 image must be present at - The Fedora CoreOS base QCOW2 image must be present at /var/lib/libvirt/images/library/fedora-coreos.qcow2.
/var/lib/libvirt/images/library/fedora-coreos.qcow2. Run ``coreos-installer download -p qemu -f qcow2.xz -d -C /var/lib/libvirt/images/library/`` to fetch it.
Run ``coreos-installer download -p qemu -f qcow2.xz -d - fcos.ign for the postgresql cookbook is built on demand by ``make -C postgresql butane`` if it is missing.
-C /var/lib/libvirt/images/library/`` to fetch it.
- fcos.ign for the postgresql cookbook is built on demand by
``make -C postgresql butane`` if it is missing. This requires
local.bu (SSH keys, user setup) to be present at the repository root.
""" """
import os
import shutil import shutil
import subprocess
import sys import sys
from pathlib import Path
import pytest import pytest
import testinfra import testinfra
REPO_ROOT = Path(__file__).parent.parent.parent from pathlib import Path
POSTGRESQL_DIR = REPO_ROOT / "postgresql" THIS_COOKBOOK_DIR = Path(__file__).parent.parent
COOKBOOKS_DIR = THIS_COOKBOOK_DIR.parent
TOP_LEVEL_DIR = COOKBOOKS_DIR.parent
THIS_COOKBOOK_NAME = THIS_COOKBOOK_DIR.name
# Add directories to the path so we can import local helpers and shared vm.py. # Add directories to the path so we can import local helpers and shared vm.py.
sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(REPO_ROOT / "tests")) sys.path.insert(0, str(TOP_LEVEL_DIR / "tests"))
from vm import FCOSVirtualMachine, build_test_ignition, ensure_fcos_ign # noqa: E402 from fcos_vm import FCOSVirtualMachine, ensure_fcos_ign # noqa: E402
from helpers import ( # The virtiofs is where important and persistent data are stored.
PG_DB, # We keep it for the entire test session.
PG_MAJOR_DEFAULT, @pytest.fixture(scope="session")
PG_MAJOR_UPGRADE_FROM, def virtiofs_dirs(keep_vm: bool) -> list[tuple[Path, str]]:
PG_MAJOR_UPGRADE_TO, """VirtioFS host directories for the default test VM.
PG_PASSWORD,
PG_USER, With --keep-vm the directories are persistent so the VM can be reused across
run_sql, test runs. Without it unique per-process paths are used and cleaned up
) on teardown.
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _default_config_env(pg_major: str) -> dict[str, str]:
"""Return the full default config.env content as a dict for the given PG major."""
return {
"PG_MAJOR": pg_major,
"POSTGRES_USER": PG_USER,
"POSTGRES_PASSWORD": PG_PASSWORD,
"POSTGRES_DB": PG_DB,
"POSTGRES_HOST_AUTH_METHOD": "scram-sha-256",
"POSTGRES_INITDB_ARGS": "--auth-host=scram-sha-256",
"POSTGRES_ARGS": "-h 127.0.0.1",
"PGPORT": "5432",
"POSTGRES_BACKUP_RETENTION": "7",
}
# ---------------------------------------------------------------------------
# Shared fixtures (module-scoped → one VM per test module)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def virtiofs_dir() -> Path:
"""Unique VirtioFS host directory for the default test VM."""
d = Path("/srv") / f"fcos-test-postgresql-{os.getpid()}"
d.mkdir(parents=True, exist_ok=True)
yield d
if d.exists():
shutil.rmtree(d)
@pytest.fixture(scope="module")
def postgresql_vm(
test_ssh_key: Path,
test_ssh_pubkey: str,
virtiofs_dir: Path,
tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
"""Running CoreOS VM with PostgreSQL installed at the default PG version.
The VM is created once per test module and destroyed in teardown.
All tests in the same module share this VM instance.
""" """
fcos_ign = ensure_fcos_ign(POSTGRESQL_DIR) if keep_vm:
test_ign = tmp_path_factory.mktemp("ign") / "fcos-test.ign" d = Path("/srv") / f"fcos-test-{THIS_COOKBOOK_NAME}-dev"
build_test_ignition( else:
base_ignition=fcos_ign, d = Path("/srv") / f"fcos-test-{THIS_COOKBOOK_NAME}-{os.getpid()}"
ssh_pubkey=test_ssh_pubkey, d.mkdir(parents=True, exist_ok=True)
output=test_ign,
) yield [(d, "data",)] # <-- tests run here with access to the virtiofs directories
vm = FCOSVirtualMachine(
name=f"postgresql-{os.getpid()}",
ignition_file=test_ign,
virtiofs_dir=virtiofs_dir,
)
vm.create()
vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
vm.wait_for_service("postgresql.target", ssh_key=test_ssh_key, timeout=300)
yield vm
vm.destroy()
if not keep_vm and d.exists():
shutil.rmtree(d)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def pg_host(postgresql_vm: FCOSVirtualMachine, test_ssh_key: Path): def fcos_host(fcos_vm: FCOSVirtualMachine, test_ssh_key: Path):
"""testinfra SSH host connected to the default PostgreSQL VM.""" """testinfra SSH host connected to the default FCOS VM."""
return testinfra.get_host( return testinfra.get_host(
f"ssh://root@{postgresql_vm.ip}", f"ssh://root@{fcos_vm.ip}",
ssh_extra_args=( ssh_extra_args=(
f"-i {test_ssh_key}" f"-i {test_ssh_key}"
" -o StrictHostKeyChecking=no" " -o StrictHostKeyChecking=no"
" -o UserKnownHostsFile=/dev/null" " -o UserKnownHostsFile=/dev/null"
), ),
) )
@pytest.fixture(scope="module")
def upgrade_virtiofs_dir() -> Path:
"""Unique VirtioFS host directory for the upgrade test VM."""
d = Path("/srv") / f"fcos-test-pg-upgrade-{os.getpid()}"
d.mkdir(parents=True, exist_ok=True)
yield d
if d.exists():
shutil.rmtree(d)
@pytest.fixture(scope="module")
def upgrade_vm(
test_ssh_key: Path,
test_ssh_pubkey: str,
upgrade_virtiofs_dir: Path,
tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
"""Running CoreOS VM with PostgreSQL installed at PG_MAJOR_UPGRADE_FROM.
Used exclusively by test_upgrade.py to verify the major version upgrade path.
The config.env is overridden via the ignition overlay so the VM boots
directly with PG_MAJOR_UPGRADE_FROM, regardless of the cookbook's default.
"""
fcos_ign = ensure_fcos_ign(POSTGRESQL_DIR)
test_ign = tmp_path_factory.mktemp("ign-upgrade") / "fcos-upgrade.ign"
build_test_ignition(
base_ignition=fcos_ign,
ssh_pubkey=test_ssh_pubkey,
output=test_ign,
config_env_overrides=_default_config_env(PG_MAJOR_UPGRADE_FROM),
)
vm = FCOSVirtualMachine(
name=f"pg-upgrade-{os.getpid()}",
ignition_file=test_ign,
virtiofs_dir=upgrade_virtiofs_dir,
)
vm.create()
vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
vm.wait_for_service("postgresql.target", ssh_key=test_ssh_key, timeout=300)
yield vm
vm.destroy()

118
cookbooks/postgresql/tests/helpers.py

@ -1,39 +1,101 @@
"""Shared constants and helper functions for PostgreSQL integration tests. import sys
import pytest
import testinfra
from pathlib import Path
THIS_COOKBOOK_DIR = Path(__file__).parent.parent
COOKBOOKS_DIR = THIS_COOKBOOK_DIR.parent
TOP_LEVEL_DIR = COOKBOOKS_DIR.parent
THIS_COOKBOOK_NAME = THIS_COOKBOOK_DIR.name
sys.path.insert(0, str(TOP_LEVEL_DIR / "tests"))
import test_quadlet # noqa: E402
These are extracted from conftest.py so that test modules can import them
without conflicting with pytest's conftest discovery mechanism.
""" """
Verify that the postgresql Quadlet is correctly installed and configured on a fresh VM boot.
"""
class TestPostgresqlQuadlet(test_quadlet.TestQuadlet):
from pathlib import Path expected_services = [
{ "name": "postgresql-server.service", "state": "active", "exists": True },
{ "name": "postgresql-init.service", "state": "inactive", "exists": True },
{ "name": "postgresql-upgrade.service", "state": "inactive", "exists": True },
{ "name": "postgresql-backup.service", "state": "inactive", "exists": True },
{ "name": "postgresql-set-major.service", "state": "inactive", "exists": True },
{ "name": "postgresql-backup.timer", "state": "active", "exists": True },
{ "name": "postgresql.target", "state": "active", "exists": True },
]
expected_sockets = [
{ "uri": "tcp://127.0.0.1:5432", "state": "listening" },
{ "uri": "tcp://0.0.0.0:5432", "state": "closed" },
{ "uri": "tcp://:::5432", "state": "closed" },
]
expected_ports = [
{ "number": 5432, "protocol": "tcp", "state": "closed" },
{ "number": 22, "protocol": "tcp", "state": "open" },
]
expected_files = [
{ "path": "/var/lib/quadlets/postgresql", "type": "directory", "owner": "postgresql", "group": "itix-svc", "mode": 0o755 },
{ "path": "/etc/quadlets/postgresql/config.env", "type": "file", "owner": "root", "group": "root", "mode": 0o600 },
{ "path": "/var/lib/virtiofs/data/postgresql", "type": "directory", "owner": "postgresql", "group": "itix-svc", "mode": 0o700 },
{ "path": "/var/lib/virtiofs/data/postgresql/backup", "type": "directory", "owner": "postgresql", "group": "itix-svc", "mode": 0o700 },
{ "path": "/var/lib/quadlets/postgresql/.initialized", "type": "file", "owner": "root", "group": "root", "mode": 0o644 },
]
expected_podman_images = [
]
expected_podman_containers = [
{ "name": "postgresql-server", "state": "present", "pid1": { "owner": "10004", "group": "10000", "commandline": "postgres -h 127.0.0.1" } },
]
# Default version shipped in the example config.env. expected_main_service = "postgresql.target"
PG_MAJOR_DEFAULT = "14" expected_main_service_timeout = 300
expected_pg_major = 0 # TODO: set this variable in subclasses
# Version to start from in the major-upgrade scenario. def _run_sql(self, fcos_host, query: str, check: bool = True, database: str = "postgres") -> str:
PG_MAJOR_UPGRADE_FROM = "14" """Execute *sql* via ``podman exec`` on the running postgresql-server container.
# Version to upgrade to in the major-upgrade scenario. Uses the Unix socket at /var/run/postgresql inside the container (mapped
PG_MAJOR_UPGRADE_TO = "17" from /run/quadlets/postgresql on the host). The pg_hba.conf generated by
the official postgres image grants trust access on local sockets, so no
password is required.
# Default credentials from config/examples/config.env. Returns:
PG_USER = "postgres" Stripped stdout of the psql command.
PG_PASSWORD = "postgres" """
PG_DB = "postgres" result = fcos_host.run(
f"podman exec postgresql-server psql -U postgres -d {database} --csv -t -c %s", query
)
if check:
assert result.exit_status == 0, f"SQL query \"{query}\" failed with exit code {result.exit_status}: {result.stderr}"
return result.stdout.strip()
def test_postgresql_major_version_items(self, fcos_host):
"""The major version from the config must be reflected in the filesystem and in the running Podman image."""
self.check_expected_files(fcos_host, [
{ "path": f"/var/lib/quadlets/postgresql/{self.expected_pg_major}", "type": "directory", "owner": "postgresql", "group": "itix-svc", "mode": 0o755 },
])
self.check_expected_podman_images(fcos_host, [
{ "name": "docker.io/library/postgres", "tag": f"{self.expected_pg_major}-alpine", "state": "present" },
])
def run_sql(vm, ssh_key: Path, sql: str) -> str: def test_latest_symlink_exists(self, fcos_host):
"""Execute *sql* via ``podman exec`` on the running postgresql-server container. """The 'latest' symlink must point to the active major-version directory."""
link = fcos_host.file("/var/lib/quadlets/postgresql/latest")
assert link.exists
assert link.is_symlink
assert link.linked_to == f"/var/lib/quadlets/postgresql/{self.expected_pg_major}"
Uses the Unix socket at /var/run/postgresql inside the container (mapped def test_postgresql_accepts_connections(self, fcos_host):
from /run/quadlets/postgresql on the host). The pg_hba.conf generated by """PostgreSQL must respond to a trivial SQL query."""
the official postgres image grants trust access on local sockets, so no output = self._run_sql(fcos_host, "SELECT 1 AS probe")
password is required. assert output == "1", f"Unexpected output from SQL query: {output}"
Returns: def test_postgresql_version_matches_config(self, fcos_host):
Stripped stdout of the psql command. """The running PostgreSQL server must report the version from PG_MAJOR_DEFAULT."""
""" output = self._run_sql(fcos_host, "SHOW server_version")
result = vm.ssh_run( assert output.startswith(f"{self.expected_pg_major}."), f"Expected PostgreSQL server version to start with {self.expected_pg_major}, but got {output}"
f"podman exec postgresql-server psql -U {PG_USER} -t -c \"{sql}\"",
ssh_key,
)
return result.stdout.strip()

204
cookbooks/postgresql/tests/test_01_install_upgrade_backup.py

@ -0,0 +1,204 @@
import sys
import pytest
import testinfra
import os
import shutil
import subprocess
import textwrap
from pathlib import Path
THIS_COOKBOOK_DIR = Path(__file__).parent.parent
COOKBOOKS_DIR = THIS_COOKBOOK_DIR.parent
TOP_LEVEL_DIR = COOKBOOKS_DIR.parent
THIS_COOKBOOK_NAME = THIS_COOKBOOK_DIR.name
# Add directories to the path so we can import Python modules from the top level "tests" directory and current directory.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(TOP_LEVEL_DIR / "tests"))
import helpers # noqa: E402
from fcos_vm import FCOSVirtualMachine, ensure_fcos_ign # noqa: E402
# PostgreSQL major versions to test during upgrade from PG_MAJOR_DEFAULT.
@pytest.fixture(scope="session", params=[15, 16, 17, 18])
def pg_upgrade_major(request) -> int:
    """Target PostgreSQL major version for the current upgrade step.

    Parametrized over every major version the upgrade path must support,
    starting from PG_MAJOR_DEFAULT.
    """
    version = request.param
    return int(version)
# Major version of PostgreSQL to install by default on a fresh VM boot.
PG_MAJOR_DEFAULT = 14
# PostgreSQL VM are kept for the duration of a test module, backed with a persistent Virtiofs directory.
@pytest.fixture(scope="module")
def fcos_vm(
    request,
    keep_vm: bool,
    test_ssh_key: Path,
    test_ssh_pubkey: str,
    virtiofs_dirs: list[tuple[Path, str]],
    tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
    """Running CoreOS VM with Quadlets installed.

    With --keep-vm the VM is reused across runs: it is created only if it
    does not already exist and is never destroyed on teardown.
    """
    # Derive the VM instance name from the test module name, e.g.
    # "test_01_install_upgrade_backup" -> "01-install-upgrade-backup".
    module_name = request.module.__name__.split(".")[-1].replace("test_", "").replace("_", "-")
    vm = FCOSVirtualMachine(
        cookbook_name=THIS_COOKBOOK_NAME,
        instance_name=module_name,
        keep=keep_vm,
        virtiofs_dirs=virtiofs_dirs,
    )
    # Provision only when there is no reusable VM (fresh run, or --keep-vm
    # was not given).
    if not (keep_vm and vm.exists()):
        fcos_ign = ensure_fcos_ign(THIS_COOKBOOK_DIR)
        vm.ignition.ignition_files.append(fcos_ign)
        # Inject test fixtures through ignition. The tuple layout appears to
        # be (content, owner, group, mode) — TODO confirm against FCOSIgnition.
        vm.ignition.extra_files.update({
            "/etc/quadlets/postgresql/config.env": (
                textwrap.dedent(f"""
                    # This file is generated by conftest.py for testing purposes.
                    POSTGRES_USER=postgres
                    POSTGRES_PASSWORD=postgres
                    POSTGRES_DB=postgres
                    POSTGRES_HOST_AUTH_METHOD=scram-sha-256
                    POSTGRES_INITDB_ARGS=--auth-host=scram-sha-256
                    POSTGRES_ARGS=-h 127.0.0.1
                    PGPORT=5432
                    PG_MAJOR={PG_MAJOR_DEFAULT}
                    POSTGRES_BACKUP_RETENTION=7
                    """),
                0,
                0,
                0o600,
            ),
            # Init hook executed by the postgres image on first boot; creates
            # the "test" role and "testdb" database verified by the tests below.
            "/etc/quadlets/postgresql/init.d/test.sql": (
                textwrap.dedent("""
                    -- This file is generated by conftest.py for testing purposes.
                    CREATE USER test WITH PASSWORD 'test';
                    CREATE DATABASE testdb OWNER test;
                    GRANT ALL PRIVILEGES ON DATABASE testdb TO test;
                    ALTER ROLE test SET client_encoding TO 'utf8';
                    """),
                10004,
                10000,
                0o600,
            ),
        })
        vm.ignition.ssh_key = test_ssh_pubkey
        vm.create()
        vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
    yield vm  # <-- tests run here with access to the VM instance
    if not keep_vm:
        vm.destroy()
"""
Verify that the postgresql Quadlet is correctly installed and configured on a fresh VM boot.
"""
class TestPostgresqlQuadletInstallUpgradeBackup(helpers.TestPostgresqlQuadlet):
    """Install PostgreSQL at PG_MAJOR_DEFAULT, then exercise the upgrade and
    backup paths on one shared module-scoped VM.

    NOTE: these tests are order-dependent — they mutate shared VM state
    (databases created early are checked by later tests, and
    test_upgrade_postgresql changes the running major version).
    """

    # Major version the inherited TestPostgresqlQuadlet checks expect on
    # first boot.
    expected_pg_major = PG_MAJOR_DEFAULT

    def test_can_create_database(self, fcos_host):
        """Should be possible to create a new database."""
        self._run_sql(fcos_host, "CREATE DATABASE upgrade_path_db")
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'upgrade_path_db'")
        assert output == "upgrade_path_db", f"Unexpected output from SQL query: {output}"
        # The upgrade_path table records the server version at each step of
        # the upgrade scenario; later tests insert into and read from it.
        output = self._run_sql(fcos_host, "CREATE TABLE upgrade_path (version VARCHAR);", database="upgrade_path_db")
        output = self._run_sql(fcos_host, "INSERT INTO upgrade_path (version) SELECT version();", database="upgrade_path_db")

    def test_init_hook_has_created_database(self, fcos_host):
        """The injected init hook has created the test database and user."""
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'testdb'")
        assert output == "testdb", f"Unexpected output from SQL query: {output}"
        output = self._run_sql(fcos_host, "SELECT 1 FROM pg_roles WHERE rolname = 'test'")
        assert output == "1", f"Unexpected output from SQL query: {output}"

    def test_created_database_and_user_is_working(self, fcos_host):
        """Should be able to connect to the test database with the test user."""
        # Connect as "test" (not the postgres superuser) to prove the role works.
        result = fcos_host.run(
            "podman exec postgresql-server psql -U test -d testdb --csv -t -c %s", "SELECT 1 AS probe"
        )
        assert result.exit_status == 0, f"SQL query failed with exit code {result.exit_status}: {result.stderr}"
        output = result.stdout.strip()
        assert output == "1", f"Unexpected output from SQL query: {output}"

    def test_upgrade_postgresql(self, fcos_host, pg_upgrade_major):
        """Should be able to upgrade PostgreSQL by changing PG_MAJOR and rebooting."""
        # Stop the server to release the data directory
        result = fcos_host.run("systemctl stop postgresql.target")
        assert result.exit_status == 0, f"Failed to stop postgresql.target with exit code {result.exit_status}: {result.stderr}"
        self.check_expected_services(fcos_host, expected_services=[
            { "name": "postgresql-server.service", "state": "inactive", "exists": True },
        ])
        # Change PG_MAJOR in the config.env
        fcos_host.run(f"sed -i 's/^PG_MAJOR=.*/PG_MAJOR={pg_upgrade_major}/' /etc/quadlets/postgresql/config.env")
        # Start the server after changing the data directory
        result = fcos_host.run("systemctl start postgresql.target")
        assert result.exit_status == 0, f"Failed to start postgresql.target with exit code {result.exit_status}: {result.stderr}"
        # After a successful upgrade the one-shot init/upgrade units must be
        # back to inactive while the server unit is active.
        self.check_expected_services(fcos_host, expected_services=[
            { "name": "postgresql-server.service", "state": "active", "exists": True },
            { "name": "postgresql-init.service", "state": "inactive", "exists": True },
            { "name": "postgresql-upgrade.service", "state": "inactive", "exists": True },
        ])
        # The server_version must reflect the new major version after the upgrade
        output = self._run_sql(fcos_host, "SHOW server_version")
        assert output.startswith(f"{pg_upgrade_major}."), f"Expected PostgreSQL server version to start with {pg_upgrade_major}, but got {output}"

    def test_data_is_still_there_after_upgrade(self, fcos_host, pg_upgrade_major):
        """Data created before the upgrade must still be there after the upgrade."""
        # Check that the old data is still there after the upgrade
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'upgrade_path_db'")
        assert output == "upgrade_path_db", f"Unexpected output from SQL query: {output}"
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'testdb'")
        assert output == "testdb", f"Unexpected output from SQL query: {output}"
        # The "test" role created by the init hook must still be able to log in.
        result = fcos_host.run(
            "podman exec postgresql-server psql -U test -d testdb --csv -t -c %s", "SELECT 1 AS probe"
        )
        assert result.exit_status == 0, f"SQL query failed with exit code {result.exit_status}: {result.stderr}"

    def test_insert_version(self, fcos_host, pg_upgrade_major):
        """Should be able to insert data into the database after the upgrade."""
        # Record the post-upgrade server version in the upgrade_path table.
        output = self._run_sql(fcos_host, "INSERT INTO upgrade_path (version) SELECT version();", database="upgrade_path_db")

    def test_upgraded_postgresql_version_is_correct(self, fcos_host, pg_upgrade_major):
        """The running PostgreSQL server must report the updated version."""
        # The server_version must reflect the new major version after the upgrade
        output = self._run_sql(fcos_host, "SHOW server_version")
        assert output.startswith(f"{pg_upgrade_major}."), f"Expected PostgreSQL server version to start with {pg_upgrade_major}, but got {output}"
        # The new PostgreSQL major version's image must be pulled and present in Podman after the upgrade
        self.check_expected_podman_images(fcos_host, expected_podman_images=[
            { "name": "docker.io/library/postgres", "tag": f"{pg_upgrade_major}-alpine", "state": "present" },
        ])

    def test_latest_symlink_has_expected_target(self, fcos_host, pg_upgrade_major):
        """The 'latest' symlink must point to the active major-version directory."""
        link = fcos_host.file("/var/lib/quadlets/postgresql/latest")
        assert link.exists
        assert link.is_symlink
        assert link.linked_to == f"/var/lib/quadlets/postgresql/{pg_upgrade_major}"

    def test_create_backup(self, fcos_host):
        """Should be able to create a backup using the backup service."""
        # The backup unit is normally timer-driven; trigger it manually here.
        result = fcos_host.run("systemctl start postgresql-backup.service")
        assert result.exit_status == 0, f"Failed to start postgresql-backup.service with exit code {result.exit_status}: {result.stderr}"
        # Check that a backup file has been created in the backup directory
        backup_dir = fcos_host.file("/var/lib/virtiofs/data/postgresql/backup")
        assert backup_dir.exists
        assert backup_dir.is_directory
        backup_list = backup_dir.listdir()
        assert len(backup_list) > 0, "No backup files found in the backup directory after running the backup service!"
        # Backup directory names sort lexicographically, so max() picks the
        # most recent one — assumes timestamp-style names; TODO confirm.
        latest_backup = max(backup_list)
        latest_backup_content = fcos_host.file(f"/var/lib/virtiofs/data/postgresql/backup/{latest_backup}").listdir()
        assert len(latest_backup_content) > 0, "No files found in the latest backup directory after running the backup service!"
        # A backup must contain both the physical base backup (pg_basebackup
        # layout) and one logical dump per database.
        assert "backup_manifest" in latest_backup_content, f"Expected 'backup_manifest' file in the backup, but got: {latest_backup_content}"
        assert "base.tar" in latest_backup_content, f"Expected 'base.tar' file in the backup, but got: {latest_backup_content}"
        assert "pg_wal.tar" in latest_backup_content, f"Expected 'pg_wal.tar' file in the backup, but got: {latest_backup_content}"
        assert "dump-upgrade_path_db.sql.gz" in latest_backup_content, f"Expected 'dump-upgrade_path_db.sql.gz' file in the backup, but got: {latest_backup_content}"
        assert "dump-testdb.sql.gz" in latest_backup_content, f"Expected 'dump-testdb.sql.gz' file in the backup, but got: {latest_backup_content}"

99
cookbooks/postgresql/tests/test_02_restore.py

@ -0,0 +1,99 @@
import sys
import pytest
import testinfra
import os
import shutil
import subprocess
import textwrap
from pathlib import Path
THIS_COOKBOOK_DIR = Path(__file__).parent.parent
COOKBOOKS_DIR = THIS_COOKBOOK_DIR.parent
TOP_LEVEL_DIR = COOKBOOKS_DIR.parent
THIS_COOKBOOK_NAME = THIS_COOKBOOK_DIR.name
# Add directories to the path so we can import Python modules from the top level "tests" directory and current directory.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(TOP_LEVEL_DIR / "tests"))
import helpers # noqa: E402
from fcos_vm import FCOSVirtualMachine, ensure_fcos_ign # noqa: E402
# Major version of PostgreSQL to install by default on a fresh VM boot.
PG_MAJOR_DEFAULT = 18
# PostgreSQL VM are kept for the duration of a test module, backed with a persistent Virtiofs directory.
@pytest.fixture(scope="module")
def fcos_vm(
    request,
    keep_vm: bool,
    test_ssh_key: Path,
    test_ssh_pubkey: str,
    virtiofs_dirs: list[tuple[Path, str]],
    tmp_path_factory: pytest.TempPathFactory,
) -> FCOSVirtualMachine:
    """Running CoreOS VM with Quadlets installed.

    With --keep-vm the VM is reused across runs: it is created only if it
    does not already exist and is never destroyed on teardown.
    """
    # Derive the VM instance name from the test module name, e.g.
    # "test_02_restore" -> "02-restore".
    module_name = request.module.__name__.split(".")[-1].replace("test_", "").replace("_", "-")
    vm = FCOSVirtualMachine(
        cookbook_name=THIS_COOKBOOK_NAME,
        instance_name=module_name,
        keep=keep_vm,
        virtiofs_dirs=virtiofs_dirs,
    )
    # Provision only when there is no reusable VM (fresh run, or --keep-vm
    # was not given).
    if not (keep_vm and vm.exists()):
        fcos_ign = ensure_fcos_ign(THIS_COOKBOOK_DIR)
        vm.ignition.ignition_files.append(fcos_ign)
        # Inject the test config through ignition. The tuple layout appears to
        # be (content, owner, group, mode) — TODO confirm against FCOSIgnition.
        vm.ignition.extra_files.update({
            "/etc/quadlets/postgresql/config.env": (
                textwrap.dedent(f"""
                    # This file is generated by conftest.py for testing purposes.
                    POSTGRES_USER=postgres
                    POSTGRES_PASSWORD=postgres
                    POSTGRES_DB=postgres
                    POSTGRES_HOST_AUTH_METHOD=scram-sha-256
                    POSTGRES_INITDB_ARGS=--auth-host=scram-sha-256
                    POSTGRES_ARGS=-h 127.0.0.1
                    PGPORT=5432
                    PG_MAJOR={PG_MAJOR_DEFAULT}
                    POSTGRES_BACKUP_RETENTION=7
                    """),
                0,
                0,
                0o600,
            ),
        })
        vm.ignition.ssh_key = test_ssh_pubkey
        vm.create()
        vm.wait_ssh(ssh_key=test_ssh_key, timeout=300)
    yield vm  # <-- tests run here with access to the VM instance
    if not keep_vm:
        vm.destroy()
"""
Verify that the postgresql Quadlet correctly restores a database from a backup.
"""
class TestPostgresqlQuadletRestore(helpers.TestPostgresqlQuadlet):
    """Verify that data restored from a backup is present on a fresh VM
    booted at PG_MAJOR_DEFAULT (18)."""

    # Major version the inherited TestPostgresqlQuadlet checks expect.
    expected_pg_major = PG_MAJOR_DEFAULT

    def test_data_is_still_there_after_restore(self, fcos_host):
        """Data created before the restore must still be there after the restore."""
        # Check that the old data is still there after the restore
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'upgrade_path_db'")
        assert output == "upgrade_path_db", f"Unexpected output from SQL query: {output}"
        output = self._run_sql(fcos_host, "SELECT datname FROM pg_database WHERE datname = 'testdb'")
        assert output == "testdb", f"Unexpected output from SQL query: {output}"
        # The restored "test" role must still be able to log in.
        result = fcos_host.run(
            "podman exec postgresql-server psql -U test -d testdb --csv -t -c %s", "SELECT 1 AS probe"
        )
        assert result.exit_status == 0, f"SQL query failed with exit code {result.exit_status}: {result.stderr}"
        # Check that the upgrade_path table contains the initial postgresql version (14)
        # NOTE(review): the rows come from version(), whose output normally
        # begins with "PostgreSQL 14..." — startswith("14.") looks suspicious;
        # verify against an actual backup.
        output = self._run_sql(fcos_host, "SELECT version FROM upgrade_path ORDER BY version ASC LIMIT 1", database="upgrade_path_db")
        assert output.startswith("14."), f"Unexpected output from SQL query: {output}"

4
cookbooks/postgresql/tests/test_install.py

@ -88,7 +88,9 @@ def test_data_dir_exists(pg_host):
f = pg_host.file("/var/lib/quadlets/postgresql") f = pg_host.file("/var/lib/quadlets/postgresql")
assert f.is_directory assert f.is_directory
assert f.user == "postgresql" assert f.user == "postgresql"
assert f.user.uid == 10004
assert f.group == "itix-svc"
assert f.group.uid == 10000
def test_latest_symlink_exists(pg_host): def test_latest_symlink_exists(pg_host):
"""The 'latest' symlink must point to the active major-version directory.""" """The 'latest' symlink must point to the active major-version directory."""

59
cookbooks/postgresql/tests/test_security.py

@ -0,0 +1,59 @@
"""Test that a fresh PostgreSQL installation is secure.
These tests run against a brand-new VM booted from the cookbook's default
ignition (PG_MAJOR=14, example credentials). They verify:
- The PostgreSQL port is NOT exposed to the network.
- The PostgreSQL backup directory has the correct ownership and permissions.
"""
from pathlib import Path
import socket
# ---------------------------------------------------------------------------
# Network / socket
# ---------------------------------------------------------------------------
def test_postgresql_port_listening(pg_host):
    """PostgreSQL must be listening on 127.0.0.1:5432 (POSTGRES_ARGS=-h 127.0.0.1)."""
    # The server is configured to bind to loopback only; verify the local
    # listener is actually up from inside the VM.
    loopback_listener = pg_host.socket("tcp://127.0.0.1:5432")
    assert loopback_listener.is_listening
def test_postgresql_port_not_exposed(postgresql_vm):
"""PostgreSQL must NOT be exposed to the network."""
# Positive control: port 22 (SSH) must be reachable
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
assert s.connect_ex((postgresql_vm.ip, 22)) == 0, (
f"Port 22 is NOT reachable from the host on {postgresql_vm.ip}!"
)
s.close()
# Negative control: port 23 must NOT be reachable
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
assert s.connect_ex((postgresql_vm.ip, 23)) != 0, (
f"Port 23 is reachable from the host on {postgresql_vm.ip}!"
)
s.close()
# The real test: port 5432 must NOT be reachable
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
assert s.connect_ex((postgresql_vm.ip, 5432)) != 0, (
f"Port 5432 is reachable from the host on {postgresql_vm.ip}!"
)
s.close()
# ---------------------------------------------------------------------------
# VirtioFS permissions (verified from the host — no SSH required)
# ---------------------------------------------------------------------------
def test_backup_directory_exists_in_virtiofs(virtiofs_dir: Path):
"""The postgresql/backup sub-directory must exist in the VirtioFS share."""
backup_root = virtiofs_dir / "postgresql" / "backup"
assert backup_root.exists(), f"Backup directory not found on host: {backup_root}"
# mode 0700 — world and group bits must be 0
assert backup_root.stat().st_mode & 0o077 == 0
assert backup_root.stat().st_uid == 10004, f"Backup directory must be owned by postgres (uid 10004), but got {backup_root.stat().st_uid}"
assert backup_root.stat().st_gid == 10000, f"Backup directory must be owned by postgres (gid 10000), but got {backup_root.stat().st_gid}"

2
scripts/common.mk

@ -135,7 +135,7 @@ pre-requisites::
exit 1; \ exit 1; \
fi fi
@set -Eeuo pipefail; \ @set -Eeuo pipefail; \
for tool in install systemctl systemd-analyze systemd-tmpfiles sysctl virt-install virsh qemu-img journalctl coreos-installer resize butane yq podlet pip3; do \ for tool in install systemctl systemd-analyze systemd-tmpfiles sysctl virt-install virsh qemu-img journalctl coreos-installer resize butane yq podlet pip3 ncat; do \
if ! which $$tool &>/dev/null ; then \ if ! which $$tool &>/dev/null ; then \
echo "$$tool is not installed. Please install it first." >&2; \ echo "$$tool is not installed. Please install it first." >&2; \
exit 1; \ exit 1; \

350
tests/fcos_vm.py

@ -0,0 +1,350 @@
"""
Fedora CoreOS VM lifecycle helpers for end-to-end testing.
Requires running as root (virt-install, virsh, qemu-img need root privileges).
Typical usage:
vm = FCOSVirtualMachine(
name="fcos-vm-abc123",
ignition_file=Path("/tmp/fcos-test.ign"),
virtiofs_dir=Path("/srv/fcos-test-abc123"),
)
vm.create()
vm.wait_ssh(ssh_key=key_path)
# ... run tests ...
vm.destroy()
"""
import base64
import re
import shutil
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path
import os
# Root directory where libvirt stores VM disk images; a per-VM
# subdirectory is created under it for each test VM.
LIBVIRT_IMAGES_DIR = Path("/var/lib/libvirt/images")
# Shared Fedora CoreOS base image, copied (never modified) for each test VM.
FCOS_BASE_IMAGE = LIBVIRT_IMAGES_DIR / "library" / "fedora-coreos.qcow2"
# Butane spec version — must match the project convention.
BUTANE_VERSION = "1.4.0"
def ensure_fcos_ign(cookbook_dir: Path) -> Path:
    """Return the path to fcos.ign, building it via ``make butane`` if absent."""
    fcos_ign = cookbook_dir / "fcos.ign"
    if fcos_ign.exists():
        return fcos_ign
    # Delegate the build to the cookbook's Makefile.
    subprocess.run(["make", "-C", str(cookbook_dir), "butane"], check=True)
    return fcos_ign
class FCOSIgnition:
    """
    Builds a Fedora CoreOS Ignition file, by merging multiple ignition files
    and optionally injecting extra files.

    All public methods are synchronous and raise on failure.
    """

    def __init__(
        self,
        ignition_files: list[Path],
        ssh_key: str | None = None,
        extra_files: dict[str, tuple[str, str | int, str | int, int]] | None = None,
    ) -> None:
        """
        Args:
            ignition_files: List of paths to the compiled Ignition (.ign) files.
            ssh_key: Optional SSH public key to inject into root's
                authorized_keys.
            extra_files: Optional mapping of destination path to a
                ``(content, owner, group, mode)`` tuple describing an extra
                file to inject. ``owner`` and ``group`` may be given as a name
                (str) or a numeric id (int); ``mode`` is an integer such as
                ``0o600``. (The previous annotation listed the tuple members
                in the wrong order.)
        """
        self.ignition_files = [Path(f) for f in ignition_files]
        self.extra_files = extra_files or {}
        self.ssh_key = ssh_key

    def _build_extra_files_butane(self) -> str | None:
        """Build the butane file content for the extra files in self.extra_files.

        Returns:
            The butane YAML document as a string, or None when there are no
            extra files to inject.
        """
        if not self.extra_files:
            return None
        files = []
        for path, (content, owner, group, mode) in self.extra_files.items():
            file_desc = (
                f"    - path: {path}\n"
                f"      mode: {mode}\n"
                f"      overwrite: true\n"
                f"      user:\n"
                + (f"        id: {owner}\n" if isinstance(owner, int) else f"        name: {owner}\n")
                + f"      group:\n"
                + (f"        id: {group}\n" if isinstance(group, int) else f"        name: {group}\n")
                + f"      contents:\n"
                f"        inline: |\n"
            )
            # Prefix all lines of content with 10 spaces (2 for indentation + 8 for the literal block)
            indented_content = textwrap.indent(content + "\n", " " * 10)
            file_desc += indented_content + "\n"
            files.append(file_desc)
        header = textwrap.dedent(f"""\
            variant: fcos
            version: {BUTANE_VERSION}
            storage:
              files:
            """)
        joined = "\n".join(files)
        return f"{header}{joined}\n"

    def _build_ssh_key_butane(self) -> str | None:
        """Build the butane content injecting self.ssh_key into root's authorized_keys.

        Returns:
            The butane YAML document as a string, or None when no SSH key
            was provided.
        """
        if not self.ssh_key:
            return None
        return textwrap.dedent(f"""\
            variant: fcos
            version: {BUTANE_VERSION}
            passwd:
              users:
                - name: root
                  ssh_authorized_keys:
                    - {self.ssh_key}
            """)

    def build(self, output: Path) -> Path:
        """Build the final Ignition file by merging the base files and the extra files.

        Args:
            output: Destination path for the compiled Ignition file.

        Returns:
            The ``output`` path.

        Raises:
            subprocess.CalledProcessError: If any butane invocation fails.
                The temporary working directory is retained for debugging.
        """
        # tempfile.mkdtemp() instead of TemporaryDirectory(delete=False):
        # the ``delete`` keyword only exists on Python 3.12+, and cleanup is
        # managed manually anyway so the directory can be kept on failure.
        tmpdir = Path(tempfile.mkdtemp())
        try:
            extra_files_butane = self._build_extra_files_butane()
            ssh_key_butane = self._build_ssh_key_butane()
            test_bu = textwrap.dedent(f"""\
                variant: fcos
                version: {BUTANE_VERSION}
                systemd:
                  units:
                    # Disable & mask zincati to avoid reboots during testing.
                    - name: zincati.service
                      enabled: false
                      mask: true
                ignition:
                  config:
                    merge:
                """)
            for ign in self.ignition_files:
                test_bu += f"      - local: {ign.name}\n"
                # butane resolves "local:" references relative to -d; copy
                # each base ignition into the working directory.
                shutil.copy(ign, tmpdir / ign.name)
            if extra_files_butane:
                extra_files_bu = tmpdir / "test_extra_files.bu"
                extra_files_bu.write_text(extra_files_butane)
                extra_files_path = tmpdir / "test_extra_files.ign"
                subprocess.run(
                    ["butane", "--strict", "-o", str(extra_files_path), str(extra_files_bu)],
                    check=True,
                    capture_output=True,
                )
                test_bu += f"      - local: {extra_files_path.name}\n"
            if ssh_key_butane:
                ssh_key_bu = tmpdir / "test_ssh_key.bu"
                ssh_key_bu.write_text(ssh_key_butane)
                ssh_key_path = tmpdir / "test_ssh_key.ign"
                subprocess.run(
                    ["butane", "--strict", "-o", str(ssh_key_path), str(ssh_key_bu)],
                    check=True,
                    capture_output=True,
                )
                test_bu += f"      - local: {ssh_key_path.name}\n"
            test_bu_path = tmpdir / "test.bu"
            test_bu_path.write_text(test_bu)
            subprocess.run(
                [
                    "butane",
                    "--strict",
                    "-d", str(tmpdir),
                    "-o", str(output),
                    str(test_bu_path),
                ],
                check=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"Error occurred while running butane: {e.stderr.decode()}")
            # Keep the temporary directory for debugging
            print(f"Temporary directory retained at: {tmpdir}")
            # Bare raise preserves the original traceback ("raise e" resets it).
            raise
        else:
            # Clean up the temporary directory if it still exists
            if tmpdir.exists():
                shutil.rmtree(tmpdir)
        return output
class FCOSVirtualMachine:
    """Manages a Fedora CoreOS KVM virtual machine for end-to-end testing.

    All public methods are synchronous and raise on failure. The caller is
    responsible for calling ``destroy()`` (typically from a pytest fixture
    teardown).
    """

    def __init__(
        self,
        cookbook_name: str,
        instance_name: str,
        keep: bool = False,
        ignition: FCOSIgnition | None = None,
        virtiofs_dirs: list[tuple[Path, str]] | None = None,
        vm_config: tuple[int, int, int, int] = (4096, 2, 50, 100),
    ) -> None:
        """
        Args:
            cookbook_name: Short identifier appended to "fcos-test-" to form the
                libvirt domain name. Keep it unique across parallel tests.
            instance_name: Second identifier component, distinguishing several
                VMs of the same cookbook.
            keep: When True, use a stable "-dev" domain name so the VM can be
                reused across runs; otherwise the test runner's PID is appended
                to isolate parallel runs.
            ignition: FCOSIgnition instance to build the Ignition (.ign) file.
                Defaults to an empty ignition.
            virtiofs_dirs: List of (host directory, virtiofs tag) pairs that
                will be exposed inside the VM.
            vm_config: (ram MiB, vcpus, root disk GiB, /var disk GiB).
        """
        if keep:
            self.vm_name = f"fcos-test-{cookbook_name}-{instance_name}-dev"
        else:
            self.vm_name = f"fcos-test-{cookbook_name}-{instance_name}-{os.getpid()}"
        # NOTE: defaults are built per call here. The previous signature used
        # ``ignition=FCOSIgnition([])`` and ``virtiofs_dirs=[]`` — mutable
        # defaults evaluated once and shared by every instance.
        self.ignition = ignition if ignition is not None else FCOSIgnition([])
        self.virtiofs_dirs = list(virtiofs_dirs) if virtiofs_dirs else []
        self.vm_config = vm_config
        self._images_dir = LIBVIRT_IMAGES_DIR / self.vm_name
        self._ip: str | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def exists(self) -> bool:
        """Return True if a libvirt domain with this VM's name already exists."""
        result = subprocess.run(
            ["virsh", "domstate", self.vm_name],
            capture_output=True,
        )
        return result.returncode == 0

    def create(self) -> None:
        """Create disk images and start the VM via virt-install."""
        self._images_dir.mkdir(parents=True, exist_ok=True)
        for host_dir, target_dir in self.virtiofs_dirs:
            Path(host_dir).mkdir(parents=True, exist_ok=True)
        ign_dest = self._images_dir / "fcos.ign"
        self.ignition.build(ign_dest)
        # qemu (running unprivileged) must be able to read the ignition file.
        ign_dest.chmod(0o644)
        (ram, vcpus, root_disk_size, var_disk_size) = self.vm_config
        # Root OS disk: copy the base image, then resize it.
        root_qcow2 = self._images_dir / "root.qcow2"
        shutil.copy(FCOS_BASE_IMAGE, root_qcow2)
        subprocess.run(
            ["qemu-img", "resize", "-f", "qcow2", str(root_qcow2), f"{root_disk_size}G"],
            check=True,
        )
        # Secondary disk for /var (keeps OS and data separate, matches common.mk).
        var_qcow2 = self._images_dir / "var.qcow2"
        subprocess.run(
            ["qemu-img", "create", "-f", "qcow2", str(var_qcow2), f"{var_disk_size}G"],
            check=True,
        )
        virtiofs_options = []
        for i, (host_dir, target_dir) in enumerate(self.virtiofs_dirs):
            virtiofs_options += [
                f"--filesystem=type=mount,accessmode=passthrough,"
                f"driver.type=virtiofs,driver.queue=1024,"
                f"source.dir={host_dir},target.dir={target_dir}"
            ]
        subprocess.run(
            [
                "virt-install",
                f"--name={self.vm_name}",
                "--import",
                "--noautoconsole",
                f"--ram={ram}",
                f"--vcpus={vcpus}",
                "--os-variant=fedora-coreos-stable",
                f"--disk=path={root_qcow2},format=qcow2",
                f"--disk=path={var_qcow2},format=qcow2",
                f"--qemu-commandline=-fw_cfg name=opt/com.coreos/config,file={ign_dest}",
                "--network=network=default,model=virtio",
                "--console=pty,target.type=virtio",
                "--serial=pty",
                "--graphics=none",
                "--boot=uefi",
                # virtiofs requires shared memory backing.
                "--memorybacking=access.mode=shared,source.type=memfd",
            ] + virtiofs_options,
            check=True,
        )

    def destroy(self) -> None:
        """Forcefully stop and delete the VM and all associated disk images."""
        # Both virsh calls are best-effort: the domain may already be gone.
        subprocess.run(["virsh", "destroy", self.vm_name], capture_output=True)
        subprocess.run(
            ["virsh", "undefine", self.vm_name, "--nvram"],
            capture_output=True,
        )
        if self._images_dir.exists():
            shutil.rmtree(self._images_dir)
        for host_dir, _ in self.virtiofs_dirs:
            if Path(host_dir).exists():
                shutil.rmtree(host_dir)

    # ------------------------------------------------------------------
    # Readiness polling
    # ------------------------------------------------------------------
    def get_ip(self) -> str | None:
        """Return the VM's primary IPv4 address reported by virsh, or None."""
        result = subprocess.run(
            ["virsh", "domifaddr", self.vm_name],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return None
        match = re.search(r"(\d+\.\d+\.\d+\.\d+)", result.stdout)
        return match.group(1) if match else None

    @property
    def ip(self) -> str:
        """The VM's IPv4 address, cached after first successful lookup.

        Raises:
            RuntimeError: If the VM has not obtained an address yet.
        """
        if self._ip is None:
            self._ip = self.get_ip()
        if self._ip is None:
            raise RuntimeError(f"VM {self.vm_name!r} has no IP address yet")
        return self._ip

    def wait_ssh(self, ssh_key: Path, timeout: int = 300) -> str:
        """Block until SSH is reachable. Returns the IP address.

        Polls every 5 seconds until ``timeout`` seconds have elapsed.

        Raises:
            TimeoutError: If SSH is still unreachable after ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ip = self.get_ip()
            if ip:
                try:
                    result = subprocess.run(
                        [
                            "ssh",
                            "-i", str(ssh_key),
                            "-o", "StrictHostKeyChecking=no",
                            "-o", "UserKnownHostsFile=/dev/null",
                            "-o", "ConnectTimeout=5",
                            "-o", "BatchMode=yes",
                            f"root@{ip}",
                            "true",
                        ],
                        capture_output=True,
                        timeout=10,
                    )
                    if result.returncode == 0:
                        self._ip = ip
                        return ip
                except subprocess.TimeoutExpired:
                    # ssh hung — keep polling until the deadline.
                    pass
            time.sleep(5)
        raise TimeoutError(
            f"VM {self.vm_name!r} did not become SSH-ready within {timeout}s"
        )

291
tests/test_quadlet.py

@ -0,0 +1,291 @@
import socket
import json
import time
class TestQuadlet:
"""
Run common tests for Quadlet cookbooks.
All public methods are synchronous and raise on failure.
"""
expected_services : list[dict[str, str | bool]] = [
# Example:
# { "name": "postgresql.service", "state": "active", "masked": False, "enabled": True, "exists": True },
]
"""
Expected state of systemd services. Each dict must contain a "name" field with the service name, and may optionally contain:
- "state": one of "active", "inactive", "failed" (optional)
- "masked": boolean (optional)
- "enabled": boolean (optional)
- "exists": boolean (optional)
Optional fields are not checked if missing.
If "exists" is False, no other fields are checked.
"""
expected_sockets : list[dict[str, str]] = [
# Example:
# { "uri": "tcp://127.0.0.1:5432", "state": "listening" },
]
"""
Expected state of sockets. Each dict must contain a "uri" field with the socket URI, and a "state" field with one of "listening" or "closed".
"""
# all fields are mandatory
expected_ports : list[dict[str, str | int]] = [
# Example:
# { "number": 5432, "protocol": "tcp", "state": "closed" },
# { "number": 22, "protocol": "tcp", "state": "open" },
]
"""
Expected state of TCP ports as seen from the machine running pytest. Each dict must contain:
- "number": port number
- "protocol": currently only "tcp" is supported
- "state": one of "open" (accepting connections) or "closed"
"""
expected_files : list[dict[str, str | int]] = [
# Example:
# { "path": "/var/lib/quadlets/postgresql", "type": "directory", "owner": "postgresql", "group": "itix-svc", "mode": 0o755 },
]
"""
Expected files on the VM. Each dict must contain:
- "path": full path to the file
- "type": "directory", "file" or "none" (if the file is expected to not exist)
Optional fields:
- "owner": expected owner username
- "group": expected group name
- "mode": expected file mode as an integer (e.g. 0o755)
If an optional field is missing, it is not checked.
"""
expected_podman_images : list[dict[str, str]] = [
# Example:
# { "name": "docker.io/library/postgres", "tag": "15", "state": "present" },
]
"""
Expected Podman images. Each dict must contain:
- "name": image name (e.g. "docker.io/library/postgres")
- "tag": image tag (e.g. "15")
- "state": one of "present" or "absent"
"""
expected_podman_containers : list[dict[str, str | dict[str, str]]] = [
# Example:
# { "name": "postgresql-server", "state": "present", "pid1": { "owner": "10004", "group": "10000", "commandline": "postgres -h 127.0.0.1" } },
]
"""
Expected Podman containers. Each dict must contain:
- "name": container name
- "state": one of "present" or "absent"
Optional field:
- "pid1": dict with expected properties of the container's main process (PID 1). May contain:
- "owner": expected uid (numeric) of the process as seen from outside the container (i.e. on the host)
- "group": expected gid (numeric) of the process as seen from outside the container (i.e. on the host)
- "commandline": expected command line of the process
"""
expected_main_service : str | None = None
"""
If not None, the name of the main service to wait for before running any tests.
"""
expected_main_service_timeout : int = 120
"""
If expected_main_service is set, the number of seconds to wait for it to become active before giving up and failing the tests.
"""
def test_wait_for_main_service(self, fcos_host):
"""Wait for the expected main service to become active before running any other tests."""
if self.expected_main_service is None:
return
self.wait_for_service(fcos_host, self.expected_main_service, self.expected_main_service_timeout)
def wait_for_service(self, fcos_host, service: str, timeout: int = 120) -> None:
"""Block until *service* reaches the ``active`` state."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
result = fcos_host.run(
f"systemctl is-active {service}", check=False
)
if result.stdout.strip() == "active":
return
time.sleep(5)
status = fcos_host.run(
f"systemctl status {service} --no-pager", check=False
)
raise TimeoutError(
f"Service {service!r} not active after {timeout}s:\n{status.stdout}"
)
def wait_for_unit_done(self, fcos_host, unit: str, timeout: int = 120) -> str:
"""
Block until a oneshot service finishes (``inactive`` or ``failed``).
Returns:
The final state string: ``"inactive"`` on success, ``"failed"``
on failure.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
result = fcos_host.run(
f"systemctl is-active {unit}", check=False
)
state = result.stdout.strip()
if state in ("inactive", "failed"):
return state
time.sleep(5)
raise TimeoutError(
f"Unit {unit!r} did not finish within {timeout}s"
)
def test_expected_services(self, fcos_host):
"""The expected systemd services must be present and in the expected state."""
self.check_expected_services(fcos_host, self.expected_services)
def check_expected_services(self, fcos_host, expected_services: list[dict[str, str | bool]]) -> None:
"""The expected systemd services must be present and in the expected state."""
for svc in expected_services:
service = fcos_host.service(svc["name"])
if "exists" in svc:
if svc["exists"]:
assert service.exists, f"Service {svc['name']} does not exist"
else:
assert not service.exists, f"Service {svc['name']} exists but should not"
continue # if the service shouldn't exist, no need to check other properties
if "masked" in svc:
if svc["masked"]:
assert service.is_masked, f"Service {svc['name']} is not masked"
else:
assert not service.is_masked, f"Service {svc['name']} is masked but should not"
if "enabled" in svc:
if svc["enabled"]:
assert service.is_enabled, f"Service {svc['name']} is not enabled"
else:
assert not service.is_enabled, f"Service {svc['name']} is enabled but should not"
if "state" in svc:
if svc["state"] == "active":
assert service.is_running, f"Service {svc['name']} is not running"
elif svc["state"] == "inactive":
assert not service.is_running, f"Service {svc['name']} is running but expected to be inactive"
elif svc["state"] == "failed":
result = fcos_host.run(f"systemctl is-failed {svc['name']}")
assert result.rc == 0, f"Service {svc['name']} is not in failed state"
else:
raise ValueError(f"Invalid state for service {svc['name']}: {svc['state']}")
def test_expected_sockets(self, fcos_host):
"""The expected sockets must be present and in the expected state."""
self.check_expected_sockets(fcos_host, self.expected_sockets)
def check_expected_sockets(self, fcos_host, expected_sockets: list[dict[str, str]]) -> None:
"""The expected sockets must be present and in the expected state."""
for sock in expected_sockets:
socket = fcos_host.socket(sock["uri"])
if sock["state"] == "listening":
assert socket.is_listening, f"Socket {sock['uri']} is not listening"
elif sock["state"] == "closed":
assert not socket.is_listening, f"Socket {sock['uri']} is listening but expected to be closed"
else:
raise ValueError(f"Invalid state for socket {sock['uri']}: {sock['state']}")
def test_expected_ports(self, fcos_vm):
"""The expected TCP ports must be in the expected state."""
self.check_expected_ports(fcos_vm, self.expected_ports)
def check_expected_ports(self, fcos_vm, expected_ports: list[dict[str, str]]) -> None:
"""The expected TCP ports must be in the expected state."""
for port in expected_ports:
assert port["protocol"] == "tcp", f"Unsupported protocol {port['protocol']} for port {port['number']}"
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
connect_result = s.connect_ex((fcos_vm.ip, port["number"]))
if port["state"] == "open":
assert connect_result == 0, f"Port {port['number']} is NOT reachable from the host on {fcos_vm.ip}!"
elif port["state"] == "closed":
assert connect_result != 0, f"Port {port['number']} is reachable from the host on {fcos_vm.ip} but expected to be closed"
else:
raise ValueError(f"Invalid state for port {port['number']}/{port['protocol']}: {port['state']}")
s.close()
def test_expected_files(self, fcos_host):
"""The expected files must be in the expected state."""
self.check_expected_files(fcos_host, self.expected_files)
def check_expected_files(self, fcos_host, expected_files: list[dict[str, str | int]]) -> None:
"""The expected files must be in the expected state."""
for f in expected_files:
file = fcos_host.file(f["path"])
if f["type"] == "directory":
assert file.is_directory, f"Expected {f['path']} to be a directory"
elif f["type"] == "file":
assert file.is_file, f"Expected {f['path']} to be a regular file"
elif f["type"] == "none":
assert not file.exists, f"Expected {f['path']} to not exist"
continue # if the file shouldn't exist, no need to check other properties
else:
raise ValueError(f"Invalid type for expected file {f['path']}: {f['type']}")
if "owner" in f:
assert file.user == f["owner"], f"Expected {f['path']} to be owned by {f['owner']}, but got {file.user}"
if "group" in f:
assert file.group == f["group"], f"Expected {f['path']} to belong to group {f['group']}, but got {file.group}"
if "mode" in f:
assert file.mode == f["mode"], f"Expected {f['path']} to have mode {oct(f['mode'])}, but got {oct(file.mode)}"
def test_expected_podman_images(self, fcos_host):
"""The expected Podman images must be in the expected state."""
self.check_expected_podman_images(fcos_host, self.expected_podman_images)
def check_expected_podman_images(self, fcos_host, expected_podman_images: list[dict[str, str]]) -> None:
"""The expected Podman images must be in the expected state."""
for img in expected_podman_images:
result = fcos_host.run(f"podman image exists {img['name']}:{img['tag']}")
if img["state"] == "present":
assert result.rc == 0, f"Podman image {img['name']}:{img['tag']} does not exist"
elif img["state"] == "absent":
assert result.rc != 0, f"Podman image {img['name']}:{img['tag']} is present but expected to be absent"
else:
raise ValueError(f"Invalid state for Podman image {img['name']}:{img['tag']}: {img['state']}")
def test_expected_podman_containers(self, fcos_host):
"""The expected Podman containers must be in the expected state."""
self.check_expected_podman_containers(fcos_host, self.expected_podman_containers)
def check_expected_podman_containers(self, fcos_host, expected_podman_containers: list[dict[str, str]]) -> None:
"""The expected Podman containers must be in the expected state."""
for container in expected_podman_containers:
result = fcos_host.run(f"podman container inspect {container['name']}")
if container["state"] == "present":
assert result.rc == 0, f"Podman container {container['name']} does not exist"
elif container["state"] == "absent":
assert result.rc != 0, f"Podman container {container['name']} is present but expected to be absent"
else:
raise ValueError(f"Invalid state for Podman container {container['name']}: {container['state']}")
if result.rc == 0 and "pid1" in container:
try:
result_json = json.loads(result.stdout)[0]
except json.JSONDecodeError as e:
raise AssertionError(f"Failed to parse JSON output from podman inspect for container {container['name']}: {e}\nOutput was: {result_json}")
pid = result_json["State"]["Pid"]
result = fcos_host.run(f"ps axn -o pid,user,group,state,command -q {pid} --no-header")
if result.rc != 0:
raise AssertionError(f"Failed to inspect PID 1 of container {container['name']} with nsenter: rc = {result.rc}")
pid1_info = result.stdout.strip().split(None, 4)
if len(pid1_info) < 5:
raise AssertionError(f"Unexpected output from ps for PID 1 of container {container['name']}: {result.stdout}")
pid1_pid = pid1_info[0]
pid1_user = pid1_info[1]
pid1_group = pid1_info[2]
pid1_commandline = pid1_info[4]
assert int(pid1_pid) == pid, f"Expected PID {pid} for container {container['name']} main process, but got {pid1_pid}"
if "owner" in container["pid1"]:
assert pid1_user == container["pid1"]["owner"], f"Expected PID 1 of container {container['name']} to be owned by {container['pid1']['owner']}, but got {pid1_user}"
if "group" in container["pid1"]:
assert pid1_group == container["pid1"]["group"], f"Expected PID 1 of container {container['name']} to belong to group {container['pid1']['group']}, but got {pid1_group}"
if "commandline" in container["pid1"]:
assert pid1_commandline == container["pid1"]["commandline"], f"Expected PID 1 of container {container['name']} to have command line {container['pid1']['commandline']}, but got {pid1_commandline}"

384
tests/vm.py

@ -1,384 +0,0 @@
"""Fedora CoreOS VM lifecycle helpers for end-to-end testing.
Requires running as root (virt-install, virsh, qemu-img need root privileges).
Typical usage:
vm = FCOSVirtualMachine(
name="postgresql-abc123",
ignition_file=Path("/tmp/fcos-test.ign"),
virtiofs_dir=Path("/srv/fcos-test-postgresql-abc123"),
)
vm.create()
vm.wait_ssh(ssh_key=key_path)
vm.wait_for_service("postgresql.target", ssh_key=key_path)
# ... run tests ...
vm.destroy()
"""
import base64
import re
import shutil
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path
# Root directory where libvirt stores VM disk images; a per-VM
# subdirectory is created under it for each test VM.
LIBVIRT_IMAGES_DIR = Path("/var/lib/libvirt/images")
# Shared Fedora CoreOS base image, copied (never modified) for each test VM.
FCOS_BASE_IMAGE = LIBVIRT_IMAGES_DIR / "library" / "fedora-coreos.qcow2"
# Butane spec version — must match the project convention.
BUTANE_VERSION = "1.4.0"
def ensure_fcos_ign(cookbook_dir: Path) -> Path:
    """Return the path to fcos.ign, building it via ``make butane`` if absent."""
    target = cookbook_dir / "fcos.ign"
    if not target.exists():
        # The cookbook's Makefile knows how to compile the butane sources.
        build_cmd = ["make", "-C", str(cookbook_dir), "butane"]
        subprocess.run(build_cmd, check=True)
    return target
def build_test_ignition(
    base_ignition: Path,
    ssh_pubkey: str,
    output: Path,
    config_env_overrides: dict[str, str] | None = None,
    extra_files: dict[str, tuple[str, int]] | None = None,
) -> Path:
    """Build a test ignition file by overlaying the cookbook's fcos.ign.

    The overlay:
    - Merges the base cookbook ignition (fcos.ign).
    - Adds the test SSH public key to the root user so the test runner can
      SSH in (FCOS allows root login with keys via PermitRootLogin
      prohibit-password).
    - Optionally patches /etc/quadlets/postgresql/config.env via
      ``config_env_overrides`` (merged on top of whatever the base ignition
      already sets).
    - Optionally injects arbitrary extra files via ``extra_files``:
      ``{"/path/on/vm": ("file content", 0o644)}``.

    Args:
        base_ignition: Path to the pre-built fcos.ign for the cookbook.
        ssh_pubkey: Ed25519 public key string to inject for root.
        output: Destination path for the compiled test ignition.
        config_env_overrides: Key/value pairs to override in config.env.
            The full config.env is re-written with these values merged on
            top of the defaults from the base ignition.
        extra_files: Additional files to inject into the VM image.

    Returns:
        ``output`` path.

    Raises:
        subprocess.CalledProcessError: If the butane compilation fails.
    """
    with tempfile.TemporaryDirectory() as _tmpdir:
        d = Path(_tmpdir)
        # butane resolves "local:" references relative to the directory passed
        # via -d; copy the base ignition there.
        shutil.copy(base_ignition, d / "base.ign")
        # Build the storage.files section of the overlay.
        storage_section = _build_storage_section(config_env_overrides, extra_files)
        overlay_bu = textwrap.dedent(f"""\
            variant: fcos
            version: {BUTANE_VERSION}
            ignition:
              config:
                merge:
                  - local: base.ign
            passwd:
              users:
                - name: root
                  ssh_authorized_keys:
                    - {ssh_pubkey}
            systemd:
              units:
                # Disable & mask zincati to avoid reboots during testing.
                - name: zincati.service
                  enabled: false
                  mask: true
            """)
        if storage_section:
            # storage_section is a top-level YAML block, appended verbatim.
            overlay_bu += storage_section
        overlay_bu_path = d / "test-overlay.bu"
        overlay_bu_path.write_text(overlay_bu)
        subprocess.run(
            [
                "butane",
                "--strict",
                "-d", str(d),
                "-o", str(output),
                str(overlay_bu_path),
            ],
            check=True,
        )
    return output
def _build_storage_section(
    config_env_overrides: dict[str, str] | None,
    extra_files: dict[str, tuple[str, int]] | None,
) -> str:
    """Return a Butane ``storage:`` YAML block (or empty string if nothing to inject)."""
    entries: list[str] = []
    if config_env_overrides:
        env_lines = [f"{k}={v}" for k, v in config_env_overrides.items()]
        entries.append(
            _butane_file(
                "/etc/quadlets/postgresql/config.env",
                "\n".join(env_lines) + "\n",
                0o600,
            )
        )
    for path, (content, mode) in (extra_files or {}).items():
        entries.append(_butane_file(path, content, mode))
    if not entries:
        return ""
    joined = "\n".join(entries)
    return f"storage:\n  files:\n{joined}\n"
def _butane_file(path: str, content: str, mode: int) -> str:
"""Return a Butane file entry using a base64 data URI (avoids YAML quoting)."""
b64 = base64.b64encode(content.encode()).decode()
return (
f" - path: {path}\n"
f" mode: {mode}\n"
f" contents:\n"
f' source: "data:text/plain;base64,{b64}"\n'
)
class FCOSVirtualMachine:
    """Manages a Fedora CoreOS KVM virtual machine for end-to-end testing.

    All public methods are synchronous and raise on failure. The caller is
    responsible for calling ``destroy()`` (typically from a pytest fixture
    teardown).
    """

    def __init__(self, name: str, ignition_file: Path, virtiofs_dir: Path) -> None:
        """
        Args:
            name: Short identifier appended to "fcos-test-" to form the
                libvirt domain name. Keep it unique across parallel tests.
            ignition_file: Path to the compiled Ignition (.ign) file.
            virtiofs_dir: Host directory that will be exposed inside the VM
                at /var/lib/virtiofs/data via VirtioFS.
        """
        self.name = name
        self.vm_name = f"fcos-test-{name}"
        self.ignition_file = Path(ignition_file)
        self.virtiofs_dir = Path(virtiofs_dir)
        # Per-VM subdirectory holding the root/var disks and the ignition copy.
        self._images_dir = LIBVIRT_IMAGES_DIR / self.vm_name
        # Cached IPv4 address; populated lazily by get_ip()/wait_ssh().
        self._ip: str | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def create(self) -> None:
        """Create disk images and start the VM via virt-install."""
        self._images_dir.mkdir(parents=True, exist_ok=True)
        self.virtiofs_dir.mkdir(parents=True, exist_ok=True)
        ign_dest = self._images_dir / "fcos.ign"
        shutil.copy(self.ignition_file, ign_dest)
        # qemu must be able to read the ignition file.
        ign_dest.chmod(0o644)
        # Root OS disk: copy from the shared base QCOW2 image.
        root_qcow2 = self._images_dir / "root.qcow2"
        shutil.copy(FCOS_BASE_IMAGE, root_qcow2)
        # Secondary disk for /var (keeps OS and data separate, matches common.mk).
        var_qcow2 = self._images_dir / "var.qcow2"
        subprocess.run(
            ["qemu-img", "create", "-f", "qcow2", str(var_qcow2), "100G"],
            check=True,
        )
        subprocess.run(
            [
                "virt-install",
                f"--name={self.vm_name}",
                "--import",
                "--noautoconsole",
                "--ram=4096",
                "--vcpus=2",
                "--os-variant=fedora-coreos-stable",
                f"--disk=path={root_qcow2},format=qcow2,size=50",
                f"--disk=path={var_qcow2},format=qcow2",
                # Ignition is passed to the guest through qemu firmware config.
                f"--qemu-commandline=-fw_cfg name=opt/com.coreos/config,file={ign_dest}",
                "--network=network=default,model=virtio",
                "--console=pty,target.type=virtio",
                "--serial=pty",
                "--graphics=none",
                "--boot=uefi",
                # virtiofs requires shared memory backing.
                "--memorybacking=access.mode=shared,source.type=memfd",
                (
                    f"--filesystem=type=mount,accessmode=passthrough,"
                    f"driver.type=virtiofs,driver.queue=1024,"
                    f"source.dir={self.virtiofs_dir},target.dir=data"
                ),
            ],
            check=True,
        )

    def destroy(self) -> None:
        """Forcefully stop and delete the VM and all associated disk images."""
        # Best-effort: the domain may already be stopped or undefined.
        subprocess.run(["virsh", "destroy", self.vm_name], capture_output=True)
        subprocess.run(
            ["virsh", "undefine", self.vm_name, "--nvram"],
            capture_output=True,
        )
        if self._images_dir.exists():
            shutil.rmtree(self._images_dir)
        if self.virtiofs_dir.exists():
            shutil.rmtree(self.virtiofs_dir)

    # ------------------------------------------------------------------
    # Readiness polling
    # ------------------------------------------------------------------
    def get_ip(self) -> str | None:
        """Return the VM's primary IPv4 address reported by virsh, or None."""
        result = subprocess.run(
            ["virsh", "domifaddr", self.vm_name],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return None
        # First dotted quad in the domifaddr output is the primary address.
        match = re.search(r"(\d+\.\d+\.\d+\.\d+)", result.stdout)
        return match.group(1) if match else None

    @property
    def ip(self) -> str:
        """The VM's IPv4 address (cached). Raises RuntimeError if not yet assigned."""
        if self._ip is None:
            self._ip = self.get_ip()
        if self._ip is None:
            raise RuntimeError(f"VM {self.vm_name!r} has no IP address yet")
        return self._ip

    def wait_ssh(self, ssh_key: Path, timeout: int = 300) -> str:
        """Block until SSH is reachable. Returns the IP address.

        Polls every 5 seconds until ``timeout`` seconds have elapsed.

        Raises:
            TimeoutError: If SSH is still unreachable after ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ip = self.get_ip()
            if ip:
                try:
                    result = subprocess.run(
                        [
                            "ssh",
                            "-i", str(ssh_key),
                            "-o", "StrictHostKeyChecking=no",
                            "-o", "UserKnownHostsFile=/dev/null",
                            "-o", "ConnectTimeout=5",
                            "-o", "BatchMode=yes",
                            f"root@{ip}",
                            "true",
                        ],
                        capture_output=True,
                        timeout=10,
                    )
                    if result.returncode == 0:
                        self._ip = ip
                        return ip
                except subprocess.TimeoutExpired:
                    # ssh hung — keep polling until the deadline.
                    pass
            time.sleep(5)
        raise TimeoutError(
            f"VM {self.vm_name!r} did not become SSH-ready within {timeout}s"
        )

    def wait_for_service(
        self, service: str, ssh_key: Path, timeout: int = 120
    ) -> None:
        """Block until *service* reaches the ``active`` state.

        Raises:
            TimeoutError: If the service is not active after ``timeout``
                seconds; the message includes ``systemctl status`` output.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = self.ssh_run(
                f"systemctl is-active {service}", ssh_key, check=False
            )
            if result.stdout.strip() == "active":
                return
            time.sleep(5)
        status = self.ssh_run(
            f"systemctl status {service} --no-pager", ssh_key, check=False
        )
        raise TimeoutError(
            f"Service {service!r} not active after {timeout}s:\n{status.stdout}"
        )

    def wait_for_unit_done(
        self, service: str, ssh_key: Path, timeout: int = 120
    ) -> str:
        """Block until a oneshot service finishes (``inactive`` or ``failed``).

        Returns:
            The final state string: ``"inactive"`` on success, ``"failed"``
            on failure.

        Raises:
            TimeoutError: If the unit has not finished after ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = self.ssh_run(
                f"systemctl is-active {service}", ssh_key, check=False
            )
            state = result.stdout.strip()
            if state in ("inactive", "failed"):
                return state
            time.sleep(5)
        raise TimeoutError(
            f"Service {service!r} did not finish within {timeout}s"
        )

    # ------------------------------------------------------------------
    # Remote execution
    # ------------------------------------------------------------------
    def ssh_run(
        self,
        command: str,
        ssh_key: Path,
        check: bool = True,
    ) -> subprocess.CompletedProcess:
        """Run a shell command in the VM via SSH.

        Args:
            command: Shell command string passed to the remote bash.
            ssh_key: Path to the private key used for authentication.
            check: If True (default), raise RuntimeError on non-zero exit.

        Returns:
            CompletedProcess with stdout/stderr as text.
        """
        result = subprocess.run(
            [
                "ssh",
                "-i", str(ssh_key),
                "-o", "StrictHostKeyChecking=no",
                "-o", "UserKnownHostsFile=/dev/null",
                f"root@{self.ip}",
                command,
            ],
            capture_output=True,
            text=True,
        )
        if check and result.returncode != 0:
            raise RuntimeError(
                f"SSH command failed (exit {result.returncode}): {command!r}\n"
                f"stdout: {result.stdout}\nstderr: {result.stderr}"
            )
        return result
Loading…
Cancel
Save