-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathPyWR.py
More file actions
165 lines (146 loc) · 6.07 KB
/
Copy pathPyWR.py
File metadata and controls
165 lines (146 loc) · 6.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#PyWR.py (version1.1) -- 27 Oct 2020
#Python functions for weather typing (using K-means) and flow-dependent model diagnostics
#Authors: ÁG Muñoz (agmunoz@iri.columbia.edu), James Doss-Gollin, AW Robertson (awr@iri.columbia.edu)
#The International Research Institute for Climate and Society (IRI)
#The Earth Institute at Columbia University.
#Notes:
#Log: see version.log in GitHub
import numpy as np
from numba import jit
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import xarray as xr
from typing import Tuple
def download_data(url, authkey, outfile, force_download=False):
"""A smart function to download data from IRI Data Library
If the data can be read in and force_download is False, will read from file
Otherwise will download from IRIDL and then read from file
PARAMETERS
----------
url: the url pointing to the data.nc file
authkey: the authentication key for IRI DL (see above)
outfile: the data filename
force_download: False if it's OK to read from file, True if data *must* be re-downloaded
"""
if not force_download:
try:
data = xr.open_dataset(outfile, decode_times=False)
except:
force_download = True
if force_download:
# calls curl to download data
command = "curl -L -k -b '__dlauth_id={}' '{}' > {}".format(authkey, url, outfile)
#command = "curl -L '{}' > {}".format(url, outfile)
get_ipython().system(command)
# open the data
data = xr.open_dataset(outfile, decode_times=False)
return data
def get_number_eof(X: np.ndarray, var_to_explain: float, plot=False) -> int:
"""Get the number of EOFs of X that explain a given variance proportion
"""
assert var_to_explain > 0, 'var_to_explain must be between 0 and 1'
assert var_to_explain < 1, 'var_to_explain must be between 0 and 1'
pca = PCA().fit(X)
var_explained = pca.explained_variance_ratio_
cum_var = var_explained.cumsum()
n_eof = np.where(cum_var > var_to_explain)[0].min() + 1
if plot:
n_padding = 4
plt.figure(figsize=(12, 7))
plt.plot(np.arange(1, n_eof + 1 + n_padding), cum_var[0:(n_padding + n_eof)])
plt.scatter(np.arange(1, n_eof + 1 + n_padding), cum_var[0:(n_padding + n_eof)])
plt.xlabel('Number of EOFs')
plt.ylabel('Cumulative Proportion of Variance Explained')
plt.grid()
plt.title('{} EOF Retained'.format(n_eof))
plt.show()
return n_eof
@jit
def _vector_ci(P: np.ndarray, Q: np.ndarray) -> float:
"""Implement the Michaelangeli (1995) Classifiability Index
The variable naming here is not pythonic but follows the notation in the 1995 paper
which makes it easier to follow what is going on. You shouldn't need to call
this function directly but it is called in cluster_xr_eof.
PARAMETERS
----------
P: a cluster centroid
Q: another cluster centroid
"""
k = P.shape[0]
Aij = np.ones([k, k])
for i in range(k):
for j in range(k):
Aij[i, j] = np.corrcoef(P[i, :], Q[j, :])[0, 1]
Aprime = Aij.max(axis=0)
ci = Aprime.min()
return ci
def calc_classifiability(P, Q):
"""Implement the Michaelangeli (1995) Classifiability Index
The variable naming here is not pythonic but follows the notation in the 1995 paper
which makes it easier to follow what is going on. You shouldn't need to call
this function directly but it is called in cluster_xr_eof.
Args:
P: a cluster centroid
Q: another cluster centroid
"""
k = P.shape[0]
Aij = np.ones([k, k])
for i in range(k):
for j in range(k):
Aij[i, j], _ = correl(P[i, :], Q[j, :])
Aprime = Aij.max(axis=0)
ci = Aprime.min()
return ci
@jit
def get_classifiability_index(centroids: np.ndarray) -> Tuple[float, int]:
"""Get the classifiability of a set of centroids
This function will compute the classifiability index for a set of centroids and
indicate which is the best one
PARAMETERS
----------
centroids: input array of centroids, indexed [simulation, dimension]
"""
nsim = centroids.shape[0]
c_pq = np.ones([nsim, nsim])
for i in range(0, nsim):
for j in range(0, nsim):
if i == j:
c_pq[i, j] = np.nan
else:
c_pq[i, j] = _vector_ci(P=centroids[i, :, :], Q=centroids[j, :, :])
classifiability = np.nanmean(c_pq)
best_part = np.where(c_pq == np.nanmax(c_pq))[0][0]
return classifiability, best_part
@jit
def loop_kmeans(X: np.ndarray, n_cluster: int, n_sim: int) -> Tuple[np.ndarray, np.ndarray]:
"""Compute weather types
PARAMETERS
----------
X: an array (should be in reduced dimension space already) indexed [time, dimension]
n_cluster: how many clusters to compute
n_sim: how many times to initialize the clusters (note: computation increases order (n_sim**2))
"""
centroids = np.zeros(shape=(n_sim, n_cluster, X.shape[1]))
w_types = np.zeros(shape=(n_sim, X.shape[0]))
for i in np.arange(n_sim):
km = KMeans(n_clusters=n_cluster).fit(X)
centroids[i, :, :] = km.cluster_centers_
w_types[i, :] = km.labels_
return centroids, w_types
def resort_labels(old_labels):
"""Re-sort cluster labels
Takes in x, a vector of cluster labels, and re-orders them so that
the lowest number is the most common, and the highest number
the least common
Args:
old_labels: the previous labels of the clusters
Returns:
new_labels: the new cluster labels, ranked by frequency of occurrence
"""
old_labels = np.int_(old_labels)
labels_from = np.unique(old_labels).argsort()
counts = np.array([np.sum(old_labels == xi) for xi in labels_from])
orders = counts.argsort()[::-1].argsort()
new_labels = orders[labels_from[old_labels]] + 1
return new_labels