import numpy as np
import pandas as pd
import trackpy as tp
import networkx as nx
import skvideo.io
import skimage.io
import skvideo.utils
from skimage.filters import laplace, gaussian, threshold_otsu
from skimage import measure
from sklearn.cluster import OPTICS
from sklearn.neighbors import NearestNeighbors
from scipy.spatial import distance_matrix
from pathlib import Path
from typing import List, Union, Tuple
[docs]class SarcGraph:
def __init__(self, output_dir: str = "test-run", file_type: str = "video"):
"""Zdiscs and sarcomeres segmentation and tracking.
Attributes
----------
output_dir : str, optional
location to save processed information, by default ``'test-run'``
file_type : str, optional
use ``'image'`` for single-frame samples and ``'video'`` for
multi-frame samples, by default ``'video'``
"""
if file_type not in ["video", "image"]:
raise ValueError(
f"{file_type} is not recognized as a valid file_type. Choose "
"from ['video', 'image']."
)
if not isinstance(output_dir, str):
raise TypeError("output_dir must be a string.")
Path(f"{output_dir}").mkdir(parents=True, exist_ok=True)
self.output_dir = output_dir
self.file_type = file_type
###########################################################
# Utility Functions #
###########################################################
[docs] def _data_loader(self, file_path: str) -> np.ndarray:
"""Loads a video/image file.
Parameters
----------
file_path : str
A video/image file address
Returns
-------
numpy.ndarray
All frames in the raw format
"""
if self.file_type == "video":
data = skvideo.io.vread(file_path)
if data.shape[0] > 1:
return data
return skimage.io.imread(file_path)
[docs] def _to_gray(self, frames: np.ndarray) -> np.ndarray:
"""Convert RGB frames to grayscale.
Parameters
----------
frames : np.ndarray
RGB frames
Returns
-------
numpy.ndarray
All frames in gray scale
"""
frames_gray = skvideo.utils.rgb2gray(frames)
if self.file_type == "video" and frames_gray.shape[0] < 2:
raise ValueError(
"Failed to load video correctly! Manually load the video into "
"a numpy array and input to the function as raw_frames."
)
if self.file_type == "image" and frames_gray.shape[0] > 1:
raise ValueError(
"Trying to load a video while file_type='image'. Load the "
"image manually or change the file_type to 'video."
)
return frames_gray
[docs] def _save_numpy(
self, data: Union[List, np.ndarray], file_name: str
) -> None:
"""Saves a numpy array.
Parameters
----------
data : np.ndarray
file_name: str
"""
if not isinstance(file_name, str):
raise TypeError("file_name must be a string.")
if isinstance(data, np.ndarray) or isinstance(data, List):
np.save(
f"./{self.output_dir}/{file_name}.npy", data, allow_pickle=True
)
return
raise TypeError("data must be a numpy.ndarray or a List.")
[docs] def _save_dataframe(self, data: pd.DataFrame, file_name: str) -> None:
"""Saves a pandas dataframe.
Parameters
----------
data : pd.DataFrame
file_name: str
"""
if not isinstance(file_name, str):
raise TypeError("file_name must be a string.")
if isinstance(data, pd.DataFrame):
data.to_csv(f"./{self.output_dir}/{file_name}.csv")
return
raise TypeError(
f"'data' type is {type(data)}. 'data' must be a pandas DataFrame."
)
[docs] def _filter_frames(
self, frames: np.ndarray, sigma: float = 1.0
) -> np.ndarray:
"""Convolves all frames with laplacian and gaussian filters.
Parameters
----------
frames : np.ndarray
sigma : float
Standard deviation for Gaussian kernel
Returns
-------
np.ndarray, shape=(frames, dim_1, dim_2)
"""
if frames.ndim > 4 or frames.ndim < 3:
raise ValueError(
"Input array must have the shape (frames, dim_1, dim_2, "
"channels=1 (optional))"
)
if frames.ndim == 4:
if frames.shape[3] != 1:
raise ValueError(
"Number of channels in the input array must be 1. Input "
"shape: (frames, dim_1, dim_2, channels=1 (optional))"
)
frames = frames[:, :, :, 0]
filtered_frames = np.zeros(frames.shape)
for i, frame in enumerate(frames):
laplacian = laplace(frame)
filtered_frames[i] = gaussian(laplacian, sigma=sigma)
return filtered_frames
#############################################################
# Z-Disc Segmentation #
#############################################################
[docs] def _detect_contours(
self,
filtered_frames: np.ndarray,
min_length: int = 8,
save_output: bool = True,
) -> List[np.ndarray]:
"""Returns contours of detected zdiscs in each frame.
Parameters
----------
filtered_frames : np.ndarray, shape=(frames, dim_1, dim_2)
min_length : int
Zdisc contours shorter than ``min_length`` pixels will be ignored.
save_output : bool
by default True
Returns
-------
List[np.ndarray]
Notes
-----
.. warning::
Here we use the ``skimage.measure.find_contours`` function to find
contours. This function returns contours with coordinates in
``(row, column)`` order, which corresponds to ``(y, x)`` in
Cartesian coordinates. Therefore, in the rest of the code, we use
``y`` for the first dimension and ``x`` for the second dimension.
For example, for plotting we use ``plt.plot(y, x)``.
"""
if filtered_frames.ndim != 3:
raise ValueError(
"The input must be a 3d numpy array: (frames, " "dim 1, dim 2)"
)
if not isinstance(min_length, int):
try:
min_length = int(min_length)
except ValueError:
raise ValueError("min_length must be an integer.") from None
length_checker = np.vectorize(len)
valid_contours = []
for frame in filtered_frames:
contour_thresh = threshold_otsu(frame)
contours = np.array(
measure.find_contours(frame, contour_thresh), dtype="object"
)
contours_size = list(length_checker(contours))
valid_contours.append(
contours[np.greater_equal(contours_size, min_length)]
)
valid_contours = np.array(valid_contours, dtype="object")
if save_output:
self._save_numpy(valid_contours, file_name="contours")
return valid_contours
[docs] def _process_contour(self, contour: Union[List, np.ndarray]) -> np.ndarray:
"""Computes the center location of a zdisc as well as its two end
points given its 2d contour.
Parameters
----------
contour : np.ndarray, shape=(contour_length, 2)
Returns
-------
np.ndarray, shape=(6,)
zdisc center, end point 1, end point 2
"""
if len(contour) < 3:
raise ValueError("A contour must have at least 3 coordinates.")
# center of a contour
center_coords = np.mean(contour, axis=0)
# coordinates of the two points on the contour with maximum distance
dist_mat = distance_matrix(contour, contour)
indices = np.unravel_index(dist_mat.argmax(), dist_mat.shape)
p1, p2 = contour[indices[0]], contour[indices[1]]
return np.hstack((center_coords, p1, p2))
[docs] def _zdiscs_to_pandas(self, zdiscs_all: List[np.ndarray]) -> pd.DataFrame:
"""Creates a pandas dataframe from the information of detected zdiscs
in all frames.
Parameters
----------
zdiscs_all : List[np.ndarray]
A list of numpy arrays each containing information of detected
zdiscs in a frame, (zdisc center, end point 1, end point 2)
Returns
-------
pd.DataFrame
Columns are ``frame`` (frame number), ``x`` and ``y`` (zdiscs
center position), ``p1_x``, ``p1_y``, ``p2_x``, ``p2_y`` (zdiscs
end points positions).
"""
if self.file_type == "video" and len(zdiscs_all) < 2:
raise ValueError(
"Only one frame detected. Video is not loaded correctly."
)
if self.file_type == "image" and len(zdiscs_all) > 1:
raise ValueError(
"More than one frame detected. The image is not loaded "
"correctly."
)
data_frame = []
for i, zdiscs_frame in enumerate(zdiscs_all):
if type(zdiscs_frame) != np.ndarray:
raise TypeError("Input should be a list of numpy arrays.")
if zdiscs_frame.ndim != 2 or zdiscs_frame.shape[1] != 6:
raise ValueError(
"Each numpy array must have the shape: (number of zdiscs, "
"6)"
)
frame_id = i * np.ones((len(zdiscs_frame), 1), dtype=int)
zdiscs_frame_extended = np.hstack((frame_id, zdiscs_frame))
data_frame.append(
pd.DataFrame(
zdiscs_frame_extended,
columns=[
"frame",
"x",
"y",
"p1_x",
"p1_y",
"p2_x",
"p2_y",
],
)
)
return pd.concat(data_frame)
[docs] def zdisc_segmentation(
self,
file_path: Union[str, None] = None,
raw_frames: Union[np.ndarray, None] = None,
sigma: float = 1.0,
min_length: int = 8,
save_output: bool = True,
) -> pd.DataFrame:
"""Finds Z-Discs in a video/image and extracts related information,
(frame number, center position, end points).
Parameters
----------
file_path : str
address of the input video/image file
raw_frames : np.ndarray, shape=(frames, dim_1, dim_2, channels)
Raw input image or video given as a 4 dimensional array
sigma : float
Standard deviation for Gaussian kernel
min_length : int
Minimum length for zdisc contours measured in pixels
save_output : bool
by default True
Returns
-------
pd.DataFrame
Information of all detected zdiscs in every frame. Columns are
``'frame'`` (frame number), ``'x'`` and ``'y'`` (zdiscs center
position), ``'p1_x'``, ``'p1_y'``, ``'p2_x'``, ``'p2_y'`` (zdiscs
end points positions).
"""
filtered_frames = self._process_input(
file_path, raw_frames, sigma, save_output
)
contours_all = self._detect_contours(
filtered_frames, min_length, save_output
)
zdiscs_all = []
for contours_frame in contours_all:
zdiscs_frame = np.zeros((len(contours_frame), 6))
for i, contour in enumerate(contours_frame):
zdiscs_frame[i] = self._process_contour(contour)
zdiscs_all.append(zdiscs_frame)
zdiscs_all_dataframe = self._zdiscs_to_pandas(zdiscs_all)
if save_output:
self._save_dataframe(
zdiscs_all_dataframe, file_name="segmented-zdiscs"
)
return zdiscs_all_dataframe
#########################################################
# Z-disc Tracking #
#########################################################
[docs] def _merge_tracked_zdiscs(
self,
tracked_zdiscs: pd.DataFrame,
full_track_ratio: float = 0.75,
) -> pd.DataFrame:
"""A post processing step to group related partially tracked zdiscs
using the OPTICS algorithm. Increases the robustness of zdisc tracking
as well as the number of fully tracked zdiscs.
Parameters
----------
tracked_zdiscs : pd.DataFrame
tracked zdiscs information for all frames
full_track_ratio : float, optional
If a tracked zdisc appears in enough number of frames defined by
this ratio, it is considered a fully tracked zdisc. The default
value 0.75 means if a zdisc is tracked in more than 75 percent of
all frames it is fully tracked.
Returns
-------
pd.DataFrame
Notes
-----
For a detailed description of the OPTICS algorithm check:
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.OPTICS.html
"""
num_frames = tracked_zdiscs.frame.max() + 1
tracked_zdiscs_grouped = tracked_zdiscs.groupby("particle")["particle"]
tracked_zdiscs["freq"] = tracked_zdiscs_grouped.transform("count")
fully_tracked_zdiscs = tracked_zdiscs.loc[
tracked_zdiscs.freq == num_frames
]
partially_tracked_zdiscs = tracked_zdiscs.loc[
tracked_zdiscs.freq < num_frames
]
if partially_tracked_zdiscs.empty:
return fully_tracked_zdiscs
partially_tracked_clusters = (
partially_tracked_zdiscs[["x", "y", "particle"]]
.groupby(by=["particle"])
.mean()
)
# merge related clusters (neighbors)
all_clusters_xy = (
tracked_zdiscs.groupby("particle").mean()[["x", "y"]].to_numpy()
)
clusters_min_dist = np.min(
distance_matrix(all_clusters_xy, all_clusters_xy)
+ 1e6 * np.eye(len(all_clusters_xy)),
axis=1,
)
optics_max_eps = np.mean(clusters_min_dist)
data = np.array(partially_tracked_clusters)
optics_model = OPTICS(max_eps=optics_max_eps, min_samples=2)
optics_result = optics_model.fit_predict(data)
optics_clusters = np.unique(optics_result)
all_merged_zdiscs = []
for i, optics_cluster in enumerate(optics_clusters):
index = np.where(optics_result == optics_cluster)[0]
particles_in_cluster = partially_tracked_clusters.iloc[
index
].index.to_numpy()
if optics_cluster >= 0:
merged_zdiscs = (
partially_tracked_zdiscs.loc[
partially_tracked_zdiscs["particle"].isin(
particles_in_cluster
)
]
.groupby("frame")
.mean()
)
if len(merged_zdiscs) > num_frames * full_track_ratio:
merged_zdiscs.particle = -(i + 1)
merged_zdiscs.freq = len(merged_zdiscs)
all_merged_zdiscs.append(merged_zdiscs.reset_index())
else:
for p in particles_in_cluster:
no_merge_zdiscs = partially_tracked_zdiscs.loc[
partially_tracked_zdiscs["particle"] == p
]
if len(no_merge_zdiscs) > num_frames * full_track_ratio:
all_merged_zdiscs.append(no_merge_zdiscs)
if all_merged_zdiscs:
all_merged_zdiscs = pd.concat(all_merged_zdiscs)
return pd.concat((fully_tracked_zdiscs, all_merged_zdiscs))
else:
return fully_tracked_zdiscs
[docs] def zdisc_tracking(
self,
file_path: str = None,
raw_frames: np.array = None,
segmented_zdiscs: pd.DataFrame = None,
sigma: float = 1.0,
min_length: int = 8,
tp_depth: float = 4.0,
full_track_ratio: float = 0.75,
skip_merging: bool = False,
save_output: bool = True,
) -> pd.DataFrame:
"""Track detected Z-Discs in a video. The input could be the address to
a video/image sample (``file_path``), raw frames as a numpy array
(``raw_frames``), or segmented zdiscs information in a pandas datafram
(``segmented_zdiscs``)e. If the function is run with no inputs, it will
search for 'raw-frames.npy', or 'segmented-zdiscs.csv' in the
specified output directory ``SarcGraph().output_dir``.
Parameters
----------
file_path : str
The address of an image or a video file to be loaded
raw_frames : np.ndarray, shape=(frames, dim_1, dim_2, channels)
Raw input image or video given as a 4 dimensional array
segmented_zdiscs : pd.DataFrame
Information of all detected zdiscs in every frame.
sigma : float
Standard deviation for Gaussian kernel, by default ``1.0``
min_length : int
Minimum length for zdisc contours measured in pixels, by default
``8``
tp_depth : float, optional
the maximum distance features can move between frames, by default
``4.0``
full_track_ratio : float, optional
by default ``0.75``
skip_merging : bool, optional
skipping the merging step will result in fewer fully tracked
zdiscs, by default ``False``
save_output : bool
by default ``True``
Returns
-------
pd.DataFrame
tracked zdiscs information. Columns are ``'frame'`` (frame number),
``'x'`` and ``'y'`` (zdiscs center position), ``'p1_x'``,
``'p1_y'``, ``'p2_x'``, ``'p2_y'`` (zdiscs end points positions),
and ``'particle'`` (id of each tracked zdisc).
Notes
-----
- If ``SarcGraph().file_type='image'``, tracking will be skipped.
- For a detailed description of the Trackpy package check:
http://soft-matter.github.io/trackpy/v0.5.0/tutorial.html
- For a detailed description of the OPTICS algorithm check:
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.OPTICS.html
See Also
--------
:func:`sarcgraph.sg.SarcGraph.zdisc_segmentation`
:func:`sarcgraph.sg.SarcGraph._merge_tracked_zdiscs`
"""
if segmented_zdiscs is None:
segmented_zdiscs = self.zdisc_segmentation(
file_path, raw_frames, sigma, min_length, save_output
)
if type(segmented_zdiscs) != pd.DataFrame:
raise TypeError("segmented_zdiscs shoulb be a dataframe.")
correct_columns = set(("frame", "x", "y")).issubset(
set(segmented_zdiscs.columns)
)
if not correct_columns:
raise ValueError(
"segmented_zdiscs must contain at least three columns: 'x', "
"'y', 'frame'."
)
# skip tracking if the sample is an image
if self.file_type == "image":
segmented_zdiscs["particle"] = np.arange(len(segmented_zdiscs))
tracked_zdiscs = segmented_zdiscs
else:
# run tracking with the trackpy package:
num_frames = len(segmented_zdiscs.frame.unique())
t = tp.link_df(
segmented_zdiscs, search_range=tp_depth, memory=num_frames
)
tracked_zdiscs = tp.filter_stubs(t, 2).reset_index(drop=True)
if skip_merging is False:
tracked_zdiscs = self._merge_tracked_zdiscs(
tracked_zdiscs,
full_track_ratio,
)
if save_output:
self._save_dataframe(tracked_zdiscs, "tracked-zdiscs")
return tracked_zdiscs
#############################################################
# Sarcomere Detection #
#############################################################
[docs] def _zdisc_to_graph(self, zdiscs: np.array, K: int = 3) -> nx.Graph:
"""Creates a graph with zdiscs as nodes. Each zdisc is connected to its
``K`` nearest neighbors.
Parameters
----------
zdiscs : np.array, shape=(N, 3)
zdiscs information as an array. The first two columns are the x and
y location of zdisc centers and the last is the particle id.
K : int, optional
number of nearest neighbors for each zdisc, by default 3
Returns
-------
nx.Graph
"""
num_nodes = len(zdiscs)
G = nx.Graph()
G.add_nodes_from(range(num_nodes))
# add the position of each zdisc as a node attribute to the graph
nodes_pos_dict = {i: pos for i, pos in enumerate(zdiscs[:, 0:2])}
nodes_particle_dict = {j: id for j, id in enumerate(zdiscs[:, -1])}
nx.set_node_attributes(G, values=nodes_pos_dict, name="pos")
nx.set_node_attributes(
G, values=nodes_particle_dict, name="particle_id"
)
# find K nearest zdisc to each zdisc
neigh = NearestNeighbors(n_neighbors=2)
neigh.fit(zdiscs[:, 0:2])
nearestNeighbors = neigh.kneighbors(
zdiscs[:, 0:2], K + 1, return_distance=False
)
# connect each zdisc to K nearest neighbors
edges = []
for node, neighbors in enumerate(nearestNeighbors[:, 1:]):
for neighbor in neighbors:
edges.append((node, neighbor))
G.add_edges_from(edges)
return G
[docs] def _score_graph(
self,
G: nx.Graph,
c_avg_length: float = 1.0,
c_angle: float = 1.0,
c_length_diff: float = 1.0,
l_avg: float = 15.0,
l_max: float = 30.0,
) -> nx.Graph:
"""Assigns a score to each connection of the input graph. Higher score
indicates the two corresponding zdiscs are likely to be two ends of a
sarcomere.
Parameters
----------
G : nx.Graph
c_avg_length : float, optional
comparison of the length of a connection with ``l_avg`` affects the
connection score, ``c_avg_length`` sets the relative effect of this
score metric compared to the other two, by default 1.0
c_angle : float, optional
the angle between a connection and its neighboring connections
affects the connection score, ``c_angle`` sets the relative effect
of this score metric compared to the other two, by default 1.0
c_length_diff : float, optional
comparison of the length of a connection with the length of all
neighboring connections affects the score, ``c_length_diff`` sets
the relative effect of this score metric compared to the other two,
by default 1.0
l_avg : float, optional
an initial guess for the average length of sarcomeres in pixels, by
default 15.0
l_max : float, optional
Max allowable length for the length of sarcomeres in pixels, by
default 30.0
Returns
-------
nx.Graph
a graph of zdiscs with all connections scored
"""
edges_attr_dict = {}
for node in range(G.number_of_nodes()):
for neighbor in G.neighbors(node):
score = 0
v1 = G.nodes[neighbor]["pos"] - G.nodes[node]["pos"]
l1 = np.linalg.norm(v1)
if l1 <= l_max:
avg_length_score = np.exp(-np.pi * (1 - l1 / l_avg) ** 2)
for far_neighbor in G.neighbors(neighbor):
if far_neighbor in [node, neighbor]:
pass
else:
v2 = (
G.nodes[neighbor]["pos"]
- G.nodes[far_neighbor]["pos"]
) # vector connecting neighbor to far_neighbor
l2 = np.linalg.norm(v2)
d_theta = np.arccos(np.dot(v1, v2) / (l1 * l2)) / (
np.pi / 2
)
d_l = np.abs(l2 - l1) / l1
angle_score = (
np.power(1 - d_theta, 2) if d_theta >= 1 else 0
)
diff_length_score = 1 / (1 + d_l)
score = np.max(
(
score,
c_length_diff * diff_length_score
+ c_angle * angle_score,
)
)
score += c_avg_length * avg_length_score
edges_attr_dict[(node, neighbor)] = score
edges_attr_dict_keep_max = {}
for key in edges_attr_dict.keys():
node_1 = key[0]
node_2 = key[1]
max_score = max(
edges_attr_dict[(node_1, node_2)],
edges_attr_dict[(node_2, node_1)],
)
edges_attr_dict_keep_max[(min(key), max(key))] = max_score
nx.set_edge_attributes(
G, values=edges_attr_dict_keep_max, name="score"
)
return G
[docs] def _prune_graph(
self,
G: nx.Graph,
score_threshold: float = 0.1,
angle_threshold: float = 1.2,
) -> nx.Graph:
"""Prunes the input graph to get rid of invalid or less probable
connections.
Parameters
----------
G : nx.Graph
A scored graph of zdiscs clusters
score_threshold : float
any connection with a score less than the threshold will be
removed, by default 0.1
angle_threshold : float
if a zdisc has two valid connection the angle between must be
higher than the theshold, otherwise the connection with a lower
score will be removed, by default 1.2 (in radians)
Returns
-------
nx.Graph
"""
nx.set_edge_attributes(G, values=0, name="validity")
for node in range(G.number_of_nodes()):
vectors = []
scores = []
neighbors = list(G.neighbors(node))
for neighbor in neighbors:
vectors.append(G.nodes[neighbor]["pos"] - G.nodes[node]["pos"])
scores.append(G[node][neighbor]["score"])
# sort by scores
if np.max(scores) > score_threshold:
sort_indices = np.argsort(scores)[::-1]
best_vector = vectors[sort_indices[0]]
G[node][neighbors[sort_indices[0]]]["validity"] += 1
for idx in sort_indices[1:]:
s = scores[idx]
n = neighbors[idx]
v = vectors[idx]
l1 = np.linalg.norm(best_vector)
l2 = np.linalg.norm(v)
theta = np.arccos(np.dot(v, best_vector) / (l1 * l2)) / (
np.pi / 2
)
if theta > angle_threshold and s > score_threshold:
G[node][n]["validity"] += 1
break
edges2remove = []
for edge in G.edges():
if G.edges[edge]["validity"] < 2:
edges2remove.append(edge)
G.remove_edges_from(edges2remove)
return G
[docs] def sarcomere_detection(
self,
file_path: str = None,
raw_frames: np.array = None,
segmented_zdiscs: pd.DataFrame = None,
tracked_zdiscs: pd.DataFrame = None,
sigma: float = 1.0,
min_length: int = 8,
tp_depth: float = 4.0,
full_track_ratio: float = 0.75,
skip_merging: bool = False,
c_avg_length: float = 1,
c_angle: float = 1,
c_diff_length: float = 1,
l_avg: float = 12.0,
score_threshold: float = 0.01,
angle_threshold: float = 1.2,
save_output: bool = True,
) -> Tuple[pd.DataFrame, List[nx.Graph]]:
"""Detect sarcomeres in a video/image. The input could be the address
to the video/image sample (``file_path``), raw frames as a numpy array
(``raw_frames``), segmented zdiscs information in a pandas dataframe
(``segmented_zdiscs``), or a dataframe of tracked zdiscs
(``tracked_zdiscs``).
Parameters
----------
file_path : str
The address of an image or a video file to be loaded
raw_frames : np.ndarray, shape=(frames, dim_1, dim_2, channels)
Raw input image or video given as a 4 dimensional array
segmented_zdiscs : pd.DataFrame
Information of all detected zdiscs in every frame.
tracked_zdiscs : pd.DataFrame
Information of tracked zdiscs
sigma : float
Standard deviation for Gaussian kernel, by default ``1.0``
min_length : int
Minimum length for zdisc contours measured in pixels, by default
``8``
tp_depth : float, optional
the maximum distance features can move between frames, by default
``4.0``
full_track_ratio : float, optional
Frame ratio to seperate partially tracked vs fully tracked
sarcomeres, by default ``0.75``
skip_merging : bool, optional
skipping the merging step will result in fewer fully tracked
zdiscs, by default ``False``
c_avg_length : float, optional
comparison of the length of a connection with ``l_avg`` affects the
connection score, ``c_avg_length`` sets the relative effect of this
score metric compared to the other two, by default ``1.0``
c_angle : float, optional
the angle between a connection and its neighboring connections
affects the connection score, ``c_angle`` sets the relative effect
of this score metric compared to the other two, by default ``1.0``
c_length_diff : float, optional
comparison of the length of a connection with the length of all
neighboring connections affects the score, ``c_length_diff`` sets
the relative effect of this score metric compared to the other two,
by default ``1.0``
l_avg : float, optional
an initial guess for the average length of sarcomeres in pixels, by
default ``15.0``
score_threshold : float
any connection with a score less than the threshold will be
removed, by default ``0.1``
angle_threshold : float
if a zdisc has two valid connection the angle between must be
higher than the theshold, otherwise the connection with a lower
score will be removed, by default ``1.2`` (in radians)
save_output : bool
by default ``True``
Returns
-------
pd.DataFrame
Detected sarcomeres information. Columns are ``'frame'`` (frame
number), ``'sarc_id'`` (sarcomere id), ``'zdiscs'`` (particle id of
the two zdiscs forming a sarcomere), ``'x'`` and ``'y'`` (sarcomere
center position), ``'length'``, ``'width'``, and ``'angle'``
(sarcomere length, width, and angle).
List[nx.Graph]:
A list of graphs each indicating connected sarcomeres (myofibrils)
Notes
-----
- For a detailed description of the Trackpy package check:
http://soft-matter.github.io/trackpy/v0.5.0/tutorial.html
- For a detailed description of the OPTICS algorithm check:
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.OPTICS.html
See Also
--------
:func:`sarcgraph.sg.SarcGraph.zdisc_segmentation`
:func:`sarcgraph.sg.SarcGraph.zdisc_tracking`
"""
if tracked_zdiscs is None:
tracked_zdiscs = self.zdisc_tracking(
file_path,
raw_frames,
segmented_zdiscs,
sigma,
min_length,
tp_depth,
full_track_ratio,
skip_merging,
save_output,
)
zdiscs_clusters = (
tracked_zdiscs.groupby("particle")
.mean()
.reset_index()[["x", "y", "particle"]]
.to_numpy()
)
G = self._zdisc_to_graph(zdiscs_clusters)
G = self._score_graph(G, c_avg_length, c_angle, c_diff_length, l_avg)
G = self._prune_graph(G, score_threshold, angle_threshold)
myofibrils = [G.subgraph(c).copy() for c in nx.connected_components(G)]
sarcs = []
num_frames = tracked_zdiscs.frame.max() + 1
z0 = pd.DataFrame(np.arange(0, num_frames, 1), columns=["frame"])
for i, edge in enumerate(G.edges):
p1 = int(G.nodes[edge[0]]["particle_id"])
p2 = int(G.nodes[edge[1]]["particle_id"])
z1 = tracked_zdiscs[tracked_zdiscs.particle == p1]
z2 = tracked_zdiscs[tracked_zdiscs.particle == p2]
z1.columns = np.insert(z2.columns.values[1:] + "_p1", 0, "frame")
z2.columns = np.insert(z2.columns.values[1:] + "_p2", 0, "frame")
sarc = pd.merge(z0, z1, how="outer", on="frame")
sarc = pd.merge(sarc, z2, how="outer", on="frame")
sarc["sarc_id"] = i
sarc["zdiscs"] = ",".join(
map(str, sorted((p1, p2)))
) # list(map(float, sarc.zdiscs[0].split(',')))
sarc["x"] = (sarc.x_p1 + sarc.x_p2) / 2
sarc["y"] = (sarc.y_p1 + sarc.y_p2) / 2
length = np.sqrt(
(sarc.x_p1 - sarc.x_p2) ** 2 + (sarc.y_p1 - sarc.y_p2) ** 2
)
sarc["length"] = length
width1 = np.sqrt(
(sarc.p1_x_p1 - sarc.p2_x_p1) ** 2
+ (sarc.p1_y_p1 - sarc.p2_y_p1) ** 2
)
width2 = np.sqrt(
(sarc.p1_x_p2 - sarc.p2_x_p2) ** 2
+ (sarc.p1_y_p2 - sarc.p2_y_p2) ** 2
)
sarc["width"] = (width1 + width2) / 2
angle = np.arctan2(sarc.x_p2 - sarc.x_p1, sarc.y_p2 - sarc.y_p1)
angle[angle < 0] += np.pi
sarc["angle"] = angle
sarcs.append(
sarc[
[
"frame",
"sarc_id",
"x",
"y",
"length",
"width",
"angle",
"zdiscs",
]
]
)
sarcs = pd.concat(sarcs).reset_index().drop("index", axis=1)
if save_output:
self._save_dataframe(sarcs, "sarcomeres")
return sarcs, myofibrils