diff --git a/experiments/benchmarking/benchmarking.jl b/experiments/benchmarking/benchmarking.jl index 09b2b8b3680064df0099b5645a3bdc9d79ea1752..856bde7ead8527bfd07523d5f2a7410650b6bf52 100644 --- a/experiments/benchmarking/benchmarking.jl +++ b/experiments/benchmarking/benchmarking.jl @@ -44,25 +44,25 @@ end Run the benchmarking procedure. """ -function run_benchmark(exp::Experiment, model_dict::Dict) +function run_benchmark(exper::Experiment, model_dict::Dict) - n_individuals = exp.n_individuals - dataname = exp.dataname - counterfactual_data = exp.counterfactual_data - generator_dict = exp.generators - measures = exp.ce_measures - parallelizer = exp.parallelizer + n_individuals = exper.n_individuals + dataname = exper.dataname + counterfactual_data = exper.counterfactual_data + generator_dict = exper.generators + measures = exper.ce_measures + parallelizer = exper.parallelizer # Benchmark generators: if isnothing(generator_dict) generator_dict = default_generators(; - Λ=exp.Λ, - Λ_Δ=exp.Λ_Δ, - use_variants=exp.use_variants, - use_class_loss=exp.use_class_loss, - opt=exp.opt, - nsamples=exp.nsamples, - nmin=exp.nmin, + Λ=exper.Λ, + Λ_Δ=exper.Λ_Δ, + use_variants=exper.use_variants, + use_class_loss=exper.use_class_loss, + opt=exper.opt, + nsamples=exper.nsamples, + nmin=exper.nmin, ) end diff --git a/experiments/data/data.jl b/experiments/data/data.jl index 0facb7fa3c8f41318d9c2e18bfc192f9ce52cbbc..58965d3a7d62bf106677ebeabae70319d1ba3c80 100644 --- a/experiments/data/data.jl +++ b/experiments/data/data.jl @@ -1,10 +1,10 @@ -function _prepare_data(exp::Experiment) +function _prepare_data(exper::Experiment) # Unpack data: - counterfactual_data = exp.counterfactual_data - min_batch_size = exp.min_batch_size - sampling_batch_size = exp.sampling_batch_size - ð’Ÿx = exp.ð’Ÿx + counterfactual_data = exper.counterfactual_data + min_batch_size = exper.min_batch_size + sampling_batch_size = exper.sampling_batch_size + ð’Ÿx = exper.ð’Ÿx # Data parameters: X, _ = 
CounterfactualExplanations.DataPreprocessing.unpack_data(counterfactual_data) @@ -27,17 +27,17 @@ function _prepare_data(exp::Experiment) return X, labels, n_obs, batch_size, sampler end -function meta_data(exp::Experiment) - _, _, n_obs, batch_size, _ = _prepare_data(exp::Experiment) +function meta_data(exper::Experiment) + _, _, n_obs, batch_size, _ = _prepare_data(exper::Experiment) return n_obs, batch_size end -function prepare_data(exp::Experiment) - X, labels, _, _, sampler = _prepare_data(exp::Experiment) +function prepare_data(exper::Experiment) + X, labels, _, _, sampler = _prepare_data(exper::Experiment) return X, labels, sampler end -function batch_size(exp::Experiment) - _, _, _, batch_size, _ = _prepare_data(exp::Experiment) +function batch_size(exper::Experiment) + _, _, _, batch_size, _ = _prepare_data(exper::Experiment) return batch_size end \ No newline at end of file diff --git a/experiments/experiment.jl b/experiments/experiment.jl index c6ebb78621a05e70291f3ab9b304419b8c3349a8..1d39e398500ba6f8d271e76bd3b2ed1b23965f3c 100644 --- a/experiments/experiment.jl +++ b/experiments/experiment.jl @@ -35,65 +35,68 @@ Base.@kwdef struct Experiment nmin::Union{Nothing,Int} = nothing finaliser::Function = Flux.softmax loss::Function = Flux.Losses.crossentropy + train_parallel::Bool = false end "A container to hold the results of an experiment." mutable struct ExperimentOutcome - exp::Experiment + exper::Experiment model_dict::Union{Nothing, Dict} generator_dict::Union{Nothing, Dict} bmk::Union{Nothing, Benchmark} end """ - train_models!(outcome::ExperimentOutcome, exp::Experiment) + train_models!(outcome::ExperimentOutcome, exper::Experiment) -Train the models specified by `exp` and store them in `outcome`. +Train the models specified by `exper` and store them in `outcome`. 
""" -function train_models!(outcome::ExperimentOutcome, exp::Experiment) - model_dict = prepare_models(exp) +function train_models!(outcome::ExperimentOutcome, exper::Experiment) + model_dict = prepare_models(exper) outcome.model_dict = model_dict - meta_model_performance(outcome) + if !(is_multi_processed(exper) && MPI.Comm_rank(exper.parallelizer.comm) != 0) + meta_model_performance(outcome) + end end """ - benchmark!(outcome::ExperimentOutcome, exp::Experiment) + benchmark!(outcome::ExperimentOutcome, exper::Experiment) -Benchmark the models specified by `exp` and store the results in `outcome`. +Benchmark the models specified by `exper` and store the results in `outcome`. """ -function benchmark!(outcome::ExperimentOutcome, exp::Experiment) - bmk, generator_dict = run_benchmark(exp, outcome.model_dict) +function benchmark!(outcome::ExperimentOutcome, exper::Experiment) + bmk, generator_dict = run_benchmark(exper, outcome.model_dict) outcome.bmk = bmk outcome.generator_dict = generator_dict end """ - run_experiment(exp::Experiment) + run_experiment(exper::Experiment) -Run the experiment specified by `exp`. +Run the experiment specified by `exper`. """ -function run_experiment(exp::Experiment; save_output::Bool=true, only_models::Bool=ONLY_MODELS) +function run_experiment(exper::Experiment; save_output::Bool=true, only_models::Bool=ONLY_MODELS) # Setup - @info "All results will be saved to $(exp.output_path)." - isdir(exp.output_path) || mkdir(exp.output_path) - @info "All parameter choices will be saved to $(exp.params_path)." - isdir(exp.params_path) || mkdir(exp.params_path) - outcome = ExperimentOutcome(exp, nothing, nothing, nothing) + @info "All results will be saved to $(exper.output_path)." + isdir(exper.output_path) || mkdir(exper.output_path) + @info "All parameter choices will be saved to $(exper.params_path)." 
+ isdir(exper.params_path) || mkdir(exper.params_path) + outcome = ExperimentOutcome(exper, nothing, nothing, nothing) # Models - train_models!(outcome, exp) + train_models!(outcome, exper) # Return if only models are needed: !only_models || return outcome # Benchmark - benchmark!(outcome, exp) + benchmark!(outcome, exper) # Save data: - if save_output - Serialization.serialize(joinpath(exp.output_path, "$(exp.save_name)_outcome.jls"), outcome) - Serialization.serialize(joinpath(exp.output_path, "$(exp.save_name)_bmk.jls"), outcome.bmk) + if save_output && !(is_multi_processed(exper) && MPI.Comm_rank(exper.parallelizer.comm) != 0) + Serialization.serialize(joinpath(exper.output_path, "$(exper.save_name)_outcome.jls"), outcome) + Serialization.serialize(joinpath(exper.output_path, "$(exper.save_name)_bmk.jls"), outcome.bmk) meta(outcome; save_output=true) end @@ -108,19 +111,19 @@ Overload the `run_experiment` function to allow for passing in `CounterfactualDa """ function run_experiment(counterfactual_data::CounterfactualData, test_data::CounterfactualData; kwargs...) # Parameters: - exp = Experiment(; + exper = Experiment(; counterfactual_data=counterfactual_data, test_data=test_data, kwargs... ) - return run_experiment(exp) + return run_experiment(exper) end # Pre-trained models: -function pretrained_path(exp::Experiment) - if isfile(joinpath(exp.output_path, "$(exp.save_name)_models.jls")) - @info "Found local pre-trained models in $(exp.output_path) and using those." - return exp.output_path +function pretrained_path(exper::Experiment) + if isfile(joinpath(exper.output_path, "$(exper.save_name)_models.jls")) + @info "Found local pre-trained models in $(exper.output_path) and using those." + return exper.output_path else @info "Using artifacts. Models were pre-trained on `julia-$(LATEST_VERSION)` and may not work on other versions." 
return joinpath(LATEST_ARTIFACT_PATH, "results") diff --git a/experiments/models/models.jl b/experiments/models/models.jl index bd2d5026cfb1dc1173b8091e3638dc89ab7f010b..022b693788efe9dc6dd12cbadd7afb42d764baac 100644 --- a/experiments/models/models.jl +++ b/experiments/models/models.jl @@ -2,58 +2,63 @@ include("additional_models.jl") include("default_models.jl") include("train_models.jl") -function prepare_models(exp::Experiment) +function prepare_models(exper::Experiment) # Unpack data: - X, labels, sampler = prepare_data(exp::Experiment) + X, labels, sampler = prepare_data(exper::Experiment) # Training: - if !exp.use_pretrained - if isnothing(exp.builder) + if !exper.use_pretrained + if isnothing(exper.builder) builder = default_builder() else - builder = exp.builder + builder = exper.builder end # Default models: - if isnothing(exp.models) + if isnothing(exper.models) @info "Using default models." models = default_models(; sampler=sampler, builder=builder, - batch_size=batch_size(exp), - sampling_steps=exp.sampling_steps, - α=exp.α, - n_ens=exp.n_ens, - use_ensembling=exp.use_ensembling, - finaliser=exp.finaliser, - loss=exp.loss, - epochs=exp.epochs, + batch_size=batch_size(exper), + sampling_steps=exper.sampling_steps, + α=exper.α, + n_ens=exper.n_ens, + use_ensembling=exper.use_ensembling, + finaliser=exper.finaliser, + loss=exper.loss, + epochs=exper.epochs, ) end # Additional models: - if !isnothing(exp.additional_models) + if !isnothing(exper.additional_models) @info "Using additional models." add_models = Dict{Any,Any}() - for (k, mod) in exp.additional_models + for (k, mod) in exper.additional_models add_models[k] = mod(; - batch_size=batch_size(exp), - finaliser=exp.finaliser, - loss=exp.loss, - epochs=exp.epochs, + batch_size=batch_size(exper), + finaliser=exper.finaliser, + loss=exper.loss, + epochs=exper.epochs, ) end models = merge(models, add_models) end @info "Training models." 
- model_dict = train_models(models, X, labels; cov=exp.coverage) + model_dict = train_models(models, X, labels; parallelizer=exper.parallelizer, train_parallel=exper.train_parallel, cov=exper.coverage) else @info "Loading pre-trained models." - model_dict = Serialization.deserialize(joinpath(pretrained_path(exp), "$(exp.save_name)_models.jls")) + model_dict = Serialization.deserialize(joinpath(pretrained_path(exper), "$(exper.save_name)_models.jls")) + if is_multi_processed(exper) + MPI.Barrier(exper.parallelizer.comm) + end end # Save models: - @info "Saving models to $(joinpath(exp.output_path, "$(exp.save_name)_models.jls"))." - Serialization.serialize(joinpath(exp.output_path, "$(exp.save_name)_models.jls"), model_dict) + if !(is_multi_processed(exper) && MPI.Comm_rank(exper.parallelizer.comm) != 0) + @info "Saving models to $(joinpath(exper.output_path, "$(exper.save_name)_models.jls"))." + Serialization.serialize(joinpath(exper.output_path, "$(exper.save_name)_models.jls"), model_dict) + end return model_dict end \ No newline at end of file diff --git a/experiments/models/train_models.jl b/experiments/models/train_models.jl index 863584a48bd8570fd672666290f912f9abac958c..635243444124bd2dcaee329e2cd37f5987103290 100644 --- a/experiments/models/train_models.jl +++ b/experiments/models/train_models.jl @@ -1,10 +1,35 @@ +using CounterfactualExplanations: AbstractParallelizer + """ train_models(models::Dict) Trains all models in a dictionary and returns a dictionary of `ConformalModel` objects. """ -function train_models(models::Dict, X, y; kwargs...) - model_dict = Dict(mod_name => _train(model, X, y; mod_name=mod_name, kwargs...) for (mod_name, model) in models) +function train_models(models::Dict, X, y; parallelizer::Union{Nothing,AbstractParallelizer}=nothing, train_parallel::Bool=false, kwargs...) 
+ if is_multi_processed(parallelizer) && train_parallel + # Split models into groups of approximately equal size: + model_list = [(key, value) for (key, value) in models] + x = split_obs(model_list, parallelizer.n_proc) + x = MPI.scatter(x, parallelizer.comm) + # Train models: + model_dict = Dict() + for (mod_name, model) in x + model_dict[mod_name] = _train(model, X, y; mod_name=mod_name, verbose=false, kwargs...) + end + MPI.Barrier(parallelizer.comm) + output = MPI.gather(model_dict, parallelizer.comm) + # Collect output from all processes in rank 0: + if parallelizer.rank == 0 + output = merge(output...) + else + output = nothing + end + # Broadcast output to all processes: + model_dict = MPI.bcast(output, parallelizer.comm; root=0) + MPI.Barrier(parallelizer.comm) + else + model_dict = Dict(mod_name => _train(model, X, y; mod_name=mod_name, kwargs...) for (mod_name, model) in models) + end return model_dict end @@ -20,11 +20,15 @@ end Trains a model and returns a `ConformalModel` object. """ -function _train(model, X, y; cov, method=:simple_inductive, mod_name="model") +function _train(model, X, y; cov, method=:simple_inductive, mod_name="model", verbose::Bool=true) conf_model = conformal_model(model; method=method, coverage=cov) mach = machine(conf_model, X, y) @info "Begin training $mod_name." - fit!(mach) + if verbose + fit!(mach) + else + fit!(mach, verbosity=0) + end @info "Finished training $mod_name." M = ECCCo.ConformalModel(mach.model, mach.fitresult) return M diff --git a/experiments/post_processing/meta_data.jl b/experiments/post_processing/meta_data.jl index a2e8f96db9404bac7d5a85c711ca9f2f34e528ed..63bfef2a40b65684a68f0c3b8c6989f48186fbb1 100644 --- a/experiments/post_processing/meta_data.jl +++ b/experiments/post_processing/meta_data.jl @@ -1,5 +1,5 @@ """ - meta(exp::Experiment) + meta(exper::Experiment) Extract and save meta data about the experiment.
""" @@ -21,28 +21,28 @@ Extract and save meta data about the data and models in `outcome.model_dict`. function meta_model(outcome::ExperimentOutcome; save_output::Bool=false) # Unpack: - exp = outcome.exp - n_obs, batch_size = meta_data(exp) + exper = outcome.exper + n_obs, batch_size = meta_data(exper) model_dict = outcome.model_dict params = DataFrame( Dict( :n_obs => Int.(round(n_obs / 10) * 10), :batch_size => batch_size, - :dataname => exp.dataname, - :sgld_batch_size => exp.sampling_batch_size, - :epochs => exp.epochs, - :n_hidden => exp.n_hidden, + :dataname => exper.dataname, + :sgld_batch_size => exper.sampling_batch_size, + :epochs => exper.epochs, + :n_hidden => exper.n_hidden, :n_layers => length(model_dict["MLP"].fitresult[1][1]) - 1, - :activation => string(exp.activation), - :n_ens => exp.n_ens, - :lambda => string(exp.α[3]), - :jem_sampling_steps => exp.sampling_steps, + :activation => string(exper.activation), + :n_ens => exper.n_ens, + :lambda => string(exper.α[3]), + :jem_sampling_steps => exper.sampling_steps, ) ) if save_output - save_path = joinpath(exp.params_path, "$(exp.save_name)_model_params.csv") + save_path = joinpath(exper.params_path, "$(exper.save_name)_model_params.csv") @info "Saving model parameters to $(save_path)." 
CSV.write(save_path, params) end @@ -54,10 +54,10 @@ end function meta_generators(outcome::ExperimentOutcome; save_output::Bool=false) # Unpack: - exp = outcome.exp + exper = outcome.exper generator_dict = outcome.generator_dict - Λ = exp.Λ - Λ_Δ = exp.Λ_Δ + Λ = exper.Λ + Λ_Δ = exper.Λ_Δ # Output: opt = first(values(generator_dict)).opt @@ -65,19 +65,19 @@ function meta_generators(outcome::ExperimentOutcome; save_output::Bool=false) Dict( :opt => string(typeof(opt)), :eta => opt.eta, - :dataname => exp.dataname, + :dataname => exper.dataname, :lambda_1 => string(Λ[1]), :lambda_2 => string(Λ[2]), :lambda_3 => string(Λ[3]), :lambda_1_Δ => string(Λ_Δ[1]), :lambda_2_Δ => string(Λ_Δ[2]), :lambda_3_Δ => string(Λ_Δ[3]), - :n_individuals => exp.n_individuals, + :n_individuals => exper.n_individuals, ) ) if save_output - save_path = joinpath(exp.params_path, "$(exp.save_name)_generator_params.csv") + save_path = joinpath(exper.params_path, "$(exper.save_name)_generator_params.csv") @info "Saving generator parameters to $(save_path)." CSV.write(save_path, generator_params) end @@ -93,18 +93,18 @@ Compute and save the model performance for the models in `outcome.model_dict`. function meta_model_performance(outcome::ExperimentOutcome; measures::Union{Nothing,Dict}=nothing, save_output::Bool=false) # Unpack: - exp = outcome.exp - measures = isnothing(measures) ? exp.model_measures : measures + exper = outcome.exper + measures = isnothing(measures) ? 
exper.model_measures : measures model_dict = outcome.model_dict # Model performance: model_performance = DataFrame() for (mod_name, model) in model_dict # Test performance: - _perf = CounterfactualExplanations.Models.model_evaluation(model, exp.test_data, measure=collect(values(measures))) + _perf = CounterfactualExplanations.Models.model_evaluation(model, exper.test_data, measure=collect(values(measures))) _perf = DataFrame([[p] for p in _perf], collect(keys(measures))) _perf.mod_name .= mod_name - _perf.dataname .= exp.dataname + _perf.dataname .= exper.dataname model_performance = vcat(model_performance, _perf) end @@ -112,10 +112,10 @@ function meta_model_performance(outcome::ExperimentOutcome; measures::Union{Noth println(model_performance) if save_output - save_path = joinpath(exp.params_path, "$(exp.save_name)_model_performance.jls") + save_path = joinpath(exper.params_path, "$(exper.save_name)_model_performance.jls") @info "Saving model performance to $(save_path)." Serialization.serialize(save_path, model_performance) - save_path = joinpath(exp.params_path, "$(exp.save_name)_model_performance.csv") + save_path = joinpath(exper.params_path, "$(exper.save_name)_model_performance.csv") @info "Saving model performance to $(save_path)." 
CSV.write(save_path, model_performance) end diff --git a/experiments/post_processing/post_processing.jl b/experiments/post_processing/post_processing.jl index 7eefa387067dc6ebfa88b596d5d20d9d1e0aabe2..f70aabe43c34d7474afc522dd98013210eab5543 100644 --- a/experiments/post_processing/post_processing.jl +++ b/experiments/post_processing/post_processing.jl @@ -1,2 +1,3 @@ include("meta_data.jl") -include("artifacts.jl") \ No newline at end of file +include("artifacts.jl") +include("results.jl") \ No newline at end of file diff --git a/experiments/post_processing/results.jl b/experiments/post_processing/results.jl index 01e31bef33252b3207f077beb26493ee76d3803c..3b44bcd2abb9f2c035adc267a1fc3bb31ca72af2 100644 --- a/experiments/post_processing/results.jl +++ b/experiments/post_processing/results.jl @@ -1,9 +1,9 @@ """ - aggregate(outcome::ExperimentOutcome; measure::String="distance_from_targets") + aggregate_results(outcome::ExperimentOutcome; measure::String="distance_from_targets") Function to quickly aggregate benchmarking results for a given measure. """ -function aggregate(outcome::ExperimentOutcome; measure::String="distance_from_targets") +function aggregate_results(outcome::ExperimentOutcome; measure::String="distance_from_targets") df = @chain outcome.bmk() begin @group_by(generator, model) @filter(variable == measure) diff --git a/experiments/setup_env.jl b/experiments/setup_env.jl index de27e55444d2bc1ed0dea55d289cc248e9de6f8c..7d4cfede5f5b3a7c853d27e697e89626dc6ae982 100644 --- a/experiments/setup_env.jl +++ b/experiments/setup_env.jl @@ -33,12 +33,12 @@ include("data/data.jl") include("models/models.jl") include("benchmarking/benchmarking.jl") include("post_processing/post_processing.jl") +include("utils.jl") # Parallelization: plz = nothing if "threaded" ∈ ARGS - @info "Multi-threading using $(Threads.nthreads()) threads." 
const USE_THREADS = true plz = ThreadsParallelizer() else @@ -46,14 +46,17 @@ else end if "mpi" ∈ ARGS - @info "Multi-processing using MPI." import MPI MPI.Init() const USE_MPI = true - plz = MPIParallelizer(MPI.COMM_WORLD, USE_THREADS) + plz = MPIParallelizer(MPI.COMM_WORLD; threaded=USE_THREADS) if MPI.Comm_rank(MPI.COMM_WORLD) != 0 - @info "Disabling logging on non-root processes." global_logger(NullLogger()) + else + @info "Multi-processing using MPI. Disabling logging on non-root processes." + if USE_THREADS + @info "Multi-threading using $(Threads.nthreads()) threads." + end end else const USE_MPI = false diff --git a/experiments/utils.jl b/experiments/utils.jl new file mode 100644 index 0000000000000000000000000000000000000000..27df74b55622bb86835a133813d40b957521bd5c --- /dev/null +++ b/experiments/utils.jl @@ -0,0 +1,3 @@ +is_multi_processed(exper::Experiment) = isa(exper.parallelizer, Base.get_extension(CounterfactualExplanations, :MPIExt).MPIParallelizer) + +is_multi_processed(parallelizer::AbstractParallelizer) = isa(parallelizer, Base.get_extension(CounterfactualExplanations, :MPIExt).MPIParallelizer) \ No newline at end of file diff --git a/notebooks/gmsc.qmd b/notebooks/gmsc.qmd index 8df8eaefc76a737bacfcc34ce3e3cc2594f870a5..fd4994be7dd5a4ec01c529a48a60cb19e80b8f45 100644 --- a/notebooks/gmsc.qmd +++ b/notebooks/gmsc.qmd @@ -37,7 +37,7 @@ if pre_proc function boxcox(x) transf = MLJ.UnivariateBoxCoxTransformer() - x = exp.(x) + x = exp.(x) mach = machine(transf, x) fit!(mach) z = MLJ.transform(mach, x) diff --git a/notebooks/prototyping.qmd b/notebooks/prototyping.qmd index a8b2e667683ea0892b26ce39860ad7631e9f2aa5..4720ea450a732f8f940b032470d65d945aa16a9e 100644 --- a/notebooks/prototyping.qmd +++ b/notebooks/prototyping.qmd @@ -9,7 +9,7 @@ dataname = "linearly_separable" outcome = Serialization.deserialize(joinpath(DEFAULT_OUTPUT_PATH, "$(dataname)_outcome.jls")) # Unpack -exp = outcome.exp +exper = outcome.exper model_dict = outcome.model_dict
generator_dict = outcome.generator_dict bmk = outcome.bmk @@ -19,7 +19,7 @@ bmk = outcome.bmk Random.seed!(2023) # Unpack -counterfactual_data = exp.counterfactual_data +counterfactual_data = exper.counterfactual_data X, labels = counterfactual_data.X, counterfactual_data.output_encoder.labels M = model_dict["MLP"] gen = filter(((k,v),) -> k in ["ECCCo", "ECCCo-Δ"], generator_dict) diff --git a/src/model.jl b/src/model.jl index 4caa8249f5c3a8070ec78638b4c41594156bc3d2..445bf4c77d8fbf8131b4d4859f469acc01b443e1 100644 --- a/src/model.jl +++ b/src/model.jl @@ -138,7 +138,7 @@ To keep things consistent with the architecture of `CounterfactualExplanations.j Let $\hat{p}_i$ denote the estimated softmax output for feature $i$. Then in the multi-class case the following formula can be applied: ```math -\beta_i x_i = \log (\hat{p}_i) + \log (\sum_i \exp(\hat{p}_i)) +\beta_i x_i = \log (\hat{p}_i) + \log (\sum_i \exp(\hat{p}_i)) ``` For a short derivation, see here: https://math.stackexchange.com/questions/2786600/invert-the-softmax-function.