First Attempts with the parallel Package

The last time I worked with parallel processing in R I used the package Rmpi. This ws very flexible, but took a lot of setup to handle passing jobs, code, and data between the master and the slave. I have used the package snow for some simple projects, but I have new need for parallel computing and recently found out about the parallel package (something I should have known about earlier). This is my first attempt at using the mcparallel function from the parallel package.

As a simple test, I have used mcparallel to implement multiple chains for a Metropolis-Hastings sampler of the mean parameter in a multivariate log-normal distribution. That is, to sample from the posterior distribution of  \mu given a an iid sample of random p-vectors X_1,\ldots,X_n such that  \log(X_i) \sim N_p(\mu,\Sigma) . In the simulation I have set  p=2 ,  \mu=(1,1)^T , and  \Sigma=.5 + .5 I_2.

The sampler I have implemented generates proposals using a symmetric multivariate normal proposal distribution with variance Sigma.prop. I have manually tuned the proposal variance to achieve acceptance rates just above .5 and visually pleasing traceplots.

The use of the parallel package comes in three steps:

1. Initialize the random number stream:

This part of the code selects and initializes the L’Ecuyer random number generator so that each chain uses a different RNG.


## Initialize random number generator to obtain different chains from
## each parallel run
RNGkind("L'Ecuyer-CMRG")
set.seed(7777)
mc.reset.stream()

2. Start the parallel processes:

The second part of the code starts the parallels processes running. I have used the lapply function here so that the number of parallel processes is not hardwired and can be changed easily.

## Start parallel processes
niter <- 1000
Sigma.prop <- .03*Sigma</p>
MHprocesses <- lapply(1:nchains,function(i){
mcparallel(MHalg(mu.init[i,],x,Sigma,n.iter,Sigma.prop))
})

3. Collect results:

The final chunk uses the mccollect function to wait for the parallel jobs to finish and collect the output.

## Wait and collect output
MHout <- mccollect(MHprocesses)

In this case, the chains could have been run in parallel using the mcapply function. This would avoid the need to set the RNG and to collect results manually. Both functions use forks and share memory until objects are modified, so they should not differ in speed or memory use. However, mcparallel seems to be more flexible and I will need the flexibility in some of projects.

Here is the full code. Any comments are welcome!

############################################################
#
# parallel_1.R
#
# First test with the parallel package.
#
# In this example I run parallel chains for a Metroposlis-
# Hastings sampler for the multivariate log-normal
# distribution using the mcparallel() function to
# parallelize the code.
#
############################################################

##### Preliminaries #####
## Load packages
library(parallel)
library(mvtnorm)

## Define functions
dmvlnorm <- function(x,mu,Sigma,log=FALSE){
 ## Multivariate log-normal density
 dmvnorm(log(x),mu,Sigma,log=log)
}

rmvlnorm <- function(n,mu,Sigma){
 ## Multivariate log-normal random generator

## Generate normal random variates
 logx <- rmvnorm(n,mu,Sigma)

## Exponentiate to get log normal random variates
 x <- exp(logx)

return(x)
}

MHstep <- function(mu.curr,x,Sigma,Sigma.prop){
 ## Metropolis-Hastings step

## Generate proposal
 mu.prop <- mu.curr + rmvnorm(1,rep(0,length(mu.curr)),Sigma.prop)

## Compute Hastings ratio (on log scale)
 log.alpha <- sum(dmvlnorm(x,mu.prop,Sigma,log=TRUE)) -
 sum(dmvlnorm(x,mu.curr,Sigma,log=TRUE))

## Accept or reject proposal
 if(-rexp(1) < log.alpha)
 return(mu.prop)
 else
 return(mu.curr)
}

MHalg <- function(mu,x,Sigma,niter,Sigma.prop){
 ## Initialize storage
 mu.store <- matrix(nrow=niter + 1,ncol=length(mu))
 mu.store[1,] <- mu

## Initialize acceptance monitor
 accept <- rep(0,niter)

## Run MH algorithm
 for(k in 1:niter){
 ## Perform one MH step
 mu <- MHstep(mu,x,Sigma,Sigma.prop)

## Store values
 mu.store[k+1,] <- mu

## Track acceptance
 if(all(mu.store[k+1,] != mu.store[k,]))
 accept[k] <- 1
 }

 ## Return sampled values
 return(list(mu=mu.store,accept=accept))
}

##### Run Samplers #####

## Generate some random data
mu <- c(1,1)
Sigma <- .5 + .5 * diag(2)

n <- 25

x <- rmvlnorm(n,mu,Sigma)

## Parameters
n <- 25
mu <- c(1,.5)
Sigma <- .5 + .5*diag(2)

## Contour plot of true density
xgrid <- seq(0,30,length=100)
Z <- sapply(xgrid,function(x1){
 dmvlnorm(cbind(x1,xgrid),mu,Sigma)
})

contour(xgrid,xgrid,Z)

## Generate data
x <- rmvlnorm(n,mu,Sigma)

## Generate initial values
nchains <- 4
mu.init <- rmvnorm(nchains,apply(log(x),2,mean),2*Sigma)

## Initialize random number generator to obtain different chains from
## each parallel run
RNGkind("L'Ecuyer-CMRG")
set.seed(7777)
mc.reset.stream()

## Start parallel processes
niter <- 1000
Sigma.prop <- .03*Sigma

MHprocesses <- lapply(1:nchains,function(i){
 mcparallel(MHalg(mu.init[i,],x,Sigma,n.iter,Sigma.prop))
})

## Wait and collect output
MHout <- mccollect(MHprocesses)

##### Examine Output #####

## One-dimensional traceplots
col <- rainbow(nchains)
par(mfrow=c(2,1))

trace1 <- sapply(1:nchains,function(i) MHout[[i]]$mu[,1])
matplot(trace1,type="l",lty=1,col=col)
abline(h=mu[1],lty=2,col="grey")

trace2 <- sapply(1:nchains,function(i) MHout[[i]]$mu[,2])
matplot(trace2,type="l",lty=1,col=col)
abline(h=mu[2],lty=2,col="grey")

## Two-dimensional traceplot
par(mfrow=c(1,1))
lim <- range(sapply(MHout,function(out) range(out$mu)))
plot(NA,NA,xlim=lim,ylim=lim)
for(i in 1:nchains)
 lines(MHout[[i]]$mu[,1],MHout[[i]]$mu[,2],col=col[i])

abline(v=mu[1],lty=2,col="grey")
abline(h=mu[2],lty=2,col="grey")

## Acceptance rates
sapply(1:nchains,function(i) mean(MHout[[i]]$accept))

## Density plots
par(mfrow=c(2,1))
dens1 <- lapply(1:nchains,function(i){
 density(MHout[[i]]$mu[,1],from=lim[1],to=lim[2])
})
ylim1 <- c(0,max(sapply(1:nchains,function(i) max(dens1[[i]]$y))))

plot(NA,NA,xlim=lim,ylim=ylim1)
for(i in 1:nchains)
 lines(dens1[[i]],col=col[i])

abline(v=mu[1],lty=2,col="grey")

dens2 <- lapply(1:nchains,function(i){
 density(MHout[[i]]$mu[,2],from=lim[1],to=lim[2])
})
ylim2 <- c(0,max(sapply(1:nchains,function(i) max(dens2[[i]]$y))))

plot(NA,NA,xlim=lim,ylim=ylim2)
for(i in 1:nchains)
 lines(dens2[[i]],col=col[i])

abline(v=mu[2],lty=2,col="grey")

This entry was posted in Programming, R and tagged , . Bookmark the permalink.