diff --git a/submissions/bdgenomics/scripts/1_integration.scala b/submissions/bdgenomics/scripts/1_integration.scala new file mode 100644 index 0000000..a3564c5 --- /dev/null +++ b/submissions/bdgenomics/scripts/1_integration.scala @@ -0,0 +1,168 @@ +/** + * + */ +val dataDir = "/data/variant_db/1" +val relatednessFile = "README.sample_cryptic_relations" +val populationsFile = "phase1_integrated_calls.20101123.ALL.panel" +val vcfFile = "1KG.chr22.anno.infocol.vcf" + +import org.apache.hadoop.fs.{ FileSystem, Path } +import org.apache.spark.rdd.MetricsContext._ +import org.bdgenomics.adam.models.ReferenceRegion +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.formats.avro.GenotypeAllele +import org.bdgenomics.utils.instrumentation.Metrics +import scala.io.Source + +object Timers extends Metrics { + + // timers for setup code + val mungeVcf = timer("Munging whitespace in VCF") + val convertVcfToParquet = timer("Converting VCF to Parquet") + val loadMetadata = timer("Loading sample metadata") + + // timers for running the query in pure spark (rdd's) + val pureSparkIntegrate = timer("Running integration query in pure Spark") + val pureSparkWithCachingIntegrate = timer("Running integration query in pure Spark with cached input") + val broadcastMetadata = timer("Broadcasting metadata") + val cacheInputRdd = timer("Loading and caching input RDD") + val queryRdd = timer("Querying the RDD") +} + +// pull in all the timers +import Timers._ + +// convert VCF into ADAM genotypes and save to disk +convertVcfToParquet.time { + sc.loadGenotypes("%s/%s".format(dataDir, vcfFile)) + .saveAsParquet("%s/chr22.annotated.gt.adam".format(dataDir)) +} + +case class Sample(sampleId: String, population: String) + +// parse the sample metadata +val (samples, relations) = loadMetadata.time { + + // make hadoop paths for the metadata files + val relationsPath = new Path("%s/%s".format(dataDir, relatednessFile)) + val samplePath = new Path("%s/%s".format(dataDir, populationsFile)) + + // get the filesystem for the sample metadata + val fs = relationsPath.getFileSystem(sc.hadoopConfiguration) + + // open the two files + val relationsStream = fs.open(relationsPath) + val sampleStream = fs.open(relationsPath) + + // extract the cryptic relatedness data + val parsedRelations = Source.fromInputStream(relationsStream) + .getLines + .drop(4) // first 4 lines are whitespace + .flatMap(line => { + // line is tab delimited: + // Population Sample 1 Sample 2 Relationship IBD0 IBD1 IBD2 + val splitLine = line.split("\t") + + if (splitLine.length != 7) { + Iterable.empty[Sample] + } else { + val population = splitLine(0) + + // exclude samples in ASW that are siblings + // we will not filter them out + if (population == "ASW" && + splitLine(3).startsWith("Sibling")) { + Iterable.empty[Sample] + } else { + Iterable(Sample(splitLine(1), population), + Sample(splitLine(2), population)) + } + } + }).toSeq + + // extract the population labels + val parsedPopulations = Source.fromInputStream(sampleStream) + .getLines + .flatMap(line => { + // line is tab delimited: + // SampleId Population ? SequencerPlatform + val splitLine = line.split("\t") + + if (splitLine.length > 2) { + Some(Sample(splitLine(0), splitLine(1))) + } else { + None + } + }).toSeq + + // Scala.io.source is lazy --> force materialization + println("Have %d related samples across %d populations.".format(parsedRelations.size, + parsedPopulations.size)) + + (parsedPopulations, parsedRelations) +} + +// load the rdd from disk +val genotypes = sc.loadParquetGenotypes("%s/chr22.annotated.gt.adam".format(dataDir)) + +// munge and broadcast the sample metadata +val bcastSampleToPopulationMap = broadcastMetadata.time { + val relatedSamples = relations.map(_.sampleId).toSet + val sampleToPopulationMap = samples.filter(sample => relatedSamples(sample.sampleId)) + .map(sample => (sample.sampleId, sample.population)) + .toArray + .toMap + sc.broadcast(sampleToPopulationMap) +} + +// are we caching the data? if so, cache it now +val cachedGenotypes = cacheInputRdd.time { + + // cache the genotypes and force the genotypes to be materialized into + // memory by doing a count + val cachedGenotypes = genotypes.transform(_.cache) + println("Have %d genotypes.".format(cachedGenotypes.rdd.count)) + + cachedGenotypes +} + +val variantsByPopulation = queryRdd.time { + cachedGenotypes.rdd + .flatMap(gt => { + bcastSampleToPopulationMap.value + .get(gt.getSampleId) + .map(population => (population, gt)) + }).filter(kv => { + val (_, gt) = kv + ((gt.getAlleles.contains(GenotypeAllele.REF) && + gt.getAlleles.contains(GenotypeAllele.ALT)) && // is het + (Option(gt.getReadDepth).map(i => i: Int).orElse({ + (Option(gt.getReferenceReadDepth), Option(gt.getAlternateReadDepth)) match { + case (Some(refDepth), Some(altDepth)) => Some(refDepth + altDepth) + case _ => None + } + }).fold(true)(_ > 30)) && // DP or sum(AD) is > 30, if provided + Option(gt.getAlternateReadDepth).fold(false)(_ > 10) && // alt depth > 10 + Option(gt.getVariant.getAnnotation.getAttributes.get("SAVANT_IMPACT")).filter(impact => { + impact == "HIGH" || impact == "MEDIUM" + }).isDefined && + Option(gt.getVariant.getAnnotation.getAttributes.get("ExAC.Info.AF")).fold(true)(afString => { + try { + afString.toFloat < 0.1 + } catch { + case t: Throwable => { + true + } + } + })) + }).map(kv => { + val (population, gt) = kv + (ReferenceRegion(gt), population, gt.getVariant.getAlternateAllele) + }).distinct + .map(t => { + val (_, population, _) = t + }).countByValue + } + } + +println(variantsByPopulation.mkString("\n")) \ No newline at end of file diff --git a/submissions/bdgenomics/scripts/2_continuous.scala b/submissions/bdgenomics/scripts/2_continuous.scala new file mode 100644 index 0000000..0248b4d --- /dev/null +++ b/submissions/bdgenomics/scripts/2_continuous.scala @@ -0,0 +1,102 @@ +/** + * + */ +val dataDir = "/data/variant_db/2" +//val na12878 = "NA12878.chr22.g.vcf.gz" +//val na12891 = "NA12891.chr22.g.vcf.gz" +//val na12892 = "NA12892.chr22.g.vcf.gz" +val na12878 = "NA12878.chr22.g.vcf" +val na12891 = "NA12891.chr22.g.vcf" +val na12892 = "NA12892.chr22.g.vcf" + +import htsjdk.samtools.ValidationStringency +import org.apache.spark.rdd.MetricsContext._ +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.formats.avro.GenotypeAllele +import scala.collection.JavaConversions._ + +// convert data into parquet +val partitions = 64 +//sc.loadGenotypes("%s/%s".format(dataDir, na12878)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir)) +//sc.loadGenotypes("%s/%s".format(dataDir, na12891)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir)) +//sc.loadGenotypes("%s/%s".format(dataDir, na12892)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir)) +sc.loadGenotypes("%s/%s".format(dataDir, na12878)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir)) +sc.loadGenotypes("%s/%s".format(dataDir, na12891)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir)) +sc.loadGenotypes("%s/%s".format(dataDir, na12892)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir)) + +// reload datasets and cache +val na12878Gts = sc.loadGenotypes("%s/NA12878.gt.adam".format(dataDir)).transform(_.cache()) +val otherGts = sc.loadGenotypes("%s/NA1289*.gt.adam/*".format(dataDir)).transform(_.cache()) + +// force materialization +println("NA12878 has %d genotypes.".format(na12878Gts.rdd.count)) +println("NA12891/2 has %d genotypes.".format(otherGts.rdd.count)) + +// filter high quality genotypes, hom alt in 12878, hom ref in others +val gqThreshold = 20 +val na12878HomAltGts = na12878Gts.transform(rdd => { + rdd.filter(gt => { + val gtCalls = gt.getAlleles + Option(gt.getGenotypeQuality).fold(false)(_ >= gqThreshold) && + !gtCalls.isEmpty && + gtCalls.forall(_ == GenotypeAllele.ALT) + }) +}) +val otherHomRefGts = otherGts.transform(rdd => { + rdd.filter(gt => { + val gtCalls = gt.getAlleles + !gtCalls.isEmpty && + gtCalls.forall(_ == GenotypeAllele.REF) && + Option(gt.getGenotypeQuality).fold(false)(gq => { + if (gq == 0) { + // this is fun code. + // + // the tool that generated the gVCFs we are working with occasionally writes out + // GQ = 0 for Hom REF gVCF lines where the Ref/Ref PL entry is high confidence. + // + // what is likely happening here can be inferred from one of the suspect lines, + // from the NA12892 VCF: + // chr22 24300634 . G . . END=24300634 GT:DP:GQ:MIN_DP:PL 0/0:24:0:24:0,0,713 + // + // if you look at the PL field, Ref/Ref PL is very high while Alt/Alt and Alt/Ref PLs are 0. + // probably what is happening is that when the tool is calculating the GQ from PLs, it's doing something like + // max(PL) / sum(PL), or alternatively max(PL) / sum g in 0..m,g!=argmax_idx(PL(idx)) PL(g) + // and dumping a bad GQ + // + // since these lines are obviously strong HomRef calls, let's recover them by parsing PL's + if (gt.getVariant.getAlternateAllele == null) { + val nonReferenceLikelihoods = gt.getNonReferenceLikelihoods.toSeq + nonReferenceLikelihoods.dropRight(1).forall(_ == java.lang.Double.NEGATIVE_INFINITY) && + nonReferenceLikelihoods.last != java.lang.Double.NEGATIVE_INFINITY + } else { + false + } + } else { + gq >= gqThreshold + } + }) + }) +}) + +// now, do a broadcast region join +val joinedGts = na12878HomAltGts.broadcastRegionJoin(otherHomRefGts) + +// and then take the variants that show up in two samples +// +// in a simple world, we would just count the number of home ref genotypes +// that overlapped a given hom alt site. howaaaaaayver, a poorly formatted +// gVCF "could" contain multiple overlapping records at a single site. +// this would occur if we scored two different alts, and emitted hom ref +// calls against each alt. +// +// note: this happens on the test data +val selectedVariants = joinedGts.rdd.map(p => { + val (na12878Gt, gt) = p + (na12878Gt.getVariant, Set(gt.getSampleId)) + }).reduceByKey(_ ++ _).filter(kv => kv._2.size == 2).keys + +// and save as vcf +VariantRDD(selectedVariants, + na12878Gts.sequences).saveAsVcf("%s/selectedVariants.vcf".format(dataDir), + asSingleFile = true, stringency = ValidationStringency.LENIENT) \ No newline at end of file