#!/usr/bin/env python3
import glob
import itertools
import os
import shutil
import sys
from pathlib import Path
import htcondor
from bilby.core.sampler import get_sampler_class
from bilby_pipe.utils import convert_string_to_dict, strip_quotes
from bilby_pipe.job_creation.nodes.analysis_node import touch_checkpoint_files
from htcondor import dags
from .parser import create_parser
MASS_MODELS = dict(
a="gwpopulation.models.mass.power_law_primary_mass_ratio",
b="gwpopulation.models.mass.power_law_primary_mass_ratio",
c="SinglePeakSmoothedMassDistribution",
d="BrokenPowerLawSmoothedMassDistribution",
e="MultiPeakSmoothedMassDistribution",
f="BrokenPowerLawPeakSmoothedMassDistribution",
)
REDSHIFT_MODELS = dict(
powerlaw="gwpopulation.models.redshift.PowerLawRedshift",
madaudickinson="gwpopulation.models.redshift.MadauDickinsonRedshift",
)
[docs]
def relative_topdir(path: str, reference: str) -> str:
"""Returns the top-level directory name of a path relative
to a reference
"""
try:
return str(Path(path).resolve().relative_to(reference))
except ValueError as exc:
exc.args = (f"cannot format {path} relative to {reference}",)
raise
[docs]
def create_base(kind, args, gpus=0):
log_file = f"{args.log_dir}/population-{kind}-$(label)"
if kind == "collection":
memory = "8GB"
else:
memory = "1GB"
if kind in ["plot", "analysis"]:
disk = "1GB"
else:
disk = "200MB"
config = {
"universe": "vanilla",
"initialdir": os.environ["PWD"],
"executable": getattr(args, f"{kind}_executable"),
"output": f"{log_file}.out",
"error": f"{log_file}.err",
"log": f"{log_file}.log",
"request_cpus": "1",
"request_gpus": gpus,
"request_memory": memory,
"request_disk": disk,
"accounting_group": args.accounting,
"accounting_group_user": args.user,
"notification": "error",
"arguments": _arguments(args, kind).format(
**{var: f"$({var})" for var in _VARIABLES[kind]}
),
"checkpoint_exit_code": 130,
}
if gpus:
config["require_gpus"] = args.require_gpus
if kind == "summary":
config["environment"] = f'CONDA_EXE={os.getenv("CONDA_EXE")}'
config.update(transfer_files(args, kind))
job = htcondor.Submit(config)
return job
[docs]
def transfer_files(args, kind):
if kind == "collection":
return dict(should_transfer_files="NO")
run_dir = relative_topdir(args.run_dir, os.environ["PWD"])
ini_file = f"{run_dir}/{args.label}_config_complete.ini"
data_dir = f"{run_dir}/data"
result_dir = f"{run_dir}/result"
model_files = list()
for model_list in args.all_models.values():
for model in model_list:
if model.endswith(".json"):
model_files.append(os.path.abspath(model))
if args.source_files is not None:
source_files = [os.path.abspath(fname) for fname in args.source_files]
else:
source_files = list()
if kind == "summary":
inputs = [ini_file]
inputs.extend(
[f"{run_dir}/result/{label}_result.hdf5" for label in args.labels]
)
outputs = [f"{run_dir}/summary"]
elif kind == "collection":
if "*" in args.vt_file:
inputs = glob.glob(args.vt_file)
else:
inputs = [args.vt_file]
inputs += [ini_file]
outputs = [data_dir]
elif kind == "analysis":
inputs = [
f"{data_dir}/{args.data_label}.pkl",
f"{data_dir}/{args.data_label}_posterior_files.txt",
f"{data_dir}/injections.pkl",
os.path.abspath(args.prior_file),
ini_file,
]
cls = get_sampler_class(args.sampler)
files, dirs = cls.get_expected_outputs(outdir=result_dir, label="$(label)")
inputs.extend(files)
inputs.extend(dirs)
inputs.extend(model_files)
inputs.extend(source_files)
outputs = [result_dir]
elif kind == "format":
inputs = [
f"{result_dir}/$(label)_result.hdf5",
f"{data_dir}/injections.pkl",
f"{result_dir}/$(label)_samples.pkl",
]
inputs.extend(model_files)
inputs.extend(source_files)
outputs = [f"{result_dir}/$(label)_full_posterior.hdf5"]
elif kind == "plot":
inputs = [
ini_file,
f"{result_dir}/$(label)_result.hdf5",
f"{result_dir}/$(label)_samples.pkl",
f"{data_dir}/injections.pkl",
]
inputs.extend(model_files)
inputs.extend(source_files)
outputs = [result_dir]
else:
raise ValueError(f"Unknown job type: {kind}")
return dict(
should_transfer_files="YES",
transfer_input_files=f"{','.join(inputs)}",
transfer_output_files=f"{','.join(outputs)}",
when_to_transfer_output="ON_EXIT_OR_EVICT",
preserve_relative_paths=True,
stream_error=True,
stream_output=True,
)
def _arguments(args, kind):
run_dir = relative_topdir(args.run_dir, os.environ["PWD"])
ini_file = f"{run_dir}/{args.label}_config_complete.ini"
data_dir = f"{run_dir}/data"
result_dir = f"{run_dir}/result"
if args.vt_file is not None:
vt_file = f"{data_dir}/injections.pkl"
else:
vt_file = args.vt_file
if kind == "analysis":
arguments = (
f"{ini_file} --run-dir {run_dir} --label {{label}} "
f"{{models}} {{vt_models}} --vt-file {vt_file}"
)
elif kind == "collection":
arguments = f"{ini_file} --run-dir {run_dir}"
elif kind == "format":
arguments = (
f"--result-file {result_dir}/{{label}}_result.hdf5 "
f"--n-samples {args.n_post_samples} --max-redshift {args.max_redshift} "
f"--minimum-mass {args.minimum_mass} --maximum-mass {args.maximum_mass} "
f"--injection-file {vt_file} "
f"--filename {result_dir}/{{label}}_full_posterior.hdf5 "
f"--samples-file {result_dir}/{{label}}_samples.pkl "
f"--vt-ifar-threshold {args.vt_ifar_threshold} "
f"--vt-snr-threshold {args.vt_snr_threshold} "
f"--backend {args.backend} --cosmology {args.cosmology} "
)
if args.cosmo:
arguments += f"--cosmo "
elif kind == "plot":
arguments = (
f"{ini_file} --run-dir {run_dir} "
f"--result-file {result_dir}/{{label}}_result.hdf5 "
f"--samples {result_dir}/{{label}}_samples.pkl "
)
elif kind == "summary":
result_files = " ".join(
[f"{result_dir}/{label}_result.hdf5" for label in args.labels]
)
arguments = (
f"--config {ini_file} --webdir {run_dir}/summary "
f"--samples {result_files} --labels {' '.join(args.labels)}"
)
return arguments
_VARIABLES = dict(
analysis=["label", "models", "vt_models"],
collection=list(),
format=["label"],
plot=["label"],
summary=list(),
)
[docs]
def check_user(user=None):
if user is None:
if "USER" in os.environ:
user = os.environ.get["USER"]
else:
raise ValueError(
"Argument 'user' must be provided or set in environment variables!"
)
return user
[docs]
def make_submit_files(args):
subfiles = ["analysis", "collection", "common_format"]
if args.post_plots:
subfiles.append("plot")
if args.make_summary:
subfiles.append("summary")
uses_gpu = ["analysis", "common_format", "plot"]
jobs = dict()
for label in subfiles:
if label in uses_gpu:
gpus = args.request_gpu
else:
gpus = 0
jobs[label] = create_base(label.split("_")[-1], args, gpus)
return jobs
[docs]
def update_args(args):
args.user = check_user(user=args.user)
args.accounting = "ligo.dev.o4.cbc.bayesianpopulations.parametric"
args.run_dir = os.path.abspath(args.run_dir)
args.log_dir = os.path.abspath(args.log_dir)
args.request_gpu = int(args.request_gpu)
args.all_models = convert_string_to_dict(args.all_models)
args.analysis_executable = str(shutil.which("gwpopulation_pipe_analysis"))
args.collection_executable = str(shutil.which("gwpopulation_pipe_collection"))
args.format_executable = str(shutil.which("gwpopulation_pipe_to_common_format"))
args.plot_executable = str(shutil.which("gwpopulation_pipe_plot"))
args.summary_executable = str(shutil.which("summarypages"))
args.custom_plotting = (
os.path.join(os.path.dirname(__file__), "pesummary_plot.py"),
)
args.condor_dir = os.path.join(args.run_dir, "submit")
args.result_dir = os.path.join(args.run_dir, "result")
args.summary_dir = os.path.join(args.run_dir, "summary")
args.data_dir = os.path.join(args.run_dir, "data")
[docs]
def create_directories(args):
for directory in [
args.run_dir,
args.log_dir,
args.condor_dir,
args.result_dir,
args.data_dir,
args.summary_dir,
]:
if not os.path.isdir(directory):
os.mkdir(directory)
elif not os.path.isdir(directory):
raise IOError(f"{directory} exists and is not a directory.")
if os.path.isdir(args.existing_data_directory):
os.rmdir(args.data_dir)
os.symlink(
os.path.abspath(args.existing_data_directory),
os.path.abspath(args.data_dir),
)
do_collection = False
else:
do_collection = True
return do_collection
[docs]
def reduce_name(value):
if value.endswith(".json"):
value = value[:-5]
return value.split(".")[-1]
[docs]
def write_bash_file(variables, args, bash_file):
bash_str = "#! /bin/bash\n\n"
bash_str += f"echo 'Moving to {os.environ['PWD']}'\n"
bash_str += f"cd {os.environ['PWD']}\n\n"
if "collection" in variables:
bash_str += f"{args.collection_executable} {_arguments(args, 'collection')}\n\n"
for avar, pvar, fvar in zip(
variables["analysis"], variables["plot"], variables["common_format"]
):
bash_str += f"{args.analysis_executable} {_arguments(args, 'analysis').format(**avar)}\n\n"
bash_str += (
f"{args.plot_executable} {_arguments(args, 'plot').format(**pvar)}\n\n"
)
bash_str += (
f"{args.format_executable} {_arguments(args, 'format').format(**fvar)}\n\n"
)
if "summary" in variables:
bash_str += f"{args.summary_executable} {_arguments(args, 'summary')}\n\n"
bash_str += "cd -\n"
with open(bash_file, "w") as ff:
ff.write(bash_str)
[docs]
def make_dag(args):
update_args(args)
do_collection = create_directories(args)
all_variables = dict()
if do_collection:
all_variables["collection"] = [dict(label=args.label)]
run_dir = relative_topdir(args.run_dir, os.environ["PWD"])
job_names = list()
result_files = list()
all_variables["analysis"] = list()
all_variables["plot"] = list()
all_variables["common_format"] = list()
for values in itertools.product(*args.all_models.values()):
models = dict(zip(args.all_models.keys(), values))
prior_name = "_".join(
[f"{key}_{reduce_name(value)}" for key, value in models.items()]
)
job_name = f"{args.label}_{prior_name}"
job_names.append(job_name)
result_files.append(f"{run_dir}/result/{job_name}_result.hdf5")
if models.get("mass", None) in MASS_MODELS:
models["mass"] = MASS_MODELS[models["mass"]]
if models.get("redshift", None) in REDSHIFT_MODELS:
models["redshift"] = REDSHIFT_MODELS[models["redshift"]]
vt_models = dict()
for key in models:
if key in args.vt_parameters:
vt_models[key] = models[key]
if "spin" in args.vt_parameters:
vt_models["magnitude"] = models["magnitude"]
vt_models["tilt"] = models["tilt"]
if len(vt_models) == 0:
vt_models = {key: models[key] for key in ["mass", "redshift"]}
models = "--models " + " --models ".join(
[f"{key}:{value}" for key, value in models.items()]
)
vt_models = "--vt-models " + " --vt-models ".join(
[f"{key}:{value}" for key, value in vt_models.items()]
)
all_variables["analysis"].append(
dict(label=job_name, models=models, vt_models=vt_models)
)
touch_checkpoint_files(
directory=args.result_dir,
label=job_name,
sampler=args.sampler,
result_format="hdf5",
)
all_variables["plot"].append(dict(label=job_name))
all_variables["common_format"].append(dict(label=job_name))
args.labels = job_names
if args.make_summary:
all_variables["summary"] = [dict()]
def layer_kwargs(kind):
return dict(
name=f"{args.condor_dir}/{kind}",
submit_description=jobs[kind],
vars=all_variables[kind],
)
dag = dags.DAG()
jobs = make_submit_files(args)
if do_collection:
collection_layer = dag.layer(**layer_kwargs("collection"))
next_layer = collection_layer.child_layer
else:
next_layer = dag.layer
analysis_layer = next_layer(**layer_kwargs("analysis"))
edges = dict(plot=dags.OneToOne(), common_format=dags.OneToOne(), summary=None)
for kind in ["plot", "common_format", "summary"]:
if kind not in all_variables:
continue
analysis_layer.child_layer(edge=edges[kind], **layer_kwargs(kind))
bash_file = f"{args.condor_dir}/{args.label}.sh"
dag_file = f"{args.condor_dir}/{args.label}.dag"
dags.write_dag(dag, args.condor_dir, dag_file_name=f"{args.label}.dag")
write_bash_file(all_variables, args, bash_file)
print(f"dag file written to {dag_file}")
print(f"shell script written to {bash_file}")
print(f"Now run condor_submit_dag {dag_file}")
[docs]
def main():
parser = create_parser()
args, _ = parser.parse_known_args(sys.argv[1:])
complete_ini_file = f"{args.run_dir}/{args.label}_config_complete.ini"
make_dag(args)
parser.write_to_file(
filename=complete_ini_file,
args=args,
overwrite=True,
include_description=False,
)
with open(complete_ini_file, "r") as ff:
content = ff.readlines()
for ii, line in enumerate(content):
content[ii] = strip_quotes(line)
with open(complete_ini_file, "w") as ff:
ff.writelines(content)