The last time I worked with parallel processing in R I used the package Rmpi. This ws very flexible, but took a lot of setup to handle passing jobs, code, and data between the master and the slave. I have used the package snow for some simple projects, but I have new need for parallel computing and recently found out about the parallel package (something I should have known about earlier). This is my first attempt at using the mcparallel function from the parallel package.
As a simple test, I have used mcparallel to implement multiple chains for a Metropolis-Hastings sampler of the mean parameter in a multivariate log-normal distribution. That is, to sample from the posterior distribution of given a an iid sample of random p-vectors such that . In the simulation I have set , , and .
The sampler I have implemented generates proposals using a symmetric multivariate normal proposal distribution with variance Sigma.prop. I have manually tuned the proposal variance to achieve acceptance rates just above .5 and visually pleasing traceplots.
The use of the parallel package comes in three steps:
1. Initialize the random number stream:
This part of the code selects and initializes the L’Ecuyer random number generator so that each chain uses a different RNG.
## Initialize random number generator to obtain different chains from ## each parallel run RNGkind("L'Ecuyer-CMRG") set.seed(7777) mc.reset.stream()
2. Start the parallel processes:
The second part of the code starts the parallels processes running. I have used the lapply function here so that the number of parallel processes is not hardwired and can be changed easily.
## Start parallel processes niter <- 1000 Sigma.prop <- .03*Sigma</p> MHprocesses <- lapply(1:nchains,function(i){ mcparallel(MHalg(mu.init[i,],x,Sigma,n.iter,Sigma.prop)) })
3. Collect results:
The final chunk uses the mccollect function to wait for the parallel jobs to finish and collect the output.
## Wait and collect output MHout <- mccollect(MHprocesses)
In this case, the chains could have been run in parallel using the mcapply function. This would avoid the need to set the RNG and to collect results manually. Both functions use forks and share memory until objects are modified, so they should not differ in speed or memory use. However, mcparallel seems to be more flexible and I will need the flexibility in some of projects.
Here is the full code. Any comments are welcome!
############################################################ # # parallel_1.R # # First test with the parallel package. # # In this example I run parallel chains for a Metroposlis- # Hastings sampler for the multivariate log-normal # distribution using the mcparallel() function to # parallelize the code. # ############################################################ ##### Preliminaries ##### ## Load packages library(parallel) library(mvtnorm) ## Define functions dmvlnorm <- function(x,mu,Sigma,log=FALSE){ ## Multivariate log-normal density dmvnorm(log(x),mu,Sigma,log=log) } rmvlnorm <- function(n,mu,Sigma){ ## Multivariate log-normal random generator ## Generate normal random variates logx <- rmvnorm(n,mu,Sigma) ## Exponentiate to get log normal random variates x <- exp(logx) return(x) } MHstep <- function(mu.curr,x,Sigma,Sigma.prop){ ## Metropolis-Hastings step ## Generate proposal mu.prop <- mu.curr + rmvnorm(1,rep(0,length(mu.curr)),Sigma.prop) ## Compute Hastings ratio (on log scale) log.alpha <- sum(dmvlnorm(x,mu.prop,Sigma,log=TRUE)) - sum(dmvlnorm(x,mu.curr,Sigma,log=TRUE)) ## Accept or reject proposal if(-rexp(1) < log.alpha) return(mu.prop) else return(mu.curr) } MHalg <- function(mu,x,Sigma,niter,Sigma.prop){ ## Initialize storage mu.store <- matrix(nrow=niter + 1,ncol=length(mu)) mu.store[1,] <- mu ## Initialize acceptance monitor accept <- rep(0,niter) ## Run MH algorithm for(k in 1:niter){ ## Perform one MH step mu <- MHstep(mu,x,Sigma,Sigma.prop) ## Store values mu.store[k+1,] <- mu ## Track acceptance if(all(mu.store[k+1,] != mu.store[k,])) accept[k] <- 1 } ## Return sampled values return(list(mu=mu.store,accept=accept)) } ##### Run Samplers ##### ## Generate some random data mu <- c(1,1) Sigma <- .5 + .5 * diag(2) n <- 25 x <- rmvlnorm(n,mu,Sigma) ## Parameters n <- 25 mu <- c(1,.5) Sigma <- .5 + .5*diag(2) ## Contour plot of true density xgrid <- seq(0,30,length=100) Z <- sapply(xgrid,function(x1){ dmvlnorm(cbind(x1,xgrid),mu,Sigma) }) contour(xgrid,xgrid,Z) ## Generate data x <- rmvlnorm(n,mu,Sigma) ## Generate initial values nchains <- 4 mu.init <- rmvnorm(nchains,apply(log(x),2,mean),2*Sigma) ## Initialize random number generator to obtain different chains from ## each parallel run RNGkind("L'Ecuyer-CMRG") set.seed(7777) mc.reset.stream() ## Start parallel processes niter <- 1000 Sigma.prop <- .03*Sigma MHprocesses <- lapply(1:nchains,function(i){ mcparallel(MHalg(mu.init[i,],x,Sigma,n.iter,Sigma.prop)) }) ## Wait and collect output MHout <- mccollect(MHprocesses) ##### Examine Output ##### ## One-dimensional traceplots col <- rainbow(nchains) par(mfrow=c(2,1)) trace1 <- sapply(1:nchains,function(i) MHout[[i]]$mu[,1]) matplot(trace1,type="l",lty=1,col=col) abline(h=mu[1],lty=2,col="grey") trace2 <- sapply(1:nchains,function(i) MHout[[i]]$mu[,2]) matplot(trace2,type="l",lty=1,col=col) abline(h=mu[2],lty=2,col="grey") ## Two-dimensional traceplot par(mfrow=c(1,1)) lim <- range(sapply(MHout,function(out) range(out$mu))) plot(NA,NA,xlim=lim,ylim=lim) for(i in 1:nchains) lines(MHout[[i]]$mu[,1],MHout[[i]]$mu[,2],col=col[i]) abline(v=mu[1],lty=2,col="grey") abline(h=mu[2],lty=2,col="grey") ## Acceptance rates sapply(1:nchains,function(i) mean(MHout[[i]]$accept)) ## Density plots par(mfrow=c(2,1)) dens1 <- lapply(1:nchains,function(i){ density(MHout[[i]]$mu[,1],from=lim[1],to=lim[2]) }) ylim1 <- c(0,max(sapply(1:nchains,function(i) max(dens1[[i]]$y)))) plot(NA,NA,xlim=lim,ylim=ylim1) for(i in 1:nchains) lines(dens1[[i]],col=col[i]) abline(v=mu[1],lty=2,col="grey") dens2 <- lapply(1:nchains,function(i){ density(MHout[[i]]$mu[,2],from=lim[1],to=lim[2]) }) ylim2 <- c(0,max(sapply(1:nchains,function(i) max(dens2[[i]]$y)))) plot(NA,NA,xlim=lim,ylim=ylim2) for(i in 1:nchains) lines(dens2[[i]],col=col[i]) abline(v=mu[2],lty=2,col="grey")