-
Notifications
You must be signed in to change notification settings - Fork 6
WIP ADAM queries. #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| /** | ||
| * | ||
| */ | ||
| val dataDir = "/data/variant_db/1" | ||
| val relatednessFile = "README.sample_cryptic_relations" | ||
| val populationsFile = "phase1_integrated_calls.20101123.ALL.panel" | ||
| val vcfFile = "1KG.chr22.anno.infocol.vcf" | ||
|
|
||
| import org.apache.hadoop.fs.{ FileSystem, Path } | ||
| import org.apache.spark.rdd.MetricsContext._ | ||
| import org.bdgenomics.adam.models.ReferenceRegion | ||
| import org.bdgenomics.adam.rdd.ADAMContext._ | ||
| import org.bdgenomics.formats.avro.GenotypeAllele | ||
| import org.bdgenomics.utils.instrumentation.Metrics | ||
| import scala.io.Source | ||
|
|
||
| object Timers extends Metrics { | ||
|
|
||
| // timers for setup code | ||
| val mungeVcf = timer("Munging whitespace in VCF") | ||
| val convertVcfToParquet = timer("Converting VCF to Parquet") | ||
| val loadMetadata = timer("Loading sample metadata") | ||
|
|
||
| // timers for running the query in pure spark (rdd's) | ||
| val pureSparkIntegrate = timer("Running integration query in pure Spark") | ||
| val pureSparkWithCachingIntegrate = timer("Running integration query in pure Spark with cached input") | ||
| val broadcastMetadata = timer("Broadcasting metadata") | ||
| val cacheInputRdd = timer("Loading and caching input RDD") | ||
| val queryRdd = timer("Querying the RDD") | ||
| } | ||
|
|
||
| // pull in all the timers | ||
| import Timers._ | ||
|
|
||
| // convert VCF into ADAM genotypes and save to disk | ||
| convertVcfToParquet.time { | ||
| sc.loadGenotypes("%s/%s".format(dataDir, vcfFile)) | ||
| .saveAsParquet("%s/chr22.annotated.gt.adam".format(dataDir)) | ||
| } | ||
|
|
||
| case class Sample(sampleId: String, population: String) | ||
|
|
||
| // parse the sample metadata | ||
| val (samples, relations) = loadMetadata.time { | ||
|
|
||
| // make hadoop paths for the metadata files | ||
| val relationsPath = new Path("%s/%s".format(dataDir, relatednessFile)) | ||
| val samplePath = new Path("%s/%s".format(dataDir, populationsFile)) | ||
|
|
||
| // get the filesystem for the sample metadata | ||
| val fs = relationsPath.getFileSystem(sc.hadoopConfiguration) | ||
|
|
||
| // open the two files | ||
| val relationsStream = fs.open(relationsPath) | ||
| val sampleStream = fs.open(relationsPath) | ||
|
|
||
| // extract the cryptic relatedness data | ||
| val parsedRelations = Source.fromInputStream(relationsStream) | ||
| .getLines | ||
| .drop(4) // first 4 lines are whitespace | ||
| .flatMap(line => { | ||
| // line is tab delimited: | ||
| // Population Sample 1 Sample 2 Relationship IBD0 IBD1 IBD2 | ||
| val splitLine = line.split("\t") | ||
|
|
||
| if (splitLine.length != 7) { | ||
| Iterable.empty[Sample] | ||
| } else { | ||
| val population = splitLine(0) | ||
|
|
||
| // exclude samples in ASW that are siblings | ||
| // we will not filter them out | ||
| if (population == "ASW" && | ||
| splitLine(3).startsWith("Sibling")) { | ||
| Iterable.empty[Sample] | ||
| } else { | ||
| Iterable(Sample(splitLine(1), population), | ||
| Sample(splitLine(2), population)) | ||
| } | ||
| } | ||
| }).toSeq | ||
|
|
||
| // extract the population labels | ||
| val parsedPopulations = Source.fromInputStream(sampleStream) | ||
| .getLines | ||
| .flatMap(line => { | ||
| // line is tab delimited: | ||
| // SampleId Population ? SequencerPlatform | ||
| val splitLine = line.split("\t") | ||
|
|
||
| if (splitLine.length > 2) { | ||
| Some(Sample(splitLine(0), splitLine(1))) | ||
| } else { | ||
| None | ||
| } | ||
| }).toSeq | ||
|
|
||
| // Scala.io.source is lazy --> force materialization | ||
| println("Have %d related samples across %d populations.".format(parsedRelations.size, | ||
| parsedPopulations.size)) | ||
|
|
||
| (parsedPopulations, parsedRelations) | ||
| } | ||
|
|
||
| // load the rdd from disk | ||
| val genotypes = sc.loadParquetGenotypes("%s/chr22.annotated.gt.adam".format(dataDir)) | ||
|
|
||
| // munge and broadcast the sample metadata | ||
| val bcastSampleToPopulationMap = broadcastMetadata.time { | ||
| val relatedSamples = relations.map(_.sampleId).toSet | ||
| val sampleToPopulationMap = samples.filter(sample => relatedSamples(sample.sampleId)) | ||
| .map(sample => (sample.sampleId, sample.population)) | ||
| .toArray | ||
| .toMap | ||
| sc.broadcast(sampleToPopulationMap) | ||
| } | ||
|
|
||
| // are we caching the data? if so, cache it now | ||
| val cachedGenotypes = cacheInputRdd.time { | ||
|
|
||
| // cache the genotypes and force the genotypes to be materialized into | ||
| // memory by doing a count | ||
| val cachedGenotypes = genotypes.transform(_.cache) | ||
| println("Have %d genotypes.".format(cachedGenotypes.rdd.count)) | ||
|
|
||
| cachedGenotypes | ||
| } | ||
|
|
||
| val variantsByPopulation = queryRdd.time { | ||
| cachedGenotypes.rdd | ||
| .flatMap(gt => { | ||
| bcastSampleToPopulationMap.value | ||
| .get(gt.getSampleId) | ||
| .map(population => (population, gt)) | ||
| }).filter(kv => { | ||
| val (_, gt) = kv | ||
| ((gt.getAlleles.contains(GenotypeAllele.REF) && | ||
| gt.getAlleles.contains(GenotypeAllele.ALT)) && // is het | ||
| (Option(gt.getReadDepth).map(i => i: Int).orElse({ | ||
| (Option(gt.getReferenceReadDepth), Option(gt.getAlternateReadDepth)) match { | ||
| case (Some(refDepth), Some(altDepth)) => Some(refDepth + altDepth) | ||
| case _ => None | ||
| } | ||
| }).fold(true)(_ > 30)) && // DP or sum(AD) is > 30, if provided | ||
| Option(gt.getAlternateReadDepth).fold(false)(_ > 10) && // alt depth > 10 | ||
| Option(gt.getVariant.getAnnotation.getAttributes.get("SAVANT_IMPACT")).filter(impact => { | ||
| impact == "HIGH" || impact == "MEDIUM" | ||
| }).isDefined && | ||
| Option(gt.getVariant.getAnnotation.getAttributes.get("ExAC.Info.AF")).fold(true)(afString => { | ||
| try { | ||
| afString.toFloat < 0.1 | ||
| } catch { | ||
| case t: Throwable => { | ||
| true | ||
| } | ||
| } | ||
| })) | ||
| }).map(kv => { | ||
| val (population, gt) = kv | ||
| (ReferenceRegion(gt), population, gt.getVariant.getAlternateAllele) | ||
| }).distinct | ||
| .map(t => { | ||
| val (_, population, _) = t | ||
| }).countByValue | ||
| } | ||
| } | ||
|
|
||
| println(variantsByPopulation.mkString("\n")) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /** | ||
| * | ||
| */ | ||
| val dataDir = "/data/variant_db/2" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add some metrics to this script? |
||
| //val na12878 = "NA12878.chr22.g.vcf.gz" | ||
| //val na12891 = "NA12891.chr22.g.vcf.gz" | ||
| //val na12892 = "NA12892.chr22.g.vcf.gz" | ||
| val na12878 = "NA12878.chr22.g.vcf" | ||
| val na12891 = "NA12891.chr22.g.vcf" | ||
| val na12892 = "NA12892.chr22.g.vcf" | ||
|
|
||
| import htsjdk.samtools.ValidationStringency | ||
| import org.apache.spark.rdd.MetricsContext._ | ||
| import org.bdgenomics.adam.rdd.ADAMContext._ | ||
| import org.bdgenomics.adam.rdd.variant.VariantRDD | ||
| import org.bdgenomics.formats.avro.GenotypeAllele | ||
| import scala.collection.JavaConversions._ | ||
|
|
||
| // convert data into parquet | ||
| val partitions = 64 | ||
| //sc.loadGenotypes("%s/%s".format(dataDir, na12878)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir)) | ||
| //sc.loadGenotypes("%s/%s".format(dataDir, na12891)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir)) | ||
| //sc.loadGenotypes("%s/%s".format(dataDir, na12892)).transform(_.repartition(partitions)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir)) | ||
| sc.loadGenotypes("%s/%s".format(dataDir, na12878)).saveAsParquet("%s/NA12878.gt.adam".format(dataDir)) | ||
| sc.loadGenotypes("%s/%s".format(dataDir, na12891)).saveAsParquet("%s/NA12891.gt.adam".format(dataDir)) | ||
| sc.loadGenotypes("%s/%s".format(dataDir, na12892)).saveAsParquet("%s/NA12892.gt.adam".format(dataDir)) | ||
|
|
||
| // reload datasets and cache | ||
| val na12878Gts = sc.loadGenotypes("%s/NA12878.gt.adam".format(dataDir)).transform(_.cache()) | ||
| val otherGts = sc.loadGenotypes("%s/NA1289*.gt.adam/*".format(dataDir)).transform(_.cache()) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also asked this elsewhere, why the |
||
|
|
||
| // force materialization | ||
| println("NA12878 has %d genotypes.".format(na12878Gts.rdd.count)) | ||
| println("NA12891/2 has %d genotypes.".format(otherGts.rdd.count)) | ||
|
|
||
| // filter high quality genotypes, hom alt in 12878, hom ref in others | ||
| val gqThreshold = 20 | ||
| val na12878HomAltGts = na12878Gts.transform(rdd => { | ||
| rdd.filter(gt => { | ||
| val gtCalls = gt.getAlleles | ||
| Option(gt.getGenotypeQuality).fold(false)(_ >= gqThreshold) && | ||
| !gtCalls.isEmpty && | ||
| gtCalls.forall(_ == GenotypeAllele.ALT) | ||
| }) | ||
| }) | ||
| val otherHomRefGts = otherGts.transform(rdd => { | ||
| rdd.filter(gt => { | ||
| val gtCalls = gt.getAlleles | ||
| !gtCalls.isEmpty && | ||
| gtCalls.forall(_ == GenotypeAllele.REF) && | ||
| Option(gt.getGenotypeQuality).fold(false)(gq => { | ||
| if (gq == 0) { | ||
| // this is fun code. | ||
| // | ||
| // the tool that generated the gVCFs we are working with occasionally writes out | ||
| // GQ = 0 for Hom REF gVCF lines where the Ref/Ref PL entry is high confidence. | ||
| // | ||
| // what is likely happening here can be inferred from one of the suspect lines, | ||
| // from the NA12892 VCF: | ||
| // chr22 24300634 . G <NON_REF> . . END=24300634 GT:DP:GQ:MIN_DP:PL 0/0:24:0:24:0,0,713 | ||
| // | ||
| // if you look at the PL field, Ref/Ref PL is very high while Alt/Alt and Alt/Ref PLs are 0. | ||
| // probably what is happening is that when the tool is calculating the GQ from PLs, it's doing something like | ||
| // max(PL) / sum(PL), or alternatively max(PL) / sum g in 0..m,g!=argmax_idx(PL(idx)) PL(g) | ||
| // and dumping a bad GQ | ||
| // | ||
| // since these lines are obviously strong HomRef calls, let's recover them by parsing PL's | ||
| if (gt.getVariant.getAlternateAllele == null) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In converting from VCF, we go from |
||
| val nonReferenceLikelihoods = gt.getNonReferenceLikelihoods.toSeq | ||
| nonReferenceLikelihoods.dropRight(1).forall(_ == java.lang.Double.NEGATIVE_INFINITY) && | ||
| nonReferenceLikelihoods.last != java.lang.Double.NEGATIVE_INFINITY | ||
| } else { | ||
| false | ||
| } | ||
| } else { | ||
| gq >= gqThreshold | ||
| } | ||
| }) | ||
| }) | ||
| }) | ||
|
|
||
| // now, do a broadcast region join | ||
| val joinedGts = na12878HomAltGts.broadcastRegionJoin(otherHomRefGts) | ||
|
|
||
| // and then take the variants that show up in two samples | ||
| // | ||
| // in a simple world, we would just count the number of home ref genotypes | ||
| // that overlapped a given hom alt site. howaaaaaayver, a poorly formatted | ||
| // gVCF "could" contain multiple overlapping records at a single site. | ||
| // this would occur if we scored two different alts, and emitted hom ref | ||
| // calls against each alt. | ||
| // | ||
| // note: this happens on the test data | ||
| val selectedVariants = joinedGts.rdd.map(p => { | ||
| val (na12878Gt, gt) = p | ||
| (na12878Gt.getVariant, Set(gt.getSampleId)) | ||
| }).reduceByKey(_ ++ _).filter(kv => kv._2.size == 2).keys | ||
|
|
||
| // and save as vcf | ||
| VariantRDD(selectedVariants, | ||
| na12878Gts.sequences).saveAsVcf("%s/selectedVariants.vcf".format(dataDir), | ||
| asSingleFile = true, stringency = ValidationStringency.LENIENT) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there would be utility in
filterBymethod(s) on VariantRDD/GenotypeRDD to make this easier to read? The filter would need to happen before the flatMap to(population, gt)then. Is there much performance difference between a compound filter like this with lots of expressions and a series offiltercalls with smaller expressions?