"""
Functions for generating simulated event posteriors.
The simulated posteriors will not accurately represent real observational uncertainties.
The module provides the `gwpopulation_pipe_simulate_posteriors` executable.
In order to use the primary function you will need a class that provides
various attributes specified in the `gwpopulation_pipe` parser.
"""
import os
import sys
import numpy as np
import pandas as pd
from bilby.core.prior import TruncatedGaussian
from bilby.core.utils import logger
from bilby.gw.cosmology import get_cosmology
from bilby.gw.prior import UniformSourceFrame
from gwpopulation.backend import set_backend
from gwpopulation.conversions import convert_to_beta_parameters
from gwpopulation.utils import to_numpy, xp
from .data_analysis import load_model, load_vt
from .main import MASS_MODELS
from .main import create_parser as create_main_parser
BOUNDS = dict(
    mass_1=(2, 100),
    mass_ratio=(0, 1),
    a_1=(0, 1),
    a_2=(0, 1),
    cos_tilt_1=(-1, 1),
    cos_tilt_2=(-1, 1),
    redshift=(0, 2.3),
)
cosmology = get_cosmology()
PRIOR_VOLUME = (
    (BOUNDS["mass_1"][1] - BOUNDS["mass_1"][0]) ** 2
    * (BOUNDS["a_1"][1] - BOUNDS["a_1"][0])
    * (BOUNDS["a_2"][1] - BOUNDS["a_2"][0])
    * (BOUNDS["cos_tilt_1"][1] - BOUNDS["cos_tilt_1"][0])
)
[docs]
def create_parser():
    parser = create_main_parser()
    parser.add_argument(
        "--models",
        action="append",
        help="Model function to evaluate, default is "
        "two component mass and iid spins.",
    )
    parser.add_argument(
        "--vt-model",
        type=str,
        default="",
        help="Function to generate VT object, should be in "
        "vt_helper.py, default is mass only.",
    )
    return parser 
[docs]
def draw_true_values(model, vt_model=None, n_samples=40):
    """
    Draw samples from the probability density provided by the input model.
    Parameters
    ----------
    model: bilby.hyper.model.Model
        Population model to evaluate
    vt_model: gwpopulation.vt.GridVT, optional
        Model to compute selection function
    n_samples: int
        Number of samples to draw
    Returns
    -------
    total_samples: `pd.DataFrame`
        Data containing the true values.
    """
    if vt_model is None:
        vt_model = lambda x: 1
    else:
        raise NotImplementedError("Sensitive volume not implemented.")
    total_samples = pd.DataFrame()
    n_per_iteration = n_samples * 10000
    while True:
        data = _draw_from_prior(n_samples=n_per_iteration)
        prob = model.prob(data)
        prob *= vt_model(data)
        data = pd.DataFrame({key: to_numpy(data[key]) for key in data})
        data["prob"] = to_numpy(prob)
        total_samples = total_samples.append(data)
        max_prob = np.max(total_samples["prob"])
        total_samples = total_samples[total_samples["prob"] > 0]
        total_samples = total_samples[
            total_samples["prob"] > max_prob / n_per_iteration
        ]
        keep = np.array(total_samples["prob"]) >= np.random.uniform(
            0, max_prob, len(total_samples)
        )
        if sum(keep) > n_samples:
            total_samples = total_samples.iloc[keep]
            break
        logger.info(f"Drawing events is very inefficient.")
    total_samples = total_samples[:n_samples]
    del total_samples["prob"]
    return total_samples 
[docs]
def simulate_posterior(sample, fractional_sigma=0.1, n_samples=10000):
    """
    Simulate a posterior distribution given an input sample.
    This assumes uncorrelated uncertainties between different parameters.
    Parameters
    ----------
    sample: dict
        The true parameter values of the event to simulate a posterior for.
    fractional_sigma: float
        Fractional uncertainty on each parameter.
    n_samples: int
        The number of samples to draw.
    Returns
    -------
    posterior: `pd.DataFrame`
        The simulated posterior samples.
    """
    posterior = pd.DataFrame()
    for key in sample:
        if key in BOUNDS:
            bound = BOUNDS[key]
        else:
            bound = (-np.inf, np.inf)
        sigma = sample[key] * fractional_sigma
        new_true = TruncatedGaussian(
            mu=sample[key], sigma=sigma, minimum=bound[0], maximum=bound[1]
        ).sample()
        posterior[key] = TruncatedGaussian(
            mu=new_true, sigma=sigma, minimum=bound[0], maximum=bound[1]
        ).sample(n_samples)
    posterior = posterior[
        (BOUNDS["mass_1"][0] / posterior["mass_1"] <= posterior["mass_ratio"])
        & (posterior["mass_ratio"] <= BOUNDS["mass_ratio"][1])
    ]
    posterior["prior"] = 1 / PRIOR_VOLUME
    return posterior 
def _draw_from_prior(n_samples):
    data = dict()
    for key in BOUNDS:
        data[key] = xp.random.uniform(BOUNDS[key][0], BOUNDS[key][1], n_samples)
    data["redshift"] = xp.asarray(
        UniformSourceFrame(
            minimum=BOUNDS["redshift"][0],
            maximum=BOUNDS["redshift"][1],
            name="redshift",
        ).sample(n_samples)
    )
    return data
[docs]
def simulate_posterior_from_prior(n_samples=10000):
    """
    Draw samples from the posterior distribution assuming the data are
    non-informative, e.g., sample from the prior distribution.
    Parameters
    ----------
    n_samples: int
        The number of samples to simulate.
    Returns
    -------
    posterior: `pd.DataFrame`
        The simulated samples.
    """
    data = _draw_from_prior(n_samples=n_samples)
    posterior = pd.DataFrame({key: to_numpy(data[key]) for key in data})
    posterior["prior"] = (
        UniformSourceFrame(
            minimum=BOUNDS["redshift"][0],
            maximum=BOUNDS["redshift"][1],
            name="redshift",
        ).prob(posterior["redshift"])
        / PRIOR_VOLUME
    )
    return posterior 
[docs]
def simulate_posteriors(args):
    """
    Simulate a set of posterior samples gives the command-line arguments.
    Parameters
    ----------
    args:
        The command-line arguments.
    Returns
    -------
    posteriors: list
        The simulated posterior samples.
    """
    posteriors = list()
    if args.sample_from_prior:
        logger.info("Drawing prior samples for all simulated events.")
        for ii in range(args.n_simulations):
            posteriors.append(
                simulate_posterior_from_prior(n_samples=args.samples_per_posterior)
            )
    else:
        injection_parameters = dict(
            pd.read_json(args.injection_file).iloc[args.injection_index]
        )
        injection_parameters, _ = convert_to_beta_parameters(
            parameters=injection_parameters
        )
        args.models = list()
        args.models.append(MASS_MODELS[args.mass_models[0]])
        args.models.append(f"{args.magnitude_models[0]}_spin_magnitude")
        args.models.append(f"{args.tilt_models[0]}_spin_orientation")
        model = load_model(args=args)
        model.parameters.update(injection_parameters)
        logger.info(model.parameters)
        if not getattr(args, "vt_model", "") == "":
            vt_model = load_vt(args=args)
        else:
            vt_model = None
        true_values = draw_true_values(
            model=model, vt_model=vt_model, n_samples=args.n_simulations
        )
        logger.info("Simulating posteriors for all events.")
        for ii in range(args.n_simulations):
            posteriors.append(
                simulate_posterior(
                    dict(true_values.iloc[ii]), n_samples=args.samples_per_posterior
                )
            )
    if not os.path.exists(os.path.join(args.run_dir)):
        os.mkdir(os.path.join(args.run_dir))
    if not os.path.exists(os.path.join(args.run_dir, "data")):
        os.mkdir(os.path.join(args.run_dir, "data"))
    with open(
        os.path.join(args.run_dir, "data", f"{args.data_label}_posterior_files.txt"),
        "w",
    ) as ff:
        for ii in range(args.n_simulations):
            ff.write(f"{ii}:{ii}")
    return posteriors 
[docs]
def main():
    parser = create_parser()
    args, _ = parser.parse_known_args(sys.argv[1:])
    set_backend(args.backend)
    simulate_posteriors(args)