Commit a9ba929c authored by Dominique Piche's avatar Dominique Piche

add queue for scintillator signal

parent 4a4d48e6
import numpy as np
import time
from scipy.spatial.transform import Rotation as R
import queue
import os
from MecaRobot.meca_robot import MecaRobot
from NI_DAQ.DAQ_Volt_Tracking_module import DAQ_Volt_Tracking
class MecaRobotForDosimetry(MecaRobot):
def __init__(self,
......@@ -15,10 +15,7 @@ class MecaRobotForDosimetry(MecaRobot):
scintillators_positions: np.ndarray = np.array([0]),
source_position: np.ndarray = np.array([0,0,0]),
water_level: float = -np.inf,
measure_with_nidaq: bool = False,
daq_path_number: int = None,
analog_input_channels: tuple = None,
analog_output_channels: tuple = None):
scintillators_signal_queue: queue.Queue = None):
"""
:param ip : robot ip
......@@ -32,11 +29,8 @@ class MecaRobotForDosimetry(MecaRobot):
reference frame).
:param water_level : height of the water in relation to the WRF. If specified, this will make sure
the robot never touches the water during sequences.
:param measure_with_nidaq : enables tracking with NI data acquisition board
The next parameters are used when tracking with a NI data acquisition board.
:param : daq_path_number : number of the daq board path position (i.e. 1 for Dev1, 2 for Dev2)
:param : analog_input_channels : analog input channels to use
:param : analog_output_channels : analog output channels to use
:param scintillators_signal_queue : if specified, the robot will track the signal from this queue in
the scintilators_signal.txt file.
See https://mecademic.com/Documentation/Meca500-R3-Programming-Manual.pdf for more details
about the reference frames definition.
......@@ -47,23 +41,11 @@ class MecaRobotForDosimetry(MecaRobot):
simulation_mode = simulation_mode,
enable_log = enable_log)
self.measure_with_nidaq = measure_with_nidaq
self.scintillators_relative_positions = scintillators_positions - scintillators_positions[0]
self.number_of_scintillators = len(scintillators_positions)
self.source_position = source_position
if analog_input_channels is not None:
self.number_of_input_channels = len(analog_input_channels)
if analog_output_channels is not None:
self.number_of_output_channels = len(analog_output_channels)
# Connecting to the NI_DAQ board
if measure_with_nidaq:
self.daq_board = DAQ_Volt_Tracking(daq_path_number=daq_path_number,
input_channels=analog_input_channels,
output_channels=analog_output_channels)
self.daq_board.start()
self.scintillators_signal_queue = scintillators_signal_queue
self.source_offset = np.array([0, 0, 0])
......@@ -110,7 +92,7 @@ class MecaRobotForDosimetry(MecaRobot):
self.scintillators_distance_to_source_files.append(open(file_path, 'w+'))
# Creating scintillators' signal file
if self.measure_with_nidaq:
if self.scintillators_signal_queue is not None:
self.scintillators_signal_file = open(os.path.join(current_path, 'Data/scintillators_signal.txt'), 'w+')
self.start_time = time.time()
......@@ -133,7 +115,7 @@ class MecaRobotForDosimetry(MecaRobot):
scintillator_distance_file.write(str(scintillators_distance_to_source[i]) + '\n')
# Writing the scintillators' signal in the signal file
if self.measure_with_nidaq:
if self.scintillators_signal_queue is not None:
scintillators_signal = self.get_scintillators_signal()
self.scintillators_signal_file.write(np.array2string(scintillators_signal, separator=',')[1:-1] + '\n')
......@@ -152,22 +134,18 @@ class MecaRobotForDosimetry(MecaRobot):
distance_to_source_file.close()
# Closing signals files
if self.measure_with_nidaq:
if self.scintillators_signal_queue is not None:
self.scintillators_signal_file.close()
def deactivate(self):
super().deactivate()
if self.measure_with_nidaq:
self.daq_board.close()
def get_scintillators_signal(self) -> np.ndarray:
"""
Reads the last sample acquired from the NI DAQ board to get the scintillators' signal. The board is sampled at 10 Hz.
Returns every input channel in an array.
Reads the scintillators signal from the signal queue.
"""
# Because the DAQ board is sampled at 10 Hz, we wait 0.1 s to make sure the reading corresponds to this dwell position.
time.sleep(0.1)
scintillators_signal = self.daq_board.last_read_val
scintillators_signal = self.signal_queue.get()
return scintillators_signal
def get_scintillators_positions(self) -> np.ndarray:
......@@ -216,15 +194,6 @@ class MecaRobotForDosimetry(MecaRobot):
return scintillators_positions_relative_to_source
def set_source_position(self, source_position_relative_to_TRF: np.ndarray):
"""
Sets the source position. Takes the current source position in relation to the TRF. The user has to ensure the robot is at
a complete stop before running this method.
"""
pose = self.get_TRF_pose()
TRF_position = pose[0:3]
self.source_position = TRF_position + source_position_relative_to_TRF
def find_source_position(self, distance_to_source=20):
"""
Automatically finds the source's position by moving along every axis near the source until we get the maximum signal.
......@@ -236,9 +205,9 @@ class MecaRobotForDosimetry(MecaRobot):
"""
self.clear_buffer()
x_axis_signal = np.zeros((100,self.number_of_input_channels))
y_axis_signal = np.zeros((100,self.number_of_input_channels))
z_axis_signal = np.zeros((200,self.number_of_input_channels))
x_axis_signal = []
y_axis_signal = []
z_axis_signal = []
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-5,50,0]))
......@@ -249,7 +218,7 @@ class MecaRobotForDosimetry(MecaRobot):
for i in range(100):
self.run('MoveLinRelTRF', [0.1,0,0,0,0,0])
self.wait_for('3012', 'Did not receive EOB')
x_axis_signal[i] = self.get_scintillators_signal()
x_axis_signal.append(self.get_scintillators_signal())
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-10,50,0]))
......@@ -261,26 +230,29 @@ class MecaRobotForDosimetry(MecaRobot):
for i in range(100):
self.run('MoveLinRelTRF', [0,0.1,0,0,0,0])
self.wait_for('3012', 'Did not receive EOB.')
y_axis_signal[i] = self.get_scintillators_signal()
y_axis_signal.append(self.get_scintillators_signal())
# Moving the robot near the source
self.move_position_relative_to_source(np.array([-50,-5,0]))
self.move_position_relative_to_source(np.array([0,50,-10]))
self.move_position_relative_to_source(np.array([0,distance_to_source,-10]))
self.wait_for('3012', 'Did not receive EOB.')
# Moving along the z-axis
for i in range(200):
self.run('MoveLinRelTRF', [0,0,0.1,0,0,0])
self.wait_for('3012', 'Did not receive EOB.')
z_axis_signal[i] = self.get_scintillators_signal()
z_axis_signal.append(self.get_scintillators_signal())
self.move_position_relative_to_source(np.array([0,50,-10]))
self.zero_joints()
self.wait_for('3012', 'Did not receive EOB.')
x_max = np.argmax(x_axis_signal[:,0])*0.1-4.9
y_max = np.argmax(y_axis_signal[:,0])*0.1-4.9
z_max = np.argmax(z_axis_signal[:,0])*0.1-9.9
x_axis_signal, y_axis_signal, z_axis_signal = np.array(x_axis_signal), np.array(y_axis_signal), np.array(z_axis_signal)
x_max = np.argmax(x_axis_signal[:,1])*0.1-4.9
y_max = np.argmax(y_axis_signal[:,1])*0.1-4.9
z_max = np.argmax(z_axis_signal[:,1])*0.1-9.9
np.savetxt('Data/x_calibration_data.txt', x_axis_signal, delimiter = ',')
np.savetxt('Data/y_calibration_data.txt', y_axis_signal, delimiter = ',')
......@@ -307,14 +279,6 @@ class MecaRobotForDosimetry(MecaRobot):
direction_vector = direction_vector / np.linalg.norm(direction_vector)
for i in range(abs(int(total_distance/step))):
# Cheking if the next step will send the robot lower than the water level
print(self.last_robot_end_height, step * direction_vector[2], self.water_level, self.water_level_safety_buffer)
if (self.last_robot_end_height + step * direction_vector[2]) <= (self.water_level + self.water_level_safety_buffer):
if self.enable_log:
with open('log.txt', 'a+') as log:
log.write('The robot is getting too close to the water level, stopping the sequence.\n')
break
# Writing the movement in the form [x, y, z, alpha, beta, gamma] used by the robot.
lin_pose = list(step * direction_vector) + [0, 0, 0]
......@@ -327,6 +291,12 @@ class MecaRobotForDosimetry(MecaRobot):
self.wait_for('3004', 'Did not receive EOM')
self._track_data()
# Cheking if the next step will send the robot lower than the water level
if (self.last_robot_end_height + step * direction_vector[2]) <= (self.water_level + self.water_level_safety_buffer):
if self.enable_log:
with open('log.txt', 'a+') as log:
log.write('The robot is getting too close to the water level, stopping the sequence.\n')
break
def move_position_relative_to_source(self, position: np.ndarray):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment