from __future__ import print_function
# Copyright (c) 2014 Rebecca R. Murphy
# All rights reserved.
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import struct
import csv
try:
import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib
import math as ma
from sklearn import mixture
#print "Imports all good"
except:
print("Numerical libraries not present in your systems")
def _runningMean(x, T):
"""
Calculate running sum over array x with summation window T.
Called by APBS and DCBS to calculate number of photons arriving within time window T
Arguments:
* x: a numpy array of photon counts
* T: window length (number of array entries) over which to sum photons
"""
return np.convolve(x, np.ones((T,)), mode='same')
[docs]class FRET_data:
"""
This class holds single molecule data.
It has two attributes, donor and acceptor to hold photon counts from the donor and acceptor channels respectively.
These are numpy arrays.
It can be initialized from two lists or two arrays of photon counts: data = FRET_data(donor_events_list, acceptor_events_list)
"""
def __init__(self, donor, acceptor):
"""
Initialize the FRET data object.
Arguments:
* donor: list or array of donor time-bins
* acceptor: list or array of acceptor time-bins.
"""
self.donor = np.array(donor).astype(float)
self.acceptor = np.array(acceptor).astype(float)
[docs] def subtract_bckd(self, bckd_d, bckd_a):
"""
Subtract background noise from donor and acceptor channel data.
Arguments:
* bckd_d: average noise per time-bin in the donor channel
* bckd_a: average noise per time-bin in the acceptor channel
"""
self.donor = self.donor - bckd_d
self.acceptor = self.acceptor - bckd_a
return self
[docs] def subtract_crosstalk(self, ct_d, ct_a):
"""
Subtract crosstalk from donor and acceptor channels.
Arguments:
* ct_d: fractional cross-talk from donor to acceptor (float between 0 and 1)
* ct_a: fractional cross-talk from acceptor to donor (float between 0 and 1)
"""
donor_cross_talk = self.donor * ct_d
acceptor_cross_talk = self.acceptor * ct_a
self.donor = self.donor - acceptor_cross_talk
self.acceptor = self.acceptor - donor_cross_talk
return self
[docs] def threshold_AND(self, D_T, A_T):
"""
Select data based on the AND thresholding criterion.
Arguments:
* D_T: threshold for the donor channel
* A_T: threshold for the acceptor channel
An event is above threshold if nD > donor_threshold AND nA > acceptor_threshold
for nD and nA photons in the donor and acceptor channels respectively
"""
select = (self.donor > D_T) & (self.acceptor > A_T)
self.donor = self.donor[select]
self.acceptor = self.acceptor[select]
return self
[docs] def threshold_OR(self, D_T, A_T):
"""
Select data based on the OR thresholding criterion.
Arguments:
* D_T: threshold for the donor channel
* A_T: threshold for the acceptor channel
An event is above threshold in nD > donor_threshold OR nA > acceptor_threshold
for nD and nA photons in the donor and acceptor channels respectively
"""
select = (self.donor > D_T) | (self.acceptor > A_T)
self.donor = self.donor[select]
self.acceptor = self.acceptor[select]
return self
[docs] def threshold_SUM(self, T):
"""
Select data based on the SUM thresholding criterion.
Arguments:
T: threshold above which a time-bin is accepted as a fluorescent burst
An event is above threshold in nD + nA > threshold
for nD and nA photons in the donor and acceptor channels respectively
"""
select = (self.donor + self.acceptor > T)
self.donor = self.donor[select]
self.acceptor = self.acceptor[select]
return self
[docs] def proximity_ratio(self, gamma=1.0):
"""
Calculate the proximity ratio (E) and return an array of values.
Arguments:
None
Keyword arguments:
* gamma (default value 1.0): the instrumental gamma-factor
Calculation:
E = nA / (nA + gamma*nD) for nA and nD photons in the acceptor and donor channels respectively
"""
E = self.acceptor / (self.acceptor + (gamma * self.donor))
return E
# burst search
[docs] def APBS(self, T, M, L):
"""
All-photon bust search algorithm as implemented in Nir et al. J Phys Chem B. 2006 110(44):22103-24.
Returns a FRET_bursts object.
Arguments:
* T: time-window (in bins) over which to sum photons
* M: number of photons in window of length T required to identify a potential burst.
* L: total number of photons required for an identified burst to be accepted.
From Nir et al.:
The start (respectively, the end) of a potential burst is detected when the number of photons in the averaging window of duration T is larger (respectively, smaller) than the minimum number of photons M.
A potential burst is retained if the number of photons it contains is larger than a minimum number L.
"""
data = self.donor + self.acceptor
means = _runningMean(data, T)
donor_bursts, acceptor_bursts, b_start, b_end = self._APBS_bursts(data, means, M, L)
FRET_bursts_obj = FRET_bursts(donor_bursts, acceptor_bursts, b_start, b_end)
return FRET_bursts_obj
def _APBS_bursts(self, data, means, M, L):
assert len(data) == len(means)
donor_bursts = []
acceptor_bursts = []
burst_starts = []
burst_ends = []
positions = []
bursts = []
burst = 0
b_donor = 0
b_acceptor = 0
burst_len = 0
collecting = False
for pos in range(len(data)):
if means[pos] >= M:
collecting = True
burst_len += 1
burst += data[pos]
b_donor += self.donor[pos]
b_acceptor += self.acceptor[pos]
else:
if collecting:
bursts.append(burst)
donor_bursts.append(b_donor)
acceptor_bursts.append(b_acceptor)
burst_pos = pos - (float(burst_len) / 2)
burst_starts.append(pos - burst_len)
burst_ends.append(pos)
positions.append(burst_pos)
burst = 0
b_donor = 0
b_acceptor = 0
burst_len = 0
collecting = False
bursts = np.array(bursts)
donor_bursts = np.array(donor_bursts)
acceptor_bursts = np.array(acceptor_bursts)
burst_starts = np.array(burst_starts)
burst_ends = np.array(burst_ends)
positions = np.array(positions)
select = bursts >= L
bursts = bursts[select]
positions = positions[select]
donor_bursts = donor_bursts[select]
acceptor_bursts = acceptor_bursts[select]
burst_starts = burst_starts[select]
burst_ends = burst_ends[select]
return donor_bursts, acceptor_bursts, burst_starts, burst_ends
[docs] def DCBS(self, T, M, L):
"""
Dual-channel bust search algorithm as implemented in Nir et al. J Phys Chem B. 2006 110(44):22103-24.
Returns a FRET_bursts object.
Arguments:
* T: time-window (in bins) over which to sum photons
* M: number of photons in window of length T required to identify a potential burst.
* L: total number of photons required for an identified burst to be accepted.
From Nir et al.:
The start (respectively, the end) of a potential burst is detected when the number of photons in the averaging window of duration T is larger (respectively, smaller) than the minimum number of photons M.
A potential burst is retained if the number of photons it contains is larger than a minimum number L.
"""
donor_means = _runningMean(self.donor, T)
acceptor_means = _runningMean(self.acceptor, T)
donor_bursts, acceptor_bursts, b_start, b_end = self._DCBS_bursts(donor_means, acceptor_means, M, L)
FRET_bursts_obj = FRET_bursts(donor_bursts, acceptor_bursts, b_start, b_end)
return FRET_bursts_obj
def _DCBS_bursts(self, donor_means, acceptor_means, M, L):
assert len(self.donor) == len(donor_means)
donor_bursts = []
acceptor_bursts = []
burst_starts = []
burst_ends = []
positions = []
b_donor = 0
b_acceptor = 0
burst_len = 0
collecting = False
for pos in range(len(self.donor)):
# condition depends on both channels
if (donor_means[pos] >= M) & (acceptor_means[pos] >= M):
collecting = True
burst_len += 1
b_donor += self.donor[pos]
b_acceptor += self.acceptor[pos]
else:
if collecting:
donor_bursts.append(b_donor)
acceptor_bursts.append(b_acceptor)
burst_pos = pos - (float(burst_len) / 2)
burst_starts.append(pos - burst_len)
burst_ends.append(pos)
positions.append(burst_pos)
b_donor = 0
b_acceptor = 0
burst_len = 0
collecting = False
donor_bursts = np.array(donor_bursts)
acceptor_bursts = np.array(acceptor_bursts)
burst_starts = np.array(burst_starts)
burst_ends = np.array(burst_ends)
positions = np.array(positions)
select = (donor_bursts >= L) & (acceptor_bursts >= L)
positions = positions[select]
donor_bursts = donor_bursts[select]
acceptor_bursts = acceptor_bursts[select]
burst_starts = burst_starts[select]
burst_ends = burst_ends[select]
return donor_bursts, acceptor_bursts, burst_starts, burst_ends
[docs] def make_3d_plot(self, filepath, imgname, imgtype="pdf", labels = ["Donor", "Acceptor", "Frequency"]):
"""
Make a 3D histogram of donor and acceptor photon counts.
Arguments:
* filepath: path to folder where data will be saved
* filename: name of image file to save plot
Keyword arguments:
* filetype: image type (as string). Default "pdf". Accepted values: jpg, tiff, rgba, png, ps, svg, eps, pdf
* labels: axes labels, list of strings ["Xtitle", "Ytitle", "Ztitle"]. Default ["Donor", "Acceptor", "Frequency"]
"""
fullname = ".".join([imgname, imgtype])
max_val = max(max(self.donor), max(self.acceptor))
big_matrix = np.zeros((ma.ceil(max_val)+1, ma.ceil(max_val)+1))
for (i, j) in zip(self.donor, self.acceptor):
big_matrix[i][j] += 1
fig = plt.figure()
ax = fig.gca(projection="3d")
ax.locator_params(nbins=4)
X = np.arange(0, max_val+1, 1)
Y = np.arange(0, max_val+1, 1)
X, Y = np.meshgrid(X, Y)
surf = ax.plot_surface(X, Y, big_matrix, rstride=1, cstride=1, cmap=cm.coolwarm,
linewidth=0, antialiased=False)
#fig.colorbar(surf, shrink=0.5, aspect=5)
ax.set_xlabel("\n%s" %labels[0])
ax.set_ylabel("\n%s" %labels[1])
ax.set_zlabel("\n%s" %labels[2])
figname = os.path.join(filepath, fullname)
plt.savefig(figname)
plt.close()
return self
[docs] def make_hex_plot(self, filepath, imgname, imgtype="pdf", labels = ["Donor", "Acceptor"], xmax = None, ymax = None, binning=None):
"""
Make a 2D representation of donor and acceptor photon count frequencies.
Based on the matplotlib.pyplot construction "hexbin": http://matplotlib.org/api/pyplot_api.html
Arguments:
* filepath: path to folder where data will be saved
* imgname: name of image file to save plot
Keyword arguments:
* imgtype: image type (as string). Default "pdf". Accepted values: jpg, tiff, rgba, png, ps, svg, eps, pdf
* labels: axes labels, list of strings ["Xtitle", "Ytitle"]. Default ["Donor", "Acceptor"]
* xmax: maximum x-axis value. Default None (maximum will be the brightest donor event)
* ymax: maximum x-axis value. Default None (maximum will be the brightest acceptor event)
* binning: type of binning to use for plot. Default: None (bin colour corresponds to frequency).
Accepted vals: "log" (bin colour corresponds to frequency), integer (specifies number of bins), sequence (specifies bin lower bounds)
"""
fullname = ".".join([imgname, imgtype])
plt.hexbin(self.donor, self.acceptor, bins=binning)
plt.colorbar()
plt.xlabel("\n%s" %labels[0])
plt.ylabel("\n%s" %labels[1])
figname = os.path.join(filepath, fullname)
if (xmax != None) & (ymax != None):
plt.xlim(0, xmax)
plt.ylim(0, ymax)
plt.savefig(figname)
plt.close()
return self
[docs] def build_histogram(self, filepath, csvname, gamma=1.0, bin_min=0.0, bin_max=1.0, bin_width=0.02, image = False, imgname = None, imgtype=None, gauss = True, gaussname = None, n_gauss=1):
"""
Build a proximity ratio histogram and save the frequencies and bin centres as a csv file.
Optionally plot and save a graph and perform a simple gaussian fit.
Arguments:
* E: array of FRET efficiecies
* filepath: path to folder where the histogram will be saved (as a string)
* csvname: the name of the file in which the histogram will be saved (as a string)
Keyword arguments:
* gamma: Instrumental gamma factor. (float, default value 1.0)
* bin_min: the minimum value for a histogram bin (default 0.0)
* bin_max: the maximum value for a histogram bin (default 1.0)
* bin_width: the width of one bin (default 0.02)
* image: Boolean. True plots a graph of the histogram and saves it (default False)
* imgname: the name of the file in which the histogram graph will be saved (as a string)
* imgtype: filetype of histogram image. Accepted values: jpg, tiff, rgba, png, ps, svg, eps, pdf
* gauss: Boolean. True will fit the histogram with a single gaussian distribution (default False)
* gaussname: the name of the file in which the parameters of the Gaussian fit will be saved
* n_gauss: number of Gaussain distributions to fit. Default = 1
"""
E = self.proximity_ratio(gamma)
csv_title = ".".join([csvname, "csv"])
csv_full = os.path.join(filepath, csv_title)
with open(csv_full, "w") as csv_file:
bins = np.arange(bin_min, bin_max, bin_width)
freq, binss, _ = plt.hist(E, bins, facecolor = "grey")
bin_centres = []
for i in range(len(binss)-1):
bin_centres.append((binss[i+1] + binss[i])/2)
for bc, fr in zip(bin_centres, freq):
csv_file.write("%s, %s \n" %(bc, fr))
if image == True:
img_name = ".".join([imgname, imgtype])
img_path = os.path.join(filepath, img_name)
plt.xlabel("FRET Efficiency")
plt.ylabel("Number of Events")
plt.savefig(img_path)
plt.cla()
if gauss == True and gaussname != None:
# fit
print(E)
ms, cs, ws = fit_mixture(E, ncomp=n_gauss)
# set up
histo = plt.hist(E, bins, label='Test data', facecolor = "grey", normed=True)
csv_title = ".".join([gaussname, "csv"])
csv_full = os.path.join(filepath, csv_title)
csv_file = open(csv_full, "w")
csv_file.write("E, sigma, weight\n")
# plot and write to file
for w, m, c in zip(ws, ms, cs):
plt.plot(histo[1],w*matplotlib.mlab.normpdf(histo[1],m,c), linewidth=3)
csv_file.write("%s,%s,%s\n" % (m, c, w))
plt.xlabel("FRET Efficiency")
plt.ylabel("Number of Events")
gauss_title = ".".join([gaussname, imgtype])
gauss_title = os.path.join(filepath, gauss_title)
plt.savefig(gauss_title)
csv_file.close()
plt.close()
return self
[docs]class FRET_bursts(FRET_data):
"""
This class holds single molecule burst data. Photon bursts are stored in numpy arrays. There is a separate array for each of the two
photon streams, for the start and end of each burst and for the burst duration.
The two attributes corresponding to bursts from the four photon streams from a FRET experiment are numpy arrays:
* donor: The donor channel
* acceptor: The acceptor channel
The three further attributes, corresponding to burst duration, burst start time and burst end time are also numpy arrays:
* burst_len: Length (in bins) of each identified burst
* burst_starts: Start time (bin number) of each identified burst
* burst_ends: End time (bn number) of each identified burst
The class can be initialized directly from four lists or arrays: two of burst photon counts; the burst start times and the burst end times:
bursts = FRET_bursts(donor_bursts, acceptor_bursts, burst_starts, burst_ends).
However, it is more typically achieved by running the APBS or DCBS algorithm that forms part of the FRET_data class.
"""
def __init__(self, donor, acceptor, burst_starts, burst_ends):
FRET_data.__init__(self, donor, acceptor)
self.total = self.donor + self.acceptor
self.burst_starts = burst_starts
self.burst_ends = burst_ends
self.burst_len = self.burst_ends - self.burst_starts
[docs] def denoise_bursts(self, N_D, N_A):
"""
Subtract background noise from donor and acceptor bursts.
Arguments:
* N_D: average noise per time-bin in the donor channel
* N_A: average noise per time-bin in the acceptor channel
"""
donor_noise = N_D * self.burst_len
acceptor_noise = N_A * self.burst_len
self.donor = self.donor - donor_noise
self.acceptor = self.acceptor - acceptor_noise
return self
[docs] def scatter_intensity(self, filepath, imgname, imgtype="pdf", labels = ["Burst Duration", "Burst Intensity"]):
"""
Plot a scatter plot of burst brightness vs burst duration
Arguments:
* filepath: file path to the directory in which the image will be saved
* imgname: name under which the image will be saved
Keyword arguments:
* imgtype: filetype of histogram image. Accepted values: jpg, tiff, rgba, png, ps, svg, eps, pdf
* labels: labels for x and y axes, as a 2-element list of strings: ["x-title", "y-title"]. Default value: ["Burst Duration", "Burst Intensity"]
"""
fullname = ".".join([imgname, imgtype])
figname = os.path.join(filepath, fullname)
plt.scatter(self.burst_len, self.total)
plt.xlabel(labels[0])
plt.ylabel(labels[1])
plt.savefig(figname)
plt.close()
return self
[docs] def RASP(self, Emin, Emax, Tmin, Tmax, gamma=1.0):
"""
Recurrence Analysis of Single Particles analysis as implemented in Hoffmann et al. Phys Chem Chem Phys. 2011 13(5):1857-1871.
Returns a FRET_bursts object.
Arguments:
* Emin: minimum value of E (proximity ratio) to consider for initial bursts
* Emax: maximum value of E (proximity ratio) to consider for initial bursts
* Tmin: start time (in number of bins after the initial burst) to search for recurrent bursts
* Tmax: end time (in number of bins after the initial burst) to search for recurrent bursts
Keyword Arguments:
* gamma: value of instrumental gamma factor to use in calculating the proximity ratio. Default value = 1.0.
From Hoffmann et al.:
First, the bursts b2 must be detected during a time interval between t1 and t2 (the 'recurrence interval', T = (t1,t2))
after a previous burst b1 (the 'initial burst'). Second, the initial bursts must yield a transfer efficiency, E(b1),
within a defined range, Delta E1 (the 'initial E range').
In this implementation, Tmin and Tmax correspond to t1 and t2 respectively. The initial E range lies between Emin and Emax.
"""
E1 = self.proximity_ratio(gamma)
E_select = (E1 > Emin) & (E1 < Emax)
E_sel = E1[E_select]
end_sel = self.burst_ends[E_select]
T_start = end_sel + Tmin
T_end = end_sel + Tmax
Eevents = np.array([])
donor = np.array([])
acceptor = np.array([])
b_start = np.array([])
b_end = np.array([])
for s, e in zip(T_start, T_end):
sel = (self.burst_starts >= s) & (self.burst_starts <= e)
Eevents = np.hstack((Eevents, E1[sel]))
donor = np.hstack((donor, self.donor[sel]))
acceptor = np.hstack((acceptor, self.acceptor[sel]))
b_start = np.hstack((b_start, self.burst_starts[sel]))
b_end = np.hstack((b_end, self.burst_ends[sel]))
recurrent_bursts = FRET_bursts(donor, acceptor, b_start, b_end)
return recurrent_bursts
[docs]def parse_csv(filepath, filelist, delimiter=","):
"""
Read data from a list of csv and return a FRET_data object.
Arguments:
* filepath: the path to the folder containing the files
* filelist: list of files to be analysed
Keyword arguments:
* delimiter (default ","): the delimiter between values in a row of the csv file.
This function assumes that each row of your file has the format:
"donor_item,acceptor_item"
If your data does not have this format (for example if you have separate files for donor and acceptor data), this function will not work well for you.
"""
donor_data = []
acceptor_data = []
for fp in filelist:
current_file = os.path.join(filepath, fp)
with open(current_file) as csv_file:
current_data = csv.reader(csv_file, delimiter=',')
for row in current_data:
donor_data.append(float(row[0]))
acceptor_data.append(float(row[1]))
FRET_data_obj = FRET_data(donor_data, acceptor_data)
return FRET_data_obj
[docs]def parse_bin(filepath, filelist, bits=8):
"""
Read data from a list of binary files and return a FRET_data object.
Arguments:
* filepath: the path to the folder containing the files
* filelist: list of files to be analysed
Keyword arguments:
* bits (default value 8): the number of bits used to store a donor-acceptor pair of time-bins
**Note: This file structure is probably specific to the Klenerman group's .dat files.**
**Please don't use it unless you know you have the same filetype!**
"""
byte_order = sys.byteorder
try:
if byte_order == "little":
edn = '>ii'
elif byte_order == "big":
edn = '<ii'
except ValueError:
print("Unknown byte order")
raise
donor_data = []
acceptor_data = []
for fp in filelist:
current_file = os.path.join(filepath, fp)
counter = 0
data = []
with open(current_file, "rb") as f:
x = f.read(bits)
while x:
data.append(struct.unpack(edn, x))
x = f.read(bits)
#return np.array(data)
for tup in data:
donor_data.append(float(tup[0]))
acceptor_data.append(float(tup[1]))
FRET_data_obj = FRET_data(donor_data, acceptor_data)
return FRET_data_obj
# def _fit_gauss(bin_centres, frequencies, p0=[1.0, 0.0, 1.0]):
# """
# Fit a histogram with a single gaussian distribution.
# Arguments:
# * bin_centres: list of histogram bin centres
# * frequencies: list of histogram bin frequencies
# Keyword arguments:
# p0: initial values for gaussian fit as a list of floats: [Area, mu, sigma]. (Default: [1.0, 0.0, 1.0])
# """
# def gauss(x, *p):
# A, mu, sigma = p
# return A*np.exp(-(x-mu)**2/(2.*sigma**2))
# coeff, var_matrix = curve_fit(gauss, bin_centres, frequencies, p0)
# hist_fit = gauss(bin_centres, *coeff)
# return coeff, var_matrix, hist_fit
[docs]def fit_mixture(data, ncomp=1):
"""
Fit data using Gaussian mixture model
Arguments:
* data: data to be fitted, as a numpy array
Key-word arguments:
* ncomp (default value 1): number of components in the mixture model.
"""
data = data.reshape(-1, 1)
clf = mixture.GMM(n_components=ncomp, covariance_type='full')
print(data)
clf.fit(data)
ml = clf.means_
wl = clf.weights_
cl = clf.covars_
ms = [m[0] for m in ml]
cs = [np.sqrt(c[0][0]) for c in cl]
ws = [w for w in wl]
return ms, cs, ws