Latent Component Gaussian Process (LCGP): replicated 1D illustration#

The experiment compares LCGP behavior under three replication designs and two training modes:

Axis

Options

Replication design

uniform, skewed, hotspot

Training mode

replicated-data reduction (rep) and full-data training (full)

Figure type

output predictions (y) and latent GP diagnostics (g)

The notebook is deterministic: each (case, submethod) run receives its own fixed random seed.

Execution requirements#

This page expects the following packages to already be available in the JupyterBook build environment or to be installed during runtime:

  lcgp
  pandas
  matplotlib
  tensorflow-probability[tf]

Imports and global configuration#

Hide code cell source

from __future__ import annotations

import time
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import Markdown, display

from call_model import LCGPRun
from lcgp import evaluation

plt.rcParams.update({
    "figure.dpi": 110,
    "savefig.dpi": 150,
    "axes.grid": True,
    "grid.alpha": 0.25,
})
Matplotlib is building the font cache; this may take a moment.
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[1], line 12
      8 import pandas as pd
      9 import matplotlib.pyplot as plt
     10 from IPython.display import Markdown, display
     11 
---> 12 from call_model import LCGPRun
     13 from lcgp import evaluation
     14 
     15 plt.rcParams.update({

File ~/checkouts/readthedocs.org/user_builds/lcgp/checkouts/lcgp-r/docs/call_model.py:1
----> 1 from lcgp import LCGP
      2 import numpy as np
      5 class SuperRun:

ModuleNotFoundError: No module named 'lcgp'
# All options executed by this JupyterBook page.
CASES = (1, 2, 3)
SUBMETHODS = ("rep", "full")
PLOT_MODES = ("y", "g")

BASE_SEED = 42
RESULTS_ROOT = Path("results_figure_jupyterbook")
RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

CASE_LABELS = {
    1: "Uniform replication",
    2: "Skewed replication in [0.20, 0.45]",
    3: "Hotspot replication at selected x locations",
}

True function#

The input is one-dimensional, (x \in [0,1]), while the response has three output dimensions:

[ y(x) = \begin{bmatrix} f_1(x) \ f_2(x) \ f_3(x) \end{bmatrix}. ]

The three outputs share the same scalar input but have different shapes. This gives a compact multi-output regression problem where replication can affect both mean estimation and uncertainty quantification.

Hide code cell source

def f_true(x: np.ndarray) -> np.ndarray:
    """Three-output ground-truth function evaluated at a one-dimensional input."""
    x = np.asarray(x, dtype=np.float64)
    f1 = 0.8 + 0.3 * np.sin(2 * np.pi * x) + 0.2 * x
    f2 = 0.3 + 0.5 * np.cos(2 * np.pi * x)
    f3 = -0.4 - (x - 0.5) ** 2 + 0.2 * np.sin(4 * np.pi * x)
    return np.vstack([f1, f2, f3])

Replicated data generators#

Hide code cell source

def make_rep_data(
    n_unique: int = 12,
    rep_choices: tuple[int, ...] = (1, 2, 3, 4),
    noise_std: tuple[float, float, float] = (0.05, 0.08, 0.10),
    rng: np.random.Generator | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Case 1: roughly uniform replication over the input domain."""
    rng = np.random.default_rng(BASE_SEED) if rng is None else rng
    x_unique = np.linspace(0.0, 1.0, n_unique, dtype=np.float64)
    r = rng.choice(rep_choices, size=n_unique, replace=True)

    xs, ys = [], []
    for i, xi in enumerate(x_unique):
        yi_true = f_true(np.array([xi]))[:, 0]
        for _ in range(int(r[i])):
            eps = rng.normal(0, noise_std, size=3).astype(np.float64)
            xs.append([xi])
            ys.append(yi_true + eps)

    xtrain = np.asarray(xs, dtype=np.float64)
    ytrain = np.asarray(ys, dtype=np.float64).T
    xtest = np.linspace(0.0, 1.0, 400, dtype=np.float64)[:, None]
    ytrue = f_true(xtest[:, 0])
    return xtrain, ytrain, xtest, ytrue


def make_rep_data_skewed(
    n_unique: int = 40,
    heavy_region: tuple[float, float] = (0.20, 0.45),
    light_rep_choices: tuple[int, ...] = (1, 2),
    heavy_rep_choices: tuple[int, ...] = (8, 12, 16, 20),
    noise_std: tuple[float, float, float] = (0.05, 0.08, 0.10),
    rng: np.random.Generator | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Case 2: many replicates in a contiguous subregion."""
    rng = np.random.default_rng(BASE_SEED) if rng is None else rng
    x_unique = np.linspace(0.0, 1.0, n_unique, dtype=np.float64)

    xs, ys = [], []
    for xi in x_unique:
        choices = heavy_rep_choices if heavy_region[0] <= xi <= heavy_region[1] else light_rep_choices
        reps = int(rng.choice(choices))
        yi_base = f_true(np.array([xi]))[:, 0]
        for _ in range(reps):
            eps = np.asarray([rng.normal(0, s) for s in noise_std], dtype=np.float64)
            xs.append([xi])
            ys.append(yi_base + eps)

    xtrain = np.asarray(xs, dtype=np.float64)
    ytrain = np.asarray(ys, dtype=np.float64).T
    xtest = np.linspace(0.0, 1.0, 400, dtype=np.float64)[:, None]
    ytrue = f_true(xtest[:, 0])
    return xtrain, ytrain, xtest, ytrue


def make_rep_data_hotspots(
    n_unique: int = 50,
    hotspots: tuple[tuple[float, int, int], ...] = ((0.15, 10, 15), (0.50, 18, 25), (0.80, 12, 20)),
    base_rep_choices: tuple[int, ...] = (1,),
    noise_std: tuple[float, float, float] = (0.05, 0.08, 0.10),
    rng: np.random.Generator | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Case 3: very high replication at selected hotspot locations."""
    rng = np.random.default_rng(BASE_SEED) if rng is None else rng
    x_unique = np.linspace(0.0, 1.0, n_unique, dtype=np.float64)
    hotspot_idx = {
        int(np.argmin(np.abs(x_unique - x0))): (lo, hi)
        for (x0, lo, hi) in hotspots
    }

    xs, ys = [], []
    for i, xi in enumerate(x_unique):
        if i in hotspot_idx:
            lo, hi = hotspot_idx[i]
            reps = int(rng.integers(lo, hi + 1))
        else:
            reps = int(rng.choice(base_rep_choices))

        yi_base = f_true(np.array([xi]))[:, 0]
        for _ in range(reps):
            eps = np.asarray([rng.normal(0, s) for s in noise_std], dtype=np.float64)
            xs.append([xi])
            ys.append(yi_base + eps)

    xtrain = np.asarray(xs, dtype=np.float64)
    ytrain = np.asarray(ys, dtype=np.float64).T
    xtest = np.linspace(0.0, 1.0, 400, dtype=np.float64)[:, None]
    ytrue = f_true(xtest[:, 0])
    return xtrain, ytrain, xtest, ytrue

Hide code cell source

# Experiment helper functions
@dataclass(frozen=True)
class ReplicationSummary:
    case: int
    submethod: str
    n_total: int
    n_unique: int
    rep_min: int
    rep_mean: float
    rep_max: int
    seed: int


def build_dataset(case: int, seed: int) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Generate one replicated dataset for the requested case."""
    rng = np.random.default_rng(seed)

    if case == 1:
        return make_rep_data(
            n_unique=16,
            rep_choices=(1, 2, 3, 4, 5),
            noise_std=(0.05, 0.08, 0.10),
            rng=rng,
        )

    if case == 2:
        return make_rep_data_skewed(
            n_unique=40,
            heavy_region=(0.20, 0.45),
            light_rep_choices=(1, 2),
            heavy_rep_choices=(8, 12, 16, 20),
            noise_std=(0.05, 0.08, 0.10),
            rng=rng,
        )

    if case == 3:
        return make_rep_data_hotspots(
            n_unique=50,
            hotspots=((0.15, 10, 15), (0.50, 18, 25), (0.80, 12, 20)),
            base_rep_choices=(1,),
            noise_std=(0.05, 0.08, 0.10),
            rng=rng,
        )

    raise ValueError("case must be one of 1, 2, or 3")


def summarize_replication(case: int, submethod: str, xtrain: np.ndarray, seed: int) -> ReplicationSummary:
    _, rep_counts = np.unique(xtrain[:, 0], return_counts=True)
    return ReplicationSummary(
        case=case,
        submethod=submethod,
        n_total=int(xtrain.shape[0]),
        n_unique=int(rep_counts.size),
        rep_min=int(rep_counts.min()),
        rep_mean=float(rep_counts.mean()),
        rep_max=int(rep_counts.max()),
        seed=seed,
    )


def fit_lcgp(
    xtrain: np.ndarray,
    ytrain: np.ndarray,
    xtest: np.ndarray,
    ytrue: np.ndarray,
    submethod: str,
    runno: str,
):
    """Fit an LCGP model and return the run object, predictions, and elapsed training time."""
    data = {
        "xtrain": xtrain,
        "xtest": xtest,
        "ytrain": ytrain,
        "ytest": ytrue,
        "ytrue": ytrue,
    }

    modelrun = LCGPRun(
        runno=runno,
        data=data,
        num_latent=3,
        var_threshold=None,
        submethod=submethod,
        diag_error_structure=[1, 1, 1],
        robust_mean=True,
    )
    modelrun.define_model()

    t0 = time.time()
    modelrun.train()
    elapsed = time.time() - t0

    predmean, ypredvar, yconfvar = modelrun.predict(return_fullcov=False)
    return modelrun, predmean, ypredvar, yconfvar, elapsed


def evaluate_prediction(ytrue: np.ndarray, predmean: np.ndarray, yconfvar: np.ndarray) -> dict[str, float]:
    """Compute scalar prediction-quality metrics for the fitted model."""
    pcover, pwidth = evaluation.intervalstats(ytrue, predmean, yconfvar)
    return {
        "RMSE": float(evaluation.rmse(ytrue, predmean)),
        "NRMSE": float(evaluation.normalized_rmse(ytrue, predmean)),
        "95% PI coverage": float(pcover),
        "95% PI width": float(pwidth),
        "DSS": float(evaluation.dss(ytrue, predmean, yconfvar, use_diag=True)),
    }

Hide code cell source

## Plotting helper functions
def plot_output_predictions(
    case: int,
    submethod: str,
    xtrain: np.ndarray,
    ytrain: np.ndarray,
    xtest: np.ndarray,
    ytrue: np.ndarray,
    predmean: np.ndarray,
    yconfvar: np.ndarray,
    outfile: Path | None = None,
):
    """Plot observed replicates, true functions, posterior means, and 95% confidence bands."""
    order_test = np.argsort(xtest[:, 0])
    order_train = np.argsort(xtrain[:, 0])

    fig, axes = plt.subplots(3, 1, figsize=(10, 7), sharex=True)
    for i, ax in enumerate(axes):
        ax.scatter(
            xtrain[order_train, 0],
            ytrain[i, order_train],
            s=12,
            alpha=0.65,
            label="replicates" if i == 0 else None,
        )
        ax.plot(
            xtest[order_test, 0],
            ytrue[i, order_test],
            lw=1.8,
            label="true" if i == 0 else None,
        )
        ax.plot(
            xtest[order_test, 0],
            predmean[i, order_test],
            lw=1.5,
            label="LCGP mean" if i == 0 else None,
        )

        lo = predmean[i, order_test] - 1.96 * np.sqrt(yconfvar[i, order_test])
        hi = predmean[i, order_test] + 1.96 * np.sqrt(yconfvar[i, order_test])
        ax.fill_between(
            xtest[order_test, 0],
            lo,
            hi,
            alpha=0.22,
            label="95% credible band" if i == 0 else None,
        )
        ax.set_ylabel(fr"$f_{i + 1}(x)$")

    axes[-1].set_xlabel("x")
    axes[0].legend(loc="best", fontsize=9)
    fig.suptitle(f"LCGP predictions — Case {case}: {CASE_LABELS[case]} ({submethod})", y=1.01)
    fig.tight_layout()

    if outfile is not None:
        fig.savefig(outfile, bbox_inches="tight")

    return fig


def plot_latent_gps(
    case: int,
    submethod: str,
    modelrun,
    xtest: np.ndarray,
    outfile: Path | None = None,
):
    """Plot latent GP posterior means and 95% bands."""
    mdl = modelrun.model
    _ = mdl.predict(x0=xtest, return_fullcov=False)

    ghat_test = np.asarray(mdl.ghat)
    gstd_test = np.sqrt(np.asarray(mdl.gvar))
    ghat_train = np.asarray(mdl.mks)
    x_train_unique = np.asarray(mdl.x_unique)

    order_test = np.argsort(xtest[:, 0])
    order_unique = np.argsort(x_train_unique[:, 0])

    q = ghat_test.shape[0]
    fig, axes = plt.subplots(q, 1, figsize=(10, 1.9 * q), sharex=True)
    if q == 1:
        axes = [axes]

    x_test_sorted = xtest[order_test, 0]
    x_train_sorted = x_train_unique[order_unique, 0]

    for k, ax in enumerate(axes):
        mean = ghat_test[k, order_test]
        sd = gstd_test[k, order_test]
        ax.plot(x_test_sorted, mean, lw=1.8, label=fr"$g_{{{k + 1}}}(x)$ mean")
        ax.fill_between(x_test_sorted, mean - 1.96 * sd, mean + 1.96 * sd, alpha=0.22, label="95% band")
        ax.scatter(x_train_sorted, ghat_train[k, order_unique], s=12, alpha=0.65, label="train points")
        ax.set_ylabel(fr"$g_{{{k + 1}}}(x)$")
        ax.legend(loc="best", fontsize=9)

    axes[-1].set_xlabel("x")
    fig.suptitle(f"Latent GPs — Case {case}: {CASE_LABELS[case]} ({submethod})", y=1.01)
    fig.tight_layout()

    if outfile is not None:
        fig.savefig(outfile, bbox_inches="tight")

    return fig

Comparing multiple scenarios of replications#

summary_rows: list[dict] = []
metric_rows: list[dict] = []
diagnostic_rows: list[dict] = []

for case in CASES:
    for submethod in SUBMETHODS:
        seed = BASE_SEED + 100 * case + (0 if submethod == "rep" else 1)
        xtrain, ytrain, xtest, ytrue = build_dataset(case, seed=seed)

        rep_summary = summarize_replication(case, submethod, xtrain, seed)
        summary_rows.append({
            "case": rep_summary.case,
            "case label": CASE_LABELS[case],
            "submethod": rep_summary.submethod,
            "N total": rep_summary.n_total,
            "unique x": rep_summary.n_unique,
            "rep min": rep_summary.rep_min,
            "rep mean": rep_summary.rep_mean,
            "rep max": rep_summary.rep_max,
            "seed": rep_summary.seed,
        })

        display(Markdown(f"### Case {case}: {CASE_LABELS[case]} — `{submethod}`"))
        display(pd.DataFrame([summary_rows[-1]]))

        modelrun, predmean, ypredvar, yconfvar, elapsed = fit_lcgp(
            xtrain=xtrain,
            ytrain=ytrain,
            xtest=xtest,
            ytrue=ytrue,
            submethod=submethod,
            runno=f"case_{case}_{submethod}",
        )

        metrics = evaluate_prediction(ytrue, predmean, yconfvar)
        metric_rows.append({
            "case": case,
            "case label": CASE_LABELS[case],
            "submethod": submethod,
            "training time (s)": elapsed,
            **metrics,
        })
        display(pd.DataFrame([metric_rows[-1]]))

        mdl = modelrun.model
        lLmb, lLmb0, lsigma2s, lnugGPs = mdl.get_param()
        r = np.asarray(mdl.r.numpy())

        diagnostic_rows.append({
            "case": case,
            "case label": CASE_LABELS[case],
            "submethod": submethod,
            "diag_D": np.asarray(mdl.diag_D.numpy()).round(4).tolist(),
            "phi^T phi diag": np.diag(mdl.phi.numpy().T @ mdl.phi.numpy()).round(4).tolist(),
            "lengthscales": [np.asarray(lLmb[k].numpy()).round(4).tolist() for k in range(lLmb.shape[0])],
            "noise std fitted": np.sqrt(np.exp(lsigma2s.numpy())).round(4).tolist(),
            "rep avg/min/max": f"{np.mean(r):.2f} / {np.min(r)} / {np.max(r)}",
        })
        display(pd.DataFrame([diagnostic_rows[-1]]))

        run_dir = RESULTS_ROOT / f"case_{case}_{submethod}"
        run_dir.mkdir(parents=True, exist_ok=True)

        fig_y = plot_output_predictions(
            case=case,
            submethod=submethod,
            xtrain=xtrain,
            ytrain=ytrain,
            xtest=xtest,
            ytrue=ytrue,
            predmean=predmean,
            yconfvar=yconfvar,
            outfile=run_dir / "lcgp_output.png",
        )
        plt.show()

        fig_g = plot_latent_gps(
            case=case,
            submethod=submethod,
            modelrun=modelrun,
            xtest=xtest,
            outfile=run_dir / "lcgp_latent.png",
        )
        plt.show()

        display(Markdown(f"Saved figures to `{run_dir.as_posix()}/`."))

Summary tables#

replication_summary = pd.DataFrame(summary_rows)
metric_summary = pd.DataFrame(metric_rows)
diagnostic_summary = pd.DataFrame(diagnostic_rows)

display(Markdown("### Replication design summary"))
display(replication_summary)

display(Markdown("### Predictive performance summary"))
display(
    metric_summary.sort_values(["case", "submethod"])
    .style.format({
        "training time (s)": "{:.2f}",
        "RMSE": "{:.4f}",
        "NRMSE": "{:.4f}",
        "95% PI coverage": "{:.3f}",
        "95% PI width": "{:.4f}",
        "DSS": "{:.4f}",
    })
)

display(Markdown("### Fitted diagnostic summary"))
display(diagnostic_summary)