-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstep2.py
More file actions
171 lines (148 loc) · 5.87 KB
/
step2.py
File metadata and controls
171 lines (148 loc) · 5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# CS466 Mini Project (Bioinformatics)
# Edward, Hank, William
# Step 2
import numpy as np
import operator
import sys
import time
import os
# Seed
np.random.seed(seed=1)
base_list = ['A', 'C', 'G', 'T']
def read_sequence_file(num):
sequence_list = []
f = open('./data_sets/{}/{}/sequences.fa'.format(data_folder, str(num)), 'r')
for i, line in enumerate(f):
if i == 0:
continue
sequence_list += [line.strip()]
f.close()
return sequence_list
def initialize_random_positions():
'''
Initializes a random starting position for each sequence.
'''
sequence_length = len(sequence_list[0])
# The highest possible starting index.
highest_index = sequence_length - ml
return [np.random.choice(range(highest_index + 1)
) for sequence in sequence_list]
def build_profile_matrix(position_list, i):
'''
Build a PWM for every sequence except i.
'''
# Initialize weight matrix.
pwm = [np.zeros(4) for j in range(ml)]
for sequence_i, sequence in enumerate(sequence_list):
# Skip the ith sequence.
if sequence_i == i:
continue
position = position_list[sequence_i]
possible_motif = sequence[position:position+ml]
for base_i, base in enumerate(possible_motif):
pwm[base_i][base_list.index(base)] += 1
# Normalize each sequence position's probability.
pwm = np.array([row / np.sum(row) for row in pwm])
return pwm
def string_score(subsequence, pwm):
'''
Given a subsequence, compute the score according to the profile matrix.
'''
score_total = 0.0
for base_i, base in enumerate(subsequence):
base_prob = pwm[base_i][base_list.index(base)]
if base_prob == 0:
continue
score_total += np.log(base_prob / 0.25)
return score_total
def find_best_index(sequence, pwm):
'''
Given a sequence and a PWM, find the index where the PWM best matches the
sequence. Probabilistically choose the index.
'''
index_prob_list = []
# Loop over possible starting indices.
for x_i in range(len(sequence) - ml + 1):
possible_motif = sequence[x_i:x_i+ml]
current_score = string_score(possible_motif, pwm)
index_prob_list += [current_score]
# assert len(index_prob_list) == ml
# Normalize the score list with softmax.
exp_list = np.exp(index_prob_list)
index_prob_list = exp_list / np.sum(exp_list)
best_index = np.random.choice(len(index_prob_list), p=index_prob_list)
return best_index
def compute_information_content(pwm):
'''
Given a profile weight matrix, compute its information content.
'''
information_content = 0.0
for row in pwm:
for base_prob in row:
if base_prob == 0:
continue
information_content += base_prob * np.log(base_prob / 0.25)
return information_content
def gibbs():
'''
Performs Gibbs sampling on the list of sequences, given a motif length.
'''
curr_position_list = initialize_random_positions()
pos_info_dct = {}
# Repeat a hundred times. Get the position list with the best information
# content.
for i in range(10):
for sequence_i, position in enumerate(curr_position_list):
# Build a profile Q using sequences in curr_position_list, except i.
pwm = build_profile_matrix(curr_position_list, sequence_i)
# Find where the profile matches best in sequence_i.
curr_position_list[sequence_i] = find_best_index(sequence_list[
sequence_i], pwm)
key = tuple(curr_position_list[:])
if key in pos_info_dct:
continue
s = build_profile_matrix(curr_position_list, 'dummy_arg')
pos_info_dct[key] = compute_information_content(s)
pos_info_dct = sorted(pos_info_dct.items(), key=operator.itemgetter(1),
reverse=True)
return pos_info_dct[0][0]
def main():
if len(sys.argv) != 2:
print('Usage: python %s data_folder' % (sys.argv[0]))
exit()
global data_folder, sequence_list, ml
data_folder = sys.argv[1]
if not os.path.exists('outcomes/{}'.format(data_folder)):
os.makedirs('outcomes/{}'.format(data_folder))
for num in range(10):
print num
run_start = time.time()
# set up background matrix
with open('./data_sets/{}/{}/motiflength.txt'.format(data_folder, str(num)), 'r') as f:
ml = int(f.read())
# Read sequence file and run Gibbs sampler.
sequence_list = read_sequence_file(num)
curr_position_list = gibbs()
# Make PWM predictedmotif out of the curr_position_list
predictedmotif = build_profile_matrix(curr_position_list, 'dummy_arg')
# Generate files
if not os.path.exists('outcomes/{}/{}'.format(data_folder, str(num))):
os.makedirs('outcomes/{}/{}'.format(data_folder, str(num)))
with open('outcomes/{}/{}/predictedmotif.txt'.format(data_folder, str(num)), 'w') as f:
# f.write('>MOTIF ' + str(ml) + ' ' + data_folder + '\n')
f.write('>MOTIF ' + str(ml) + '\n')
for l in predictedmotif:
for s in l:
f.write(str(s) + ' ')
f.write('\n')
f.write('<')
with open('outcomes/{}/{}/predictedsites.txt'.format(data_folder, str(num)), 'w') as f:
for l in curr_position_list:
f.write(str(l) + '\n')
runtime = (time.time() - run_start)
with open('outcomes/{}/{}/runtime.txt'.format(data_folder, str(num)), 'w') as f:
f.write(str(runtime))
if __name__ == '__main__':
start_time = time.time()
main()
print("--- %s seconds ---" % (time.time() - start_time))