Source code for HadamardLangevin.samplers

import numpy as np
from numpy.linalg import inv
from scipy.stats import invgamma, norm,invgauss



[docs] def one_step_langevin(x,p, grad, tau,beta=1): """ This function implements one step of proximal langevin. To sample from exp(-beta*(G(x))) where G is smooth and R is nonsmooth. Returns x' = x - tau*grad_G(x) + sqrt(2*tau/beta)*N(0,I_p) :param x: initial vector size (p,) :param p: int, length of x :param tau: float, stepsize :param grad: gradient of G. This is a function mapping numpy.ndarray of size (p,) to numpy.ndarray of size (p,) :param beta: float, inverse temperature, beta=1 by default. :return y: numpy.ndarray of size (p,) next iteration of proximal langevin """ y = x - tau*grad(x) + np.sqrt(2*tau/beta)*np.random.randn(p,) return y
[docs] def one_step_MALA(x, p, fval, grad, tau, beta=1): """ This function implements one step of metropolis hasting proximal langevin. To sample from exp(-beta*G(x)) where G is smooth and R is nonsmooth. Returns x' = x - tau*(grad_G(x) + (x - prox_R(x,gamma))/gamma) + sqrt(2*tau/beta)*N(0,I_p) :param x: initial vector size (p,) :param p: int, length of x :param tau: float, stepsize :param fval: G. This is a function mapping numpy.ndarray of size (p,) to float :param grad: gradient of smooth term. This is a function mapping numpy.ndarray of size (p,) to numpy.ndarray of size (p,) :param beta: float, inverse temperature, beta=1 by default. :return y: numpy.ndarray of size (p,) next iteration of proximal langevin """ #propose new point Y = lambda x: x - tau*grad(x) x_ = Y(x) + np.sqrt(2*tau/beta)*np.random.randn(p,) log_pi = lambda x: -beta*fval(x) #probability of transitining from x to x_ def log_q(x_, x): return -beta/4/tau * np.linalg.norm(x_-Y(x))**2 # Compute Metropolis-Hastings acceptance probability log_acceptance_ratio = log_pi(x_) - log_pi(x) log_acceptance_ratio += log_q(x,x_) - log_q(x_,x) if np.log(np.random.uniform()) <= log_acceptance_ratio: return x_ #accept proposal else: return x
[docs] def one_step_hadamard(x, p, grad, tau, lam,beta=1): """ This function implements one step of Hadamard langevin. To sample from exp(-beta*(G(x)+lam*|x|_1)) where G is smooth. :param x: initial vector size (2*p,) representing (u,v) :param p: int, .5 * length of x :param tau: float, stepsize :param grad: gradient of smooth term. This is a function mapping numpy.ndarray of size (p,) to numpy.ndarray of size (p,) :param lam: float, regularization parameter for l1 term :param beta: float, inverse temperature, beta=1 by default. :return y: numpy.ndarray of size (2*p,) next iteration of hadamard langevin """ u = x[:p] v = x[p:] g = grad(u*v) Grad = np.concatenate((v*g, u*g)) z = x - tau*Grad + np.random.randn(2*p,)*np.sqrt(2*tau/beta) z[:p] = (z[:p]+np.sqrt(z[:p]**2 + 4*tau/beta*(1+tau*lam)))/2 x_ = z/(1+tau*lam) return x_
[docs] def one_step_MALA_hadamard(x, p, fval, grad, tau, lam, beta=1): """ This function implements one step of Hadamard langevin. To sample from exp(-beta*(G(x)+lam*|x|_1)) where G is smooth. :param x: initial vector size (2*p,) representing (u,v) :param p: int, .5 * length of x :param tau: float, stepsize :param fval: functional value of smooth part to negative log density :param grad: gradient of smooth term in negative log density. This is a function mapping numpy.ndarray of size (p,) to numpy.ndarray of size (p,) :param lam: float, regularization parameter for l1 term :param beta: float, inverse temperature, beta=1 by default. :return y: numpy.ndarray of size (2*p,) next iteration of hadamard langevin """ ru = lambda x: x[:p] rv = lambda x: x[p:] prod = lambda x,g: np.concatenate((rv(x)*g ,ru(x)*g)) Grd = lambda x: prod( x, grad( ru(x)*rv(x) ) ) # S1 = lambda x: (x+np.sqrt(x**2 + 4/beta*tau*(1+tau*lam)))/2 #propose new point z = x - tau*Grd(x) + np.random.randn(2*p,)*np.sqrt(2*tau/beta) z[:p] = (z[:p]+np.sqrt(z[:p]**2 + 4/beta*tau*(1+tau*lam)))/2 # z[:p] = S1(z[:p]) x_ = z/(1+tau*lam) log_pi = lambda x: (-0.5*lam*np.sum(x**2) - fval(ru(x)*rv(x) ))*beta + np.sum(np.log(ru(x))) #probability of transitining from x to x_ def log_q(x_, x): u_ = ru(x_) u = ru(x) g = (1+tau*lam)*x_ - x + tau*Grd(x) g[:p] = g[:p] - tau/u_/beta q = -g**2 * beta/(4*tau) q[:p] = q[:p] + np.log(1+tau*lam + tau/u_**2/beta) return np.sum(q) # Compute Metropolis-Hastings acceptance probability log_acceptance_ratio = log_pi(x_) - log_pi(x) log_acceptance_ratio += log_q(x,x_) - log_q(x_,x) if np.log(np.random.uniform()) <= log_acceptance_ratio: return x_ #accept proposal else: return x
# Gibbs Sampler for Bayesian Lasso with sigma^2 = 1
[docs] def gibbs_sampler(A,y, lam,init, n, burn_in=1000, beta=1): """ This function implements one step of Gibbs sampler for the Bayesian lasso. To sample from exp(-beta*(|A@x-y|^2/2+lam*|x|_1)) :param A: numpy.ndarray of size (m,p), data matrix :param y: numpy.ndarray of size (m,), data vector :param lam: float, regularization parameter for l1 term :param init: int, initial vector of size (p,) :param n: int, number of samples to return :param burn_in: int, number of iterations to run before recording the samples :param beta: float, inverse temperature, beta=1 by default. :return samples: numpy.ndarray of size (n,p), generated samples """ p = len(init) eta = init def one_step(): # Sample x | y, X, eta V_x = inv(beta*A.T@A + np.diag(1/eta)) m_x = beta* V_x @ A.T @ y x = np.random.multivariate_normal(m_x, V_x) # Sample eta_j | x_j for j in range(p): eta[j]= 1/invgauss.rvs(mu=abs(1./(beta*lam*np.abs(x[j]))), scale=(lam*beta)**2) return x #burn in for i in range(burn_in): one_step() #record n samples samples = np.zeros((n,p)) for i in range(n): x = one_step() samples[i,:] = x.reshape(-1) return samples
[docs] def generate_samples_x(Iterate,init, n, burn_in=1000): """ This function generates n samples using some sampling mechanisim given by Iterate. :param Iterate: function that takes input x_t of size (p,) and outputs x_{t+1} of size (p,) :param init: int, initial vector of size (p,) :param n: int, number of samples to return :param burn_in: int, number of iterations to run before recording the samples :return samples: numpy.ndarray of size (n,p) """ x = init p = len(init) #burn in for i in range(burn_in): x = Iterate(x) # p = len(x) #record n samples samples = np.zeros((n,p)) for i in range(n): x = Iterate(x) samples[i,:] = x.reshape(-1) return samples
[docs] def generate_samples_stride(Iterate,init, n, stride=1, burn_in=1000): """ This function generates n samples using some sampling mechanisim given by Iterate. :param Iterate: function that takes input x_t of size (p,) and outputs x_{t+1} of size (p,) :param init: int, initial vector of size (p,) :param n: int, number of samples to return :param stride: int, number of samples to skip before recording :param burn_in: int, number of iterations to run before recording the samples :return samples: numpy.ndarray of size (n,p) """ x = init #burn in for i in range(burn_in): x = Iterate(x) p = len(x) #record n samples samples = np.zeros((n,p)) k=0 for i in range(n*stride): x = Iterate(x) if np.mod(i,stride)==0: samples[k,:] = x.reshape(-1) k+=1 return samples