#!/usr/bin/env python # Copyright (C) 2017 by the University of Bristol # Contact: Mark Rogers # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # """ fathmm_xf_query.py: Predict the Pathogenic Potential of Single Nucleotide Variants (SNVs) """ # script depends only on standard Python libraries import optparse,os,sys,subprocess,time # CScape databases: DEFAULT_CODING_DB = 'fathmm_xf_coding.vcf.gz' DEFAULT_NONCODING_DB = 'fathmm_xf_noncoding.vcf.gz' NUCLEOTIDES = 'ACGT' # Prediction types CODING_PRED = 'coding' NONCODING_PRED = 'noncoding' # Confidence thresholds DEFAULT_THRESH = 0.5 CODING_HIGH_THRESH = 0.97 NONCODING_HIGH_THRESH = 0.96 CODING_LOW_THRESH = 1-CODING_HIGH_THRESH NONCODING_LOW_THRESH = 1-NONCODING_HIGH_THRESH # Confidence ratings PREDICTION_STATUS = [NO_PRED, HIGH_NEG, LOW_NEG, LOW_POS, HIGH_POS] = range(5) PREDICTION_LEVEL = {NO_PRED : '', HIGH_NEG : 'high', LOW_NEG : 'low', LOW_POS : 'low', HIGH_POS : 'high'} PREDICTION_CLASS = {NO_PRED : 'None', HIGH_NEG : 'neutral', LOW_NEG : 'neutral', LOW_POS : 'pathogenic', HIGH_POS : 'pathogenic'} # Error messages BAD_CHROM = "Error: unexpected chromosome" BAD_POS = "Error: unexpected position" BAD_BASE = "\nError: reference and mutant nucleotides must both be ACGT\n%s\n\n" BAD_MUTANT = "\nError: reference and mutant nucleotides must be different\n%s\n\n" TABIX_WARN = "** Warning: cannot find tabix executable in your path\n" DEFAULT_PRED = ['', '', NO_PRED] class ProgressIndicator(object) : """A simple progress indicator.""" def __init__(self, increment, verbose=True) : self.limit = increment self.barlim = int(self.limit/10) self.dotlim = int(self.barlim/5) self.started = False self.verbose = verbose self.ctr = 0 def finish(self) : """Finishes the progress output by appending a newline, if anything has been written.""" if self.started : sys.stderr.write('\n') self.started = False def update(self) : """Updates the indicator.""" self.ctr += 1 if not self.verbose : return if self.ctr % self.limit == 0 : sys.stderr.write('%d\n' % self.ctr) elif self.ctr % self.barlim == 0 : sys.stderr.write('|') elif self.ctr % self.dotlim == 0 : self.started = True sys.stderr.write('.') def confidence(s, high, low) : """Returns the confidence level for (neutral, low-confidence or high-confidence) for the given p-value string using the given threshold. If the string is invalid, returns 'no prediction'""" try : x = float(s) if x >= high : return HIGH_POS elif x <= low : return HIGH_NEG elif x >= DEFAULT_THRESH : return LOW_POS else : return LOW_NEG except Exception : return NO_PRED def findFile(name, path, delim=':') : """Finds the first instance of a file name in the given path string.""" paths = path.split(delim) for p in paths : filePath = os.path.join(p, name) if os.path.exists(filePath) and os.path.isfile(filePath) : return filePath def getPrediction(recordList, query, predictionType) : """Returns a full prediction result for the given record list and query.""" (chrom,pos,reference,mutant) = query result = list(DEFAULT_PRED) if not recordList : return result for record in recordList.split("\n"): if not record: continue parts = record.strip().split("\t") result = list(DEFAULT_PRED) if parts[0] != chrom: raise ValueError(BAD_CHROM) elif int(parts[1]) != pos : #raise ValueError(BAD_POS) continue elif parts[2] != reference: sys.stderr.write("Error: invalid reference allele %s (expected %s) at %s:%s\n" % (reference, parts[3], chrom, pos)) return None break elif parts[3] == mutant: # SUCCESS! if predictionType == CODING_PRED : result = [parts[4], '', confidence(parts[4], CODING_HIGH_THRESH, CODING_LOW_THRESH)] else : result = ['', parts[4], confidence(parts[4], NONCODING_HIGH_THRESH, NONCODING_LOW_THRESH)] break return result def runTabix(db, query, predictionType) : """Spawns a subprocess to run tabix and query the given database.""" (chrom,pos,reference,mutant) = query cmd = "tabix %s %s:%d-%d" % (db, chrom, pos, pos+1) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) result, err = proc.communicate() if err: raise IOError("** Error running %s query for %s on %s" % (predictionType, queryString, db)) return result def timeString(s, format_string='%X') : """Returns the input string with user-readable a timestamp prefix.""" timestamp = time.strftime(format_string, time.localtime()) result = '%s %s' % (timestamp, s) return result def validateFile(path) : """Standard method for validating file paths.""" if not path : raise Exception("'%s' is not a valid file path; exiting." % path) if not os.path.exists(path) : raise Exception("File '%s' not found; exiting." % path) USAGE = """%prog query-file [options] Predict the pathogenic potential of single nucleotide variants (SNVs). The query file must be a list of queries in VCF format. Note: the id column and columns beyond the first five are ignored. chromosome position id reference mutant ... Example: 1 69094 . G A 11 168961 . T A 18 119888 . G A X 200855 . A C""" # Establish command-line options: parser = optparse.OptionParser(usage=USAGE) parser.add_option('-c', dest='cdb', default=DEFAULT_CODING_DB, help='CScape coding database [default: %default]') parser.add_option('-n', dest='ndb', default=DEFAULT_NONCODING_DB, help='CScape noncoding database [default: %default]') parser.add_option('-o', dest='output', default=None, help='Output file [default: stdout]') parser.add_option('-v', dest='verbose', default=False, help='Verbose mode [default: %default]', action='store_true') opts, args = parser.parse_args(sys.argv[1:]) MIN_ARGS = 1 if len(args) != MIN_ARGS : parser.print_help() sys.exit(1) queryFile = args[0] for f in [queryFile,opts.cdb,opts.ndb] : validateFile(f) # Note: script depends only on standard Python libraries so users need not install # the tabix module; however the tabix executable must be available try : tabixExe = findFile('tabix', os.environ['PATH']) if not tabixExe : sys.stderr.write(TABIX_WARN) except KeyError : sys.stderr.write(TABIX_WARN) # Validate all queries first if opts.verbose : sys.stderr.write('Validating queries\n') queryList = [] for queryString in open(queryFile,'r') : queryString = queryString.strip() if not queryString or queryString.startswith("#"): continue # Ensure chromosome and nucleotides are upper case queryParts = queryString.upper().split('\t') # validate query format if len(queryParts) < 5 : raise ValueError("Error: invalid query format '%s'" % queryString) try : pos = int(queryParts[1]) except: raise ValueError("Error: invalid position %s in '%s'" % (queryParts[1], queryString)) chrom = queryParts[0].strip().replace('CHR','') reference = queryParts[3] mutant = queryParts[4] if reference not in NUCLEOTIDES : sys.stderr.write(BAD_BASE % queryString) sys.exit(1) if mutant not in NUCLEOTIDES : sys.stderr.write(BAD_BASE % queryString) sys.exit(1) if reference == mutant : sys.stderr.write(BAD_MUTANT % queryString) sys.exit(1) query = (chrom.strip(), pos, reference, mutant) queryList.append(query) # run queries if opts.verbose : sys.stderr.write('Processing %d queries\n' % len(queryList)) indicator = ProgressIndicator(1000, verbose=opts.verbose) outStream = sys.stdout if not opts.output else open(opts.output,'w') outStream.write('%s\n' % '\t'.join(['# Chromosome','Position','Reference','Mutant','Coding','Noncoding','Prediction','Confidence'])) for query in queryList : indicator.update() # first try coding prediction ... predType = CODING_PRED data = runTabix(opts.cdb, query, predType) if not data : # try noncoding prediction ... predType = NONCODING_PRED data = runTabix(opts.ndb, query, predType) # get prediction columns predList = getPrediction(data, query, predType) if predList is None : continue predCode = predList[2] outStream.write('%s\t%d\t%s\t%s\t' % query) outStream.write('%s\t%s\t%s\t%s\n' % (predList[0], predList[1], PREDICTION_CLASS[predCode], PREDICTION_LEVEL[predCode])) indicator.finish()