Commit 0343bef5 authored by Dominique Piche's avatar Dominique Piche

add Util module

parent 13360453
......@@ -156,8 +156,8 @@ class MecaRobotForDosimetry(MecaRobot):
Reads one sample of the NI DAQ board to get the scintillators' signal. Returns every input channel in
an array.
"""
# TODO : add to doc
return self.daq_board.read_measure()
scintillators_signal = self.daq_board.read_measure()
return scintillators_signal
def get_scintillators_positions(self) -> np.ndarray:
"""
......@@ -209,11 +209,14 @@ class MecaRobotForDosimetry(MecaRobot):
TRF_position = pose[0:3]
self.source_position = TRF_position + source_position_relative_to_TRF
def find_source_positions(self):
def find_source_position(self, distance_to_source=20):
"""
Automatically finds the source's position by moving along every axis near the source until we get the maximum signal.
This assumes that an estimate of the source position as already been set, as it will move around this poins to find
This assumes that an estimate of the source position as already been set, as it will move around this poin tto find
the maximum.
:param distance_to_source: how far from the source in mm will the the dosimeter be placed. If the user is unsure of the
source position estimation, this parameter should be higher to make sure the robot does not hit the source holder.
"""
self.clear_buffer()
......@@ -223,7 +226,7 @@ class MecaRobotForDosimetry(MecaRobot):
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-5,50,0]))
self.move_position_relative_to_source(np.array([-5,15,0]))
self.move_position_relative_to_source(np.array([-5,distance_to_source,0]))
self.wait_for('3012', 'Did not receive EOB.')
# Moving alond the x axis
......@@ -235,7 +238,7 @@ class MecaRobotForDosimetry(MecaRobot):
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-10,50,0]))
self.move_position_relative_to_source(np.array([-50,-5,0]))
self.move_position_relative_to_source(np.array([-15,-5,0]))
self.move_position_relative_to_source(np.array([-distance_to_source,-5,0]))
self.wait_for('3012', 'Did not receive EOB.')
# Moving along the y axis
......@@ -247,7 +250,7 @@ class MecaRobotForDosimetry(MecaRobot):
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-50,-5,0]))
self.move_position_relative_to_source(np.array([0,50,-10]))
self.move_position_relative_to_source(np.array([0,15,-10]))
self.move_position_relative_to_source(np.array([0,distance_to_source,-10]))
# Moving along the z-axis
for i in range(200):
......@@ -259,9 +262,9 @@ class MecaRobotForDosimetry(MecaRobot):
self.zero_joints()
self.wait_for('3012', 'Did not receive EOB.')
x_max = np.argmax(np.linalg.norm(x_axis_signal, axis=1))*0.1-4.9
y_max = np.argmax(np.linalg.norm(y_axis_signal, axis=1))*0.1-4.9
z_max = np.argmax(np.linalg.norm(z_axis_signal, axis=1))*0.1-9.9
x_max = np.argmax(x_axis_signal[:,0])*0.1-4.9
y_max = np.argmax(y_axis_signal[:,0])*0.1-4.9
z_max = np.argmax(z_axis_signal[:,0])*0.1-9.9
np.savetxt('Data/x_calibration_data.txt', x_axis_signal, delimiter = ',')
np.savetxt('Data/y_calibration_data.txt', y_axis_signal, delimiter = ',')
......
......@@ -30,7 +30,7 @@ class DAQ_Volt_Tracking:
for output_channel_number in output_channels:
channel_name = 'Dev{}/ao{}'.format(daq_path_number, output_channel_number)
self.write_task.ao_channels.add_ao_voltage_chan(channel_name, max_val=1, min_val=0.9999)
self.write_task.ao_channels.add_ao_voltage_chan(channel_name, max_val=1, min_val=0)
Finite_Mode = nidaqmx.constants.AcquisitionType(10178)
Cont_Mode = nidaqmx.constants.AcquisitionType(10123) # Continious acquisition
......@@ -48,7 +48,7 @@ class DAQ_Volt_Tracking:
self.writer.write_one_sample(np.ones(self.number_of_output_channels, dtype=np.float64))
def close(self):
#self.writer.write_one_sample(np.zeros(self.number_of_output_channels, dtype=np.float64))
self.writer.write_one_sample(np.zeros(self.number_of_output_channels, dtype=np.float64))
warnings.filterwarnings('error')
# Stoping tasks
......
import numpy as np
import pytg43
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
def get_spherical_coordinates(x, y, z):
r = np.sqrt((x**2 + y**2 + z**2))
theta = np.arccos(z/r) * 180 / np.pi
return r, theta
def export_tg43_dose_rate(number_of_scintillators=1):
# Exports the dose rate corresponding to each dwell position in the Data folder calculatead using the TG43 formalism.
seed = pytg43.Seed('Flexisource')
scintillators_positions = []
for scintillator in range(number_of_scintillators):
cart_coords = np.genfromtxt('Data/scintillator_{}_position_relative_to_source.txt'.format(scintillator+1), delimiter=',') / 10
x, y, z = cart_coords[:, 0], cart_coords[:, 1], cart_coords[:, 2]
r, theta = get_spherical_coordinates(x, y, z)
dose = np.array(seed.dose(r, theta))
np.savetxt('Data/scintillator_{}_tg43_dose_rate.txt'.format(scintillator+1), dose, delimiter=',')
def export_trajectory_graph(scintillator_to_plot=1, show_figure=True):
# Exports a PNG of the robot trajectory graph correspoding to the positions in the Data folder.
data = np.genfromtxt('Data/scintillator_{}_position_relative_to_source.txt'.format(scintillator_to_plot), delimiter=',')
x, y, z = data[:,0], data[:,1], data[:,2]
x, y, z = np.round(x, 10), np.round(y, 10), np.round(z, 10)
NPOINTS = len(x)
COLOR='blue'
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for i in range(NPOINTS-1):
ax.plot(x[i:i+2],y[i:i+2],z[i:i+2], alpha=float(i+int(NPOINTS/10))/(NPOINTS+int(NPOINTS/10+1)),color=COLOR)
ax.scatter(0,0,0, color = 'red', label = 'Source')
ax.set_xlabel('x [mm]')
ax.set_ylabel('y [mm]')
ax.set_zlabel('z [mm]')
ax.legend()
plt.savefig('Data/trajectory_scint_{}.png'.format(scintillator_to_plot), dpi=600)
if show_figure:
plt.show()
def export_list_of_commands():
# Writes in the list_of_movement_commands.txt file all of the movement commands in the log.txt file. These can then be copy pasted
# into the web interface to redo the same sequence.
unwanted_commands = ['GetPose', 'ActivateSim', 'ResetError', 'ActivateRobot', 'DeactivateSim', 'DeactivateRobot', 'Home', 'SetEOM']
with open('Data/list_of_movement_commands.txt', 'w+') as export_file:
export_file.write('SetEOM(0)\n')
with open('log.txt') as log:
for line in log.readlines():
# Only checking sent commands
if line[:4] == 'Sent':
# Skip unwanted commands
for command in unwanted_commands:
if line.find(command) != -1:
break
else:
export_file.write(line[6:])
\ No newline at end of file
import sys
import numpy as np
from MecaRobot.meca_robot_for_dosimetry import MecaRobotForDosimetry
# Moves the robot in straight lines of 150 mm towards the source in a water tank.
robot_ip = "192.168.0.100"
robot_port = 10000
simulation_mode = False
enable_log = True
scintillators_positions = np.array([224, 241, 250]) #In mm, relative to robot end (FRF)
# When the robot is manually jogged to the position depicted in Tools/calibration_setup.stl, the
# source is positionned at (10.00,0.02,-347.00) mm relative to the end of the robot.
source_position = np.array([212.943,108.693,136.658])+np.array([10,0.02,-347])
measure_with_nidaq = True #Enable tracking with NI daqmx board
daq_path_number = 2 #Path of the NI daqmx device, i.e. Dev1
analog_input_channels = (8, 9, 10, 11)
analog_output_channels = (0,)
robot = MecaRobotForDosimetry(ip = robot_ip,
port = robot_port,
simulation_mode = simulation_mode,
enable_log = enable_log,
scintillators_positions = scintillators_positions,
source_position = source_position,
measure_with_nidaq = measure_with_nidaq,
daq_path_number = daq_path_number,
analog_input_channels = analog_input_channels,
analog_output_channels = analog_output_channels)
robot.move_position_relative_to_source(np.array([0,-40,30]))
robot.wait_for('3004', 'Did not receive EOM.')
robot.start_tracking()
robot.turn_around_source(90,-2)
robot.turn_around_source(180,2)
robot.turn_around_source(90,-2)
robot.stop_tracking()
robot.run('MoveLinRelTRF', [0,-75,0,0,0,0])
robot.deactivate()
\ No newline at end of file
"""
TG43 class
"""
__all__ = [
"Seed",
]
import os
import warnings
import numpy as np
import xarray as xr
class SourceDataNotFound(Exception):
pass
def _coords3D(p):
if np.isscalar(p):
return np.array((p, )*3)
else:
return np.array(p)
def _get_sphere_coords(x, y, z, orientation=(0, 0, 1)):
r = np.sqrt(x**2 + y**2 + z**2)
m = x*orientation[0] + y*orientation[1] + z*orientation[2]
theta = np.degrees(np.arccos(m/r))
return r, theta
def domain(fn):
def wrapper(*args, domain=None):
a = fn(*args)
if domain is None:
return a
else:
return a.sel(r=slice(domain[0], domain[1]))
return wrapper
class Seed:
def __init__(self, dirname):
"""
Construct a TG43 source object.
The two top level functions are `line` and `point` that implement the
line-formalism and point-formalism TG43.
Some sources are already defined in the sources.py file. A new source
can be initialized with the datafile, the source length and the dose
rate constant.
`dirname`: the datafile directory name
"""
# find the absolute path of the data directory
path = os.path.join(os.path.dirname(__file__), "data", dirname)
if not os.path.isdir(path):
raise SourceDataNotFound(dirname)
## The length of the source
self.L = np.loadtxt(os.path.join(path, "L.dat"))
## The dose rate constant of the source
self.Lambda = np.loadtxt(os.path.join(path, "Lambda.dat"))
## The function gL
gL = np.loadtxt(os.path.join(path, "gL.dat")).T
self.gL = xr.DataArray(gL[1], dims=["r"], coords=[gL[0]])
## The function F
F = np.loadtxt(os.path.join(path, "F.dat"))
self.F = xr.DataArray(F[1:,1:].T, dims=["r", "theta"], coords=[F[0,1:], F[1:,0]])
## the function phi
if os.path.isfile(os.path.join(path, "phi.dat")):
phi = np.loadtxt(os.path.join(path, "phi.dat")).T
self.phi = xr.DataArray(phi[1], dims=["r"], coords=[phi[0]])
else:
self.phi = self.F.mean(dim="theta")
self._GL0 = self._beta(1, np.radians(90))/self.L
self.decimals_r = 1
def _beta(self, r, theta):
"""
Implementation of the beta function used in the geometric function (GL).
"""
L = self.L
L24 = (L**2)/4
theta1 = np.pi - theta
beta = np.arcsin((L/2)*np.sin(theta) / np.sqrt(r**2 + L24 - r*L*np.cos(theta))) + np.arcsin((L/2)*np.sin(theta1) / np.sqrt(r**2 + L24 - r*L*np.cos(theta1)))
return beta
def _GL(self, r, theta):
rad_theta = np.radians(theta)
GL_array = self._beta(r, rad_theta)/(self.L*r*np.sin(rad_theta))
GL_array = GL_array.where(theta != 0, 1/(r**2 - (self.L**2)/4))
GL_array = GL_array.where(theta != 180, 1/(r**2 - (self.L**2)/4))
GL_array = GL_array.where(r != 0, np.nan)
return GL_array
def _dose(self, r, theta):
return self.Lambda * (self._GL(r, theta) / self._GL0) * self.gL.interp(r=r) * self.F.interp(r=r, theta=theta)
def dose(self, r, theta):
r_arr = xr.DataArray(np.array(r, ndmin=1))
theta_arr = xr.DataArray(np.array(theta, ndmin=1))
return self._dose(r_arr, theta_arr)
def grid(self, boxmin=-10, boxmax=10, voxsize=0.1, sourcepos=0, orientation=(0, 0, 1)):
boxmin = _coords3D(boxmin)
boxmax = _coords3D(boxmax)
voxsize = _coords3D(voxsize)
sourcepos = _coords3D(sourcepos)
orientation = _coords3D(orientation)
# np.arange accumulates the floating point error. Here, the error is
# kept as small as possible.
x = boxmin[0] - sourcepos[0] + np.arange(0, boxmax[0] - boxmin[0] + voxsize[0], voxsize[0])
y = boxmin[1] - sourcepos[1] + np.arange(0, boxmax[1] - boxmin[1] + voxsize[1], voxsize[1])
z = boxmin[2] - sourcepos[2] + np.arange(0, boxmax[2] - boxmin[2] + voxsize[2], voxsize[2])
x = xr.DataArray(x, dims=["x"], coords=[x])
y = xr.DataArray(y, dims=["y"], coords=[y])
z = xr.DataArray(z, dims=["z"], coords=[z])
r, theta = _get_sphere_coords(x, y, z)
grid = self._dose(r, theta)
grid = grid.drop(["r", "theta"])
grid = grid.assign_coords(x=(grid.x + sourcepos[0]), y=(grid.y + sourcepos[1]), z=(grid.z + sourcepos[2]))
return grid
@domain
def get_mean(self, array):
array90 = array.interp(z=0)
r, _ = _get_sphere_coords(array90.x, array90.y, array90.z)
array90["r"] = r.round(decimals=self.decimals_r)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
a = array90.groupby("r").mean()
return a
@domain
def get_std(self, array):
array90 = array.interp(z=0)
r, _ = _get_sphere_coords(array90.x, array90.y, array90.z)
array90["r"] = r.round(decimals=self.decimals_r)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
a = array90.groupby("r").std()
return a
@domain
def get_cv(self, array):
array90 = array.interp(z=0)
r, _ = _get_sphere_coords(array90.x, array90.y, array90.z)
array90["r"] = r.round(decimals=self.decimals_r)
array90_groupby_r = array90.groupby("r")
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
mean = array90_groupby_r.mean()
std = array90_groupby_r.std()
return std/abs(mean)
@domain
def get_gL(self, array):
array90 = array.interp(z=0)
r, theta = _get_sphere_coords(array90.x, array90.y, array90.z)
array90 *= self._GL0 / self._GL(r, theta)
array90["r"] = r.round(decimals=self.decimals_r)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
a = array90.groupby("r").mean()
return a/a.sel(r=1)
@domain
def get_F(self, array, R):
r, theta = _get_sphere_coords(array.x, array.y, array.z)
new_arr = array * self._GL0 / self._GL(r, theta)
s = np.linspace(-R, R, 101)
new_arr = new_arr.interp(x=s, y=s, z=s)
r, theta = _get_sphere_coords(new_arr.x, new_arr.y, new_arr.z)
new_arr["r"] = r.round(decimals=self.decimals_r)
new_arr["theta"] = theta.round()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
a = new_arr.where(new_arr.r == R).groupby("theta").mean()
return a/a.sel(theta=90)
def get_calibration(self, array, significant_figures=5):
array90 = array.interp(z=0)
r, theta = _get_sphere_coords(array90.x, array90.y, array90.z)
array90["r"] = r.round(decimals=3)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
a = array90.groupby("r").mean()
C = self.Lambda/a.sel(r=1).values
C = np.around(C, decimals=significant_figures-int(np.log10(C)))
return C
from pytg43.Seed import *
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment