Source code for bear_model.bear_net

import tensorflow.compat.v2 as tf
from . import core

epsilon = tf.keras.backend.epsilon()


def _bear_kmer_counts(kmer_seqs, kmer_total_counts,
                      condition_trans_counts=None, h=None, ar_func=None):
    """Get random variable of kmer transition counts conditioned on observed counts.

    Either condition_trans_counts is not None or both of h and ar_func are not None.

    Parameters
    ----------
    kmer_seqs : dtype
        A tensor of shape [A1, ..., An, lag, alphabet_size+1] of one-hot encoded kmers.
    kmer_total_counts : int
        A tensor of shape [A1, ..., An] of counts of each kmer.
    condition_trans_counts : int, default = None
        A tensor of shape [Am, ..., An, alphabet_size+1] of transition counts of each
        kmer to condition on. Set to 0 if None. Will broadcast if m>1.
    h : dtype, default = None
        A positive constant of the :math:`h` parameter from the BEAR model. Set to 1 if None.
    ar_func : function, default = None
        A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1]
        of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype
        of transition probabilities for each kmer. The autoregressive function.
        Set to 0 if None.

    Returns
    -------
    x : tensorflow probability distribution
        Distribution of kmer transition counts for a BEAR model.
    """
    dtype = kmer_seqs.dtype
    if condition_trans_counts is None:
        condition_trans_counts = tf.constant(0., dtype)
    if h is None or ar_func is None:
        h = tf.constant(1., dtype)

        def ar_func(x):
            return tf.constant(0., dtype)
    concentrations = ar_func(kmer_seqs) / h + condition_trans_counts + epsilon
    x = core.tfpDirichletMultinomialPerm(kmer_total_counts, concentrations, name='x')
    return x


def _ar_kmer_counts(kmer_seqs, kmer_total_counts, ar_func):
    """Get Random variable of kmer transition counts conditioned on observed counts.

    Parameters
    ----------
    kmer_seqs : dtype
        A tensor of shape [A1, ..., An, lag, alphabet_size+1] of one-hot encoded kmers.
    kmer_total_counts : int
        A tensor of shape [A1, ..., An] of counts of each kmer.
    ar_func : function, default = None
        A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1]
        of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype
        of transition probabilities for each kmer. The autoregressive function.
        Set to 0 if None.

    Returns
    -------
    x : tensorflow probability distribution
        Distribution of kmer transition counts for an AR model.
    """
    probs = ar_func(kmer_seqs) + epsilon
    x = core.tfpMultinomialPerm(kmer_total_counts, probs, name='x')
    return x


def _create_params(lag, alphabet_size, make_ar_func,
                   af_kwargs, dtype=tf.float64):
    """Define and get parameters of BEAR or AR distribution.

    Parameters
    ----------
    lag : int
    alphabet_size : int
    make_ar_func : function
        Takes lag, alphabet_size, af_kwargs, dtype and returns an ar_func.
    af_kwargs : dict
        Keyword arguments for particular ar_func. For example, filter_width.
    dtype : tensorflow dtype, default = tf.float64
        dtype for the ar_func and h_signed.

    Returns
    -------
    params: list
        List of parameters as tensorflow variables.
    h_signed : dtype
        :math:`\log(h)` where :math:`h` is the BEAR parameter.
    ar_func : function
        The autoregressive function.
    """
    ar_func, ar_func_params = make_ar_func(lag, alphabet_size, **af_kwargs, dtype=dtype)
    h_signed = tf.Variable(0, dtype=dtype, name='h_signed')
    params = ([h_signed] + ar_func_params)
    return params, h_signed, ar_func


[docs]def change_scope_params(lag, alphabet_size, make_ar_func, af_kwargs, params, dtype=tf.float64): """Redefine and get parameters of BEAR or AR distribution in given scope. Used to to get unmirrored variables after training on multiple GPUs in parallel or to unpack a list of params into h and the autoregressive function. Parameters ---------- lag : int alphabet_size : int make_ar_func : function Takes lag, alphabet_size, af_kwargs, dtype and returns an ar_func. af_kwargs : dict Keyword arguments for particular ar_func. For example, filter_width. params: list List of parameters as tensorflow variables. dtype : dtype, default = tf.float64 dtype for the ar_func and h. Returns ------- params : list List of parameters as tensorflow variables. h_signed : dtype :math:`\log(h)` where :math:`h` is the BEAR concentration parameter. ar_func : function A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1] of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype of transition probabilities for each kmer. The autoregressive function. """ pos_in_params = 0 h_nmir = tf.Variable(0, dtype=dtype, name='h_signed') h_nmir.assign(params[pos_in_params]) pos_in_params += 1 ar_func_nmir, ar_func_params_nmir = make_ar_func(lag, alphabet_size, **af_kwargs, dtype=dtype) for param in ar_func_params_nmir: param.assign(params[pos_in_params]) pos_in_params += 1 params_nmir = ([h_nmir] + ar_func_params_nmir) return params_nmir, h_nmir, ar_func_nmir
def _train_step(batch, num_kmers, h_signed, ar_func, params, acc_grads, train_ar): """Add gradient of unbiased estimate of loss to accumulated gradients. Parameters ---------- batch : list of two tensors of the same dtype. The first element is a one hot encoding of kmers of shape [kmer_batch_size, lag, alphabet_size+1] and the second is the transition counts of size [kmer_batch_size, alphabet_size+1]. num_kmers : int Total number of kmers seen in data. Used to normalize estimate of loss. h_signed : dtype :math:`\log(h)` where :math:`h` is the BEAR parameter. ar_func : function A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1] of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype of transition probabilities for each kmer. The autoregressive function. params : list List of parameters as tensorflow variables. acc_grads : list List of accumulated gradients for parameters. train_ar : bool Whether to evaluate the likelihood using an AR (True) or BEAR (False) model. Returns ------- loss : dtype An unbiased estimate of the log likelihood of the data. """ with tf.GradientTape() as grad_tape: kmer_batch_size = tf.shape(batch[0])[0] kmer_seqs = batch[0] transition_counts = batch[1] kmer_total_counts = tf.math.reduce_sum(transition_counts, axis=-1) if train_ar: post = _ar_kmer_counts(kmer_seqs, kmer_total_counts, ar_func) else: post = _bear_kmer_counts(kmer_seqs, kmer_total_counts, h=tf.math.exp(h_signed), ar_func=ar_func) log_likelihood = tf.reduce_sum( post.counts_log_prob(transition_counts)) elbo = (num_kmers / kmer_batch_size) * log_likelihood loss = -elbo gradients = grad_tape.gradient(loss, params) for tv, grad in zip(acc_grads, gradients): if grad is not None: tv.assign_add(grad) return loss
[docs]def train(data, num_kmers, epochs, ds_loc, alphabet, lag, make_ar_func, af_kwargs, learning_rate, optimizer_name, train_ar, acc_steps=1, params_restart=None, writer=None, loss_save=None, dtype=tf.float64): """Train a BEAR or AR model using all available GPUs in parallel. Parameters ---------- data : tensorflow data object Load sequence data using tools in dataloader.py. Minibatch before passing. num_kmers : int Total number of kmers seen in data. Used to normalize estimate of loss. epochs : int ds_loc : int Column in count data that corresponds with the training data. alphabet : str One of 'dna', 'rna', 'prot'. lag : int make_ar_func : function Takes lag, alphabet_size, af_kwargs, dtype and returns an ar_func. See ar_funcs submodule. af_kwargs : dict Keyword arguments for particular ar_func. For example, filter_width. learning_rate : float optimizer_name : str For example 'Adam'. train_ar : bool Whether to train an AR (True) or BEAR (False) model. writer : tensorboard writer object, default = None loss_save : list, default = None Pass a list to have losses at each step appended to it. acc_steps : int, default = 1 Number of steps to accumulate gradients over. params_restart : list of tensorflow variables, default = None Pass the parameter list from a previous run to restart training. dtype : dtype, default = tf.float64 Returns ------- params: list List of parameters as tensorflow variables. h_signed : dtype :math:`\log(h)` where :math:`h` is the BEAR concentration parameter. ar_func : function The autoregressive function. """ alphabet_size = len(core.alphabets_tf[alphabet]) - 1 strategy = tf.distribute.MirroredStrategy() with strategy.scope(): # Define parameters in parallel GPU usage scope. if params_restart is None: params, h_signed, ar_func = _create_params( lag, alphabet_size, make_ar_func, af_kwargs, dtype=dtype) else: params, h_signed, ar_func = change_scope_params( lag, alphabet_size, make_ar_func, af_kwargs, params_restart, dtype=dtype) # Set up accumulated gradient variables.p;'; acc_grads = [tf.Variable(tf.zeros_like(tv), trainable=False) for tv in params] for tv in acc_grads: tv.assign(tf.zeros_like(tv)) # Define optimizer in parallel GPU usage scope. optimizer = getattr(tf.keras.optimizers, optimizer_name)( learning_rate=learning_rate) # One hot encode kmers and get appropriate column from training data. def map_(kmers, counts): return core.tf_one_hot(kmers, alphabet), tf.gather(counts, ds_loc, axis=1) data = data.map( map_, num_parallel_calls=tf.data.experimental.AUTOTUNE).prefetch(10) # Get data iterable for use with GPU parallelization. data_iter = iter(strategy.experimental_distribute_dataset(data)) # Define functions for updating parameters and accumulating gradients # with GPU parallelization. def add_grads(params, acc_grads, optimizer): optimizer.apply_gradients(zip(acc_grads, params)) @tf.function def dist_add_grads(params, acc_grads, optimizer): strategy.run(add_grads, args=(params, acc_grads, optimizer)) @tf.function def dist_train_step(batch, num_kmers, h_signed, ar_func, params, acc_grads, train_ar): losses = strategy.run(_train_step, args=( batch, num_kmers, h_signed, ar_func, params, acc_grads, train_ar)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None) # Training loop loss = 0. step = 1 for batch in data_iter: # Accumulate gradients and update loss. loss += dist_train_step(batch, num_kmers, h_signed, ar_func, params, acc_grads, train_ar) if step % acc_steps == 0: # Record loss if writer is not None: with writer.as_default(): tf.summary.scalar('elbo', - loss / acc_steps, step=step) if loss_save != None: loss_save.append(-loss/acc_steps) # Update gradients dist_add_grads(params, acc_grads, optimizer) # Reset accumulated gradients and cumulative loss to zero. for tv in acc_grads: tv.assign(tf.zeros_like(tv)) loss = 0 step += 1 # Remove the distributed scope from the parameters. params, h_signed, ar_func = change_scope_params( lag, alphabet_size, make_ar_func, af_kwargs, params, dtype=dtype) return params, h_signed, ar_func
def _evaluation_step(batch, h, ar_func, van_reg, alphabet_size, use_train, dtype=tf.float64): kmer_seqs = batch[0] transition_counts_test = batch[1] if use_train: transition_counts_train = batch[2] van_condition = transition_counts_train[:, None, :] + van_reg[:, None] else: transition_counts_train = None van_condition = van_reg[:, None] * tf.ones([1, alphabet_size + 1], dtype=dtype) kmer_total_counts_test = tf.math.reduce_sum(transition_counts_test, axis=-1) # Get posteriors. post_ear = _bear_kmer_counts(kmer_seqs, kmer_total_counts_test, condition_trans_counts=transition_counts_train, h=h, ar_func=ar_func) post_arm = _ar_kmer_counts(kmer_seqs, kmer_total_counts_test, ar_func) post_van = _bear_kmer_counts(kmer_seqs, kmer_total_counts_test[:, None], condition_trans_counts=van_condition) # Get likelihoods. log_likelihood_ear = tf.reduce_sum( post_ear.counts_log_prob(transition_counts_test), axis=-1) log_likelihood_arm = tf.reduce_sum( post_arm.counts_log_prob(transition_counts_test)) log_likelihood_van = tf.reduce_sum( post_van.counts_log_prob(transition_counts_test[:, None, :]), axis=0) # Get most likely transition and accuracy. ml_ear = post_ear.ml_output() ml_arm = post_arm.ml_output() ml_van = post_van.ml_output() oh_ml_ear = tf.cast(tf.math.equal(ml_ear[..., None], tf.range(alphabet_size+1, dtype=dtype)), dtype=dtype) oh_ml_arm = tf.cast(tf.math.equal(ml_arm[..., None], tf.range(alphabet_size+1, dtype=dtype)), dtype=dtype) oh_ml_van = tf.cast(tf.math.equal(ml_van[..., None], tf.range(alphabet_size+1, dtype=dtype)), dtype=dtype) correct_ear = tf.math.reduce_sum(tf.math.reduce_sum( transition_counts_test*oh_ml_ear, axis=-1), axis=-1) correct_arm = tf.math.reduce_sum(transition_counts_test*oh_ml_arm) correct_van = tf.math.reduce_sum(tf.math.reduce_sum( transition_counts_test[:, None, :]*oh_ml_van, axis=0), axis=-1) # Sum total number of transitions. total_len = tf.math.reduce_sum(transition_counts_test) return (log_likelihood_ear, log_likelihood_arm, log_likelihood_van, correct_ear, correct_arm, correct_van, total_len) @tf.function def _distributed_evaluation_step(batch, h, ar_func, van_reg, alphabet_size, use_train, strategy): (log_likelihood_ear, log_likelihood_arm, log_likelihood_van, correct_ear, correct_ar, correct_van, total_len) = strategy.run( _evaluation_step, args=(batch, h, ar_func, van_reg, alphabet_size, use_train)) return (strategy.reduce(tf.distribute.ReduceOp.SUM, log_likelihood_ear, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, log_likelihood_arm, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, log_likelihood_van, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, correct_ear, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, correct_ar, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, correct_van, axis=None), strategy.reduce(tf.distribute.ReduceOp.SUM, total_len, axis=None))
[docs]def evaluation(data, ds_loc_train, ds_loc_test, alphabet, h, ar_func, van_reg, dtype=tf.float64): """Evaluate a trained BEAR, AR or BMM model. Can use multiple GPUs in parallel. Parameters ---------- data : tensorflow data object Load sequence data using tools in dataloader.py. Minibatch before passing. ds_loc_train : int Column in count data that corresponds with the training data. Set to -1 for conditioning on training data. ds_loc_test : int Column in count data that corresponds with the testing data. alphabet : str One of 'dna', 'rna', 'prot'. h : dtype The :math:`h` parameter from the BEAR model. ar_func : function A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1] of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype of transition probabilities for each kmer. The autoregressive function. van_reg : 1D numpy or tensorflow array Prior on vanilla BEAR model (Dirichlet concentration parameter). dtype : dtype, default = tf.float64 Returns ------- log_likelihood_ear, log_likelihood_arm, log_likelihood_van : float Total log likelihood of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. perplexity_ear, perplexity_arm, perplexity_van : float Perplexity of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. accuracy_ear, accuracy_arm, accuracy_van : float Accuracy of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. Ties in maximum model probability are resolved randomly. """ alphabet_size = len(core.alphabets_tf[alphabet]) - 1 strategy = tf.distribute.MirroredStrategy() use_train = ds_loc_train >= 0 if use_train: def map_(kmers, counts): return (core.tf_one_hot(kmers, alphabet), tf.gather(counts, ds_loc_test, axis=1), tf.gather(counts, ds_loc_train, axis=1)) else: def map_(kmers, counts): return (core.tf_one_hot(kmers, alphabet), tf.gather(counts, ds_loc_test, axis=1)) data_iter = iter(strategy.experimental_distribute_dataset(data.map(map_).prefetch(10))) log_likelihood_ear = tf.constant(0., dtype=dtype) log_likelihood_arm = tf.constant(0., dtype=dtype) log_likelihood_van = tf.zeros(len(van_reg), dtype=dtype) correct_ear = tf.constant(0., dtype=dtype) correct_arm = tf.constant(0., dtype=dtype) correct_van = tf.zeros(len(van_reg), dtype=dtype) total_len = tf.constant(0., dtype=dtype) for batch in data_iter: (ll_ear, ll_arm, ll_van, cor_ear, cor_arm, cor_van, tot_lens ) = _distributed_evaluation_step( batch, h, ar_func, van_reg, alphabet_size, use_train, strategy) log_likelihood_ear += ll_ear log_likelihood_arm += ll_arm log_likelihood_van += ll_van correct_ear += cor_ear correct_arm += cor_arm correct_van += cor_van total_len += tot_lens return (log_likelihood_ear, log_likelihood_arm, log_likelihood_van, tf.exp(-log_likelihood_ear/total_len), tf.exp(-log_likelihood_arm/total_len), tf.exp(-log_likelihood_van/total_len), correct_ear/total_len, correct_arm/total_len, correct_van/total_len)
[docs]def h_scan(data, ds_loc_train, ds_loc_test, alphabet, h, ar_func, dtype=tf.float64): """Evaluate a trained BEAR model at multiple h values. Can use multiple GPUs in parallel. Parameters ---------- data : tensorflow data object Load sequence data using tools in dataloader.py. Minibatch before passing. ds_loc_train : int Column in count data that corresponds with the training data. Set to -1 for conditioning on training data. ds_loc_test : int Column in count data that corresponds with the testing data. alphabet : str One of 'dna', 'rna', 'prot'. h : tensor The :math:`h` parameter from the BEAR model. ar_func : function A function that takes a tensor of shape [A1, ..., An, lag, alphabet_size+1] of dtype and returns a tensor of shape [A1, ..., An, alphabet_size+1] of dtype of transition probabilities for each kmer. The autoregressive function. dtype : dtype, default = tf.float64 Returns ------- log_likelihood_ear, log_likelihood_arm, log_likelihood_van : float Total log likelihood of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. perplexity_ear, perplexity_arm, perplexity_van : float Perplexity of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. accuracy_ear, accuracy_arm, accuracy_van : float Accuracy of the data with the model evaluated as a BEAR, AR or vanilla BEAR model. Ties in maximum model probability are resolved randomly. """ alphabet_size = len(core.alphabets_tf[alphabet]) - 1 van_reg = tf.ones(1, dtype=dtype) strategy = tf.distribute.MirroredStrategy() use_train = ds_loc_train >= 0 if use_train: def map_(kmers, counts): return (core.tf_one_hot(kmers, alphabet), tf.gather(counts, ds_loc_test, axis=1), tf.gather(counts, ds_loc_train, axis=1)) else: def map_(kmers, counts): return (core.tf_one_hot(kmers, alphabet), tf.gather(counts, ds_loc_test, axis=1)) data_iter = iter(strategy.experimental_distribute_dataset(data.map(map_).prefetch(10))) log_likelihood_ear = tf.zeros(tf.shape(h), dtype=dtype) correct_ear = tf.zeros(tf.shape(h), dtype=dtype) total_len = tf.constant(0., dtype=dtype) for i, batch in enumerate(data_iter): if i == 0: len_ar_func_out = len(tf.shape(batch[1])) hs = tf.reshape(h, tf.concat([tf.shape(h), tf.ones(len_ar_func_out, dtype=tf.int32)], axis=0)) (ll_ear, ll_arm, ll_van, cor_ear, cor_arm, cor_van, tot_lens ) = _distributed_evaluation_step( batch, hs, ar_func, van_reg, alphabet_size, use_train, strategy) log_likelihood_ear += ll_ear correct_ear += cor_ear total_len += tot_lens return (log_likelihood_ear, tf.exp(-log_likelihood_ear/total_len), correct_ear/total_len, )