Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions submissions/bdgenomics/scripts/1_integration.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
*
*/
val dataDir = "/data/variant_db/1"
val relatednessFile = "README.sample_cryptic_relations"
val populationsFile = "phase1_integrated_calls.20101123.ALL.panel"
val vcfFile = "1KG.chr22.anno.infocol.vcf"

import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.rdd.MetricsContext._
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.GenotypeAllele
import org.bdgenomics.utils.instrumentation.Metrics
import scala.io.Source

object Timers extends Metrics {

// timers for setup code
val mungeVcf = timer("Munging whitespace in VCF")
val convertVcfToParquet = timer("Converting VCF to Parquet")
val loadMetadata = timer("Loading sample metadata")

// timers for running the query in pure spark (rdd's)
val pureSparkIntegrate = timer("Running integration query in pure Spark")
val pureSparkWithCachingIntegrate = timer("Running integration query in pure Spark with cached input")
val broadcastMetadata = timer("Broadcasting metadata")
val cacheInputRdd = timer("Loading and caching input RDD")
val queryRdd = timer("Querying the RDD")
}

// pull in all the timers
import Timers._

// convert VCF into ADAM genotypes and save to disk
convertVcfToParquet.time {
sc.loadGenotypes("%s/%s".format(dataDir, vcfFile))
.saveAsParquet("%s/chr22.annotated.gt.adam".format(dataDir))
}

case class Sample(sampleId: String, population: String)

// parse the sample metadata
val (samples, relations) = loadMetadata.time {

// make hadoop paths for the metadata files
val relationsPath = new Path("%s/%s".format(dataDir, relatednessFile))
val samplePath = new Path("%s/%s".format(dataDir, populationsFile))

// get the filesystem for the sample metadata
val fs = relationsPath.getFileSystem(sc.hadoopConfiguration)

// open the two files
val relationsStream = fs.open(relationsPath)
val sampleStream = fs.open(relationsPath)

// extract the cryptic relatedness data
val parsedRelations = Source.fromInputStream(relationsStream)
.getLines
.drop(4) // first 4 lines are whitespace
.flatMap(line => {
// line is tab delimited:
// Population Sample 1 Sample 2 Relationship IBD0 IBD1 IBD2
val splitLine = line.split("\t")

if (splitLine.length != 7) {
Iterable.empty[Sample]
} else {
val population = splitLine(0)

// exclude samples in ASW that are siblings
// we will not filter them out
if (population == "ASW" &&
splitLine(3).startsWith("Sibling")) {
Iterable.empty[Sample]
} else {
Iterable(Sample(splitLine(1), population),
Sample(splitLine(2), population))
}
}
}).toSeq

// extract the population labels
val parsedPopulations = Source.fromInputStream(sampleStream)
.getLines
.flatMap(line => {
// line is tab delimited:
// SampleId Population ? SequencerPlatform
val splitLine = line.split("\t")

if (splitLine.length > 2) {
Some(Sample(splitLine(0), splitLine(1)))
} else {
None
}
}).toSeq

// Scala.io.source is lazy --> force materialization
println("Have %d related samples across %d populations.".format(parsedRelations.size,
parsedPopulations.size))

(parsedPopulations, parsedRelations)
}

// load the rdd from disk
val genotypes = sc.loadParquetGenotypes("%s/chr22.annotated.gt.adam".format(dataDir))

// munge and broadcast the sample metadata
val bcastSampleToPopulationMap = broadcastMetadata.time {
val relatedSamples = relations.map(_.sampleId).toSet
val sampleToPopulationMap = samples.filter(sample => relatedSamples(sample.sampleId))
.map(sample => (sample.sampleId, sample.population))
.toArray
.toMap
sc.broadcast(sampleToPopulationMap)
}

// are we caching the data? if so, cache it now
val cachedGenotypes = cacheInputRdd.time {

// cache the genotypes and force the genotypes to be materialized into
// memory by doing a count
val cachedGenotypes = genotypes.transform(_.cache)
println("Have %d genotypes.".format(cachedGenotypes.rdd.count))

cachedGenotypes
}

val variantsByPopulation = queryRdd.time {
cachedGenotypes.rdd
.flatMap(gt => {
bcastSampleToPopulationMap.value
.get(gt.getSampleId)
.map(population => (population, gt))
}).filter(kv => {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there would be utility in filterBy method(s) on VariantRDD/GenotypeRDD to make this easier to read? The filter would need to happen before the flatMap to (population, gt) then. Is there much performance difference between a compound filter like this with lots of expressions and a series of filter calls with smaller expressions?

val (_, gt) = kv
((gt.getAlleles.contains(GenotypeAllele.REF) &&
gt.getAlleles.contains(GenotypeAllele.ALT)) && // is het
(Option(gt.getReadDepth).map(i => i: Int).orElse({
(Option(gt.getReferenceReadDepth), Option(gt.getAlternateReadDepth)) match {
case (Some(refDepth), Some(altDepth)) => Some(refDepth + altDepth)
case _ => None
}
}).fold(true)(_ > 30)) && // DP or sum(AD) is > 30, if provided
Option(gt.getAlternateReadDepth).fold(false)(_ > 10) && // alt depth > 10
Option(gt.getVariant.getAnnotation.getAttributes.get("SAVANT_IMPACT")).filter(impact => {
impact == "HIGH" || impact == "MEDIUM"
}).isDefined &&
Option(gt.getVariant.getAnnotation.getAttributes.get("ExAC.Info.AF")).fold(true)(afString => {
try {
afString.toFloat < 0.1
} catch {
case t: Throwable => {
true
}
}
}))
}).map(kv => {
val (population, gt) = kv
(ReferenceRegion(gt), population, gt.getVariant.getAlternateAllele)
}).distinct
.map(t => {
val (_, population, _) = t
}).countByValue
}
}

println(variantsByPopulation.mkString("\n"))
102 changes: 102 additions & 0 deletions submissions/bdgenomics/scripts/2_continuous.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
*
*/
val dataDir = "/data/variant_db/2"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some metrics to this script?

//val na12878 = "NA12878.chr22.g.vcf.gz"
//val na12891 = "NA12891.chr22.g.vcf.gz"
//val na12892 = "NA12892.chr22.g.vcf.gz"
val na12878 = "NA12878.chr22.g.vcf"
val na12891 = "NA12891.chr22.g.vcf"
val na12892 = "NA12892.chr22.g.vcf"

import htsjdk.samtools.ValidationStringency
import org.apache.spark.rdd.MetricsContext._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.variant.VariantRDD
import org.bdgenomics.formats.avro.GenotypeAllele
import scala.collection.JavaConversions._

// convert data into parquet
val partitions = 64
//sc.loadGenotypes("%s/%s".format(dataDir, na12878)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir))
//sc.loadGenotypes("%s/%s".format(dataDir, na12891)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir))
//sc.loadGenotypes("%s/%s".format(dataDir, na12892)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir))
sc.loadGenotypes("%s/%s".format(dataDir, na12878)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir))
sc.loadGenotypes("%s/%s".format(dataDir, na12891)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir))
sc.loadGenotypes("%s/%s".format(dataDir, na12892)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir))

// reload datasets and cache
val na12878Gts = sc.loadGenotypes("%s/NA12878.gt.adam".format(dataDir)).transform(_.cache())
val otherGts = sc.loadGenotypes("%s/NA1289*.gt.adam/*".format(dataDir)).transform(_.cache())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also asked this elsewhere, why the /* in the glob pattern?


// force materialization
println("NA12878 has %d genotypes.".format(na12878Gts.rdd.count))
println("NA12891/2 has %d genotypes.".format(otherGts.rdd.count))

// filter high quality genotypes, hom alt in 12878, hom ref in others
val gqThreshold = 20
val na12878HomAltGts = na12878Gts.transform(rdd => {
rdd.filter(gt => {
val gtCalls = gt.getAlleles
Option(gt.getGenotypeQuality).fold(false)(_ >= gqThreshold) &&
!gtCalls.isEmpty &&
gtCalls.forall(_ == GenotypeAllele.ALT)
})
})
val otherHomRefGts = otherGts.transform(rdd => {
rdd.filter(gt => {
val gtCalls = gt.getAlleles
!gtCalls.isEmpty &&
gtCalls.forall(_ == GenotypeAllele.REF) &&
Option(gt.getGenotypeQuality).fold(false)(gq => {
if (gq == 0) {
// this is fun code.
//
// the tool that generated the gVCFs we are working with occasionally writes out
// GQ = 0 for Hom REF gVCF lines where the Ref/Ref PL entry is high confidence.
//
// what is likely happening here can be inferred from one of the suspect lines,
// from the NA12892 VCF:
// chr22 24300634 . G <NON_REF> . . END=24300634 GT:DP:GQ:MIN_DP:PL 0/0:24:0:24:0,0,713
//
// if you look at the PL field, Ref/Ref PL is very high while Alt/Alt and Alt/Ref PLs are 0.
// probably what is happening is that when the tool is calculating the GQ from PLs, it's doing something like
// max(PL) / sum(PL), or alternatively max(PL) / sum g in 0..m,g!=argmax_idx(PL(idx)) PL(g)
// and dumping a bad GQ
//
// since these lines are obviously strong HomRef calls, let's recover them by parsing PL's
if (gt.getVariant.getAlternateAllele == null) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In converting from VCF, we go from <NON_REF> symbolic allele to null variant alternateAllele? For going back to VCF, is there any other reason why variant alternateAllele might be null? Do we write "<NON_REF>" when variant alternateAllele is null or do we rely on htsjdk to do that for us?

val nonReferenceLikelihoods = gt.getNonReferenceLikelihoods.toSeq
nonReferenceLikelihoods.dropRight(1).forall(_ == java.lang.Double.NEGATIVE_INFINITY) &&
nonReferenceLikelihoods.last != java.lang.Double.NEGATIVE_INFINITY
} else {
false
}
} else {
gq >= gqThreshold
}
})
})
})

// now, do a broadcast region join
val joinedGts = na12878HomAltGts.broadcastRegionJoin(otherHomRefGts)

// and then take the variants that show up in two samples
//
// in a simple world, we would just count the number of home ref genotypes
// that overlapped a given hom alt site. howaaaaaayver, a poorly formatted
// gVCF "could" contain multiple overlapping records at a single site.
// this would occur if we scored two different alts, and emitted hom ref
// calls against each alt.
//
// note: this happens on the test data
val selectedVariants = joinedGts.rdd.map(p => {
val (na12878Gt, gt) = p
(na12878Gt.getVariant, Set(gt.getSampleId))
}).reduceByKey(_ ++ _).filter(kv => kv._2.size == 2).keys

// and save as vcf
VariantRDD(selectedVariants,
na12878Gts.sequences).saveAsVcf("%s/selectedVariants.vcf".format(dataDir),
asSingleFile = true, stringency = ValidationStringency.LENIENT)