dumber/software/monitor/monitor-python-qt/monitor-python.py

425 lines
16 KiB
Python
Executable file

#!/usr/bin/env python3
"""Monitor-python : a monitor tool for robot control, using QT5
Usage: monitor-python.py
"""
from email.message import Message
from email.policy import Policy
import os
import time
import sys
from tkinter import Image
from PyQt5 import (QtCore, QtWidgets, QtGui)
from main_window import Ui_MainWindow
from log_dialog import Ui_Dialog
from network import *
from globvar import GlobVar
import base64
__author__ = "Sebastien DI MERCURIO"
__copyright__ = "Copyright 2024, INSA Toulouse"
__credits__ = ["Sebastien DI MERCURIO"]
__license__ = "GPL"
__version__ = "1.1"
__maintainer__ = "Sebastien DI MERCURIO"
__email__ = "dimercur@insa-toulouse.fr"
__status__ = "Production"
"""
The main application class.
Instantiate it for opening main window
"""
class Window(QtWidgets.QMainWindow, Ui_MainWindow):
_msg_dialog= None
_batteryTimer=None
_FPSTimer=None
fps=0
def __init__(self, parent=None):
super().__init__(parent)
self.setupUi(self)
self.lineEdit_address.setText(GlobVar.address)
self.lineEdit_port.setText(str(GlobVar.port))
self._msg_dialog=QtWidgets.QDialog()
self._msg_dialog.ui= Ui_Dialog()
self._msg_dialog.ui.setupUi(self._msg_dialog)
self.fps=0
self.networkThread = Network()
self.connectSignalSlots()
# Start network thread and wait for event indicating monitor is connected to a target
self.networkThread.start()
# Create battery timer
self._batteryTimer = QtCore.QTimer()
self._batteryTimer.timeout.connect(self.OnBatteryTimeout)
# Create fps timer
self._FPSTimer = QtCore.QTimer()
self._FPSTimer.timeout.connect(self.OnFPSTimeout)
self.DisableUIWidgets("Network")
self.DisableUIWidgets("Robot")
def connectSignalSlots(self):
# Buttons
self.pushButton_start.pressed.connect(self.OnButtonPress_Start)
self.pushButton_confirmArena.pressed.connect(self.OnButtonPress_ConfirmArena)
self.pushButton_up.pressed.connect(self.OnButtonPress_Up)
self.pushButton_down.pressed.connect(self.OnButtonPress_Down)
self.pushButton_stop.pressed.connect(self.OnButtonPress_Stop)
self.pushButton_left.pressed.connect(self.OnButtonPress_Left)
self.pushButton_right.pressed.connect(self.OnButtonPress_Right)
# Checkbox
self.checkBox_enableCamera.stateChanged.connect(self.OnCheckBoxChanged_EnableCamera)
self.checkBox_enableFPS.stateChanged.connect(self.OnCheckBoxChanged_EnableFPS)
self.checkBox_enablePosition.stateChanged.connect(self.OnCheckBoxChanged_EnablePosition)
self.checkBox_getBattery.stateChanged.connect(self.OnCheckBoxChanged_GetBattery)
# LineEdit
self.lineEdit_address.textChanged.connect(self.OnLineEditChange_Address)
self.lineEdit_port.textChanged.connect(self.OnLineEditChange_Port)
# Menu
self.action_OpenMessageLog.triggered.connect(self.OnMenu_OpenMessageLog)
self.action_Quitter.triggered.connect(self.OnMenu_Quitter)
# Message Dialog
self._msg_dialog.ui.pushButton_clearLog.pressed.connect(self.OnButtonPress_ClearLog)
self._msg_dialog.ui.pushButton_closeLog.pressed.connect(self.OnButtonPress_CloseLog)
# Network signals
self.networkThread.receptionEvent.connect(self.OnReceptionEvent)
self.networkThread.connectionEvent.connect(self.OnConnectionEvent)
self.networkThread.logEvent.connect(self.OnLogEvent)
self.networkThread.answerEvent.connect(self.OnAnswerEvent)
def EnableUIWidgets(self, area):
if area == "Network":
# Widget for robot connection
self.checkBox_watchdog.setDisabled(False)
self.pushButton_start.setDisabled(False)
# Widget for camera and position
self.label_Image.setDisabled(False)
self.pushButton_confirmArena.setDisabled(False)
self.checkBox_enableCamera.setDisabled(False)
self.checkBox_enableFPS.setDisabled(False)
self.checkBox_enablePosition.setDisabled(False)
self.label_RobotID.setDisabled(False)
self.label_RobotAngle.setDisabled(False)
self.label_RobotPos.setDisabled(False)
self.label_RobotDirection.setDisabled(False)
else: # area Robot
# Widget for robot mouvement and battery
self.groupBox_mouvments.setDisabled(False)
self.groupBox_AnswerandBattery.setDisabled(False)
def DisableUIWidgets(self, area):
if area == "Network":
# Widget for robot connection
self.checkBox_watchdog.setDisabled(True)
self.pushButton_start.setDisabled(True)
# Widget for camera and position
self.label_Image.setDisabled(True)
self.pushButton_confirmArena.setDisabled(True)
self.checkBox_enableCamera.setDisabled(True)
self.checkBox_enableFPS.setDisabled(True)
self.checkBox_enablePosition.setDisabled(True)
self.label_RobotID.setDisabled(True)
self.label_RobotAngle.setDisabled(True)
self.label_RobotPos.setDisabled(True)
self.label_RobotDirection.setDisabled(True)
else: # area Robot
# Widget for robot mouvement and battery
self.groupBox_mouvments.setDisabled(True)
self.groupBox_AnswerandBattery.setDisabled(True)
@QtCore.pyqtSlot(str)
def OnLineEditChange_Address(self, text):
if self.networkThread.checkAddressIsValid(text):
GlobVar.address = text
@QtCore.pyqtSlot(str)
def OnLineEditChange_Port(self, text):
GlobVar.port = int(text)
@QtCore.pyqtSlot()
def OnButtonPress_Start (self):
if self.pushButton_start.text() == "Start r&obot":
if self.networkThread.robotOpenCom() == NetworkAnswers.ACK:
self.networkThread.robotReset()
# com opened successfully, start robot
if self.checkBox_watchdog.isChecked():
# start with watchdog
if self.networkThread.robotStartWithWatchdog() == NetworkAnswers.ACK:
self.pushButton_start.setText("Reset r&obot")
self.EnableUIWidgets("Robot")
else:
# start without watchdog
if self.networkThread.robotStartWithoutWatchdog() == NetworkAnswers.ACK:
self.pushButton_start.setText("Reset r&obot")
self.EnableUIWidgets("Robot")
else:
if self.networkThread.robotCloseCom() != NetworkAnswers.ACK:
msg= QtWidgets.QMessageBox
msg.warning(self,'Invalid answer', 'Server answer was not acknowledged. Maybe robot is still running', msg.Ok)
self.pushButton_start.setText("Start r&obot")
self.DisableUIWidgets("Robot")
@QtCore.pyqtSlot()
def OnButtonPress_ConfirmArena(self):
self.networkThread.cameraAskArena()
msg= QtWidgets.QMessageBox
ret = msg.question(self, '', 'Arena boundaries are correctly detected ?',msg.Yes| msg.No)
if ret == msg.Yes:
self.networkThread.cameraConfirmArena()
print ("Answer is YES")
else:
self.networkThread.cameraInfirmArena()
print ("Answer is NO")
@QtCore.pyqtSlot()
def OnButtonPress_Up(self):
self.networkThread.robotGoForward()
@QtCore.pyqtSlot()
def OnButtonPress_Down(self):
self.networkThread.robotGoBackward()
@QtCore.pyqtSlot()
def OnButtonPress_Stop(self):
self.networkThread.robotStop()
@QtCore.pyqtSlot()
def OnButtonPress_Left(self):
self.networkThread.robotGoLeft()
@QtCore.pyqtSlot()
def OnButtonPress_Right(self):
self.networkThread.robotGoRight()
@QtCore.pyqtSlot(int)
def OnCheckBoxChanged_EnableCamera(self, state):
if self.checkBox_enableCamera.isChecked():
self.networkThread.cameraOpen()
else:
self.networkThread.cameraClose()
@QtCore.pyqtSlot(int)
def OnCheckBoxChanged_EnableFPS(self, state):
if state !=0:
self._FPSTimer.start(1000)
self.checkBox_enableFPS.setText("FPS (0)")
else:
self._FPSTimer.stop()
self.checkBox_enableFPS.setText("Enable FPS")
@QtCore.pyqtSlot(int)
def OnCheckBoxChanged_EnablePosition(self, state):
if self.checkBox_enablePosition.isChecked():
self.networkThread.cameraGetPosition()
else:
self.networkThread.cameraStopPosition()
@QtCore.pyqtSlot(int)
def OnCheckBoxChanged_GetBattery(self, state):
if state !=0:
self._batteryTimer.start(5000)
else:
self._batteryTimer.stop()
@QtCore.pyqtSlot()
def OnMenu_OpenMessageLog(self):
self._msg_dialog.show()
@QtCore.pyqtSlot()
def OnMenu_Quitter(self):
self._msg_dialog.hide()
self.close()
def closeEvent(self, event):
self._msg_dialog.hide()
event.accept()
@QtCore.pyqtSlot()
def OnButtonPress_ClearLog(self):
plainTextEdit=self._msg_dialog.ui.plainTextEdit
plainTextEdit.document().clear()
@QtCore.pyqtSlot()
def OnButtonPress_CloseLog(self):
self._msg_dialog.hide()
# Callback used to decode non answer message from server (mainly battery level and log message)
@QtCore.pyqtSlot(str)
def OnReceptionEvent(self, s) -> None:
if Network.ROBOT_BATTERY_LEVEL in s:
str_split = s.split(':')
try:
batteryLevel = int(str_split[1])
except:
batteryLevel = -1
if batteryLevel == 0:
self.checkBox_getBattery.setText ("Get battery (0 = empty)")
elif batteryLevel == 1:
self.checkBox_getBattery.setText ("Get battery (1 = low)")
elif batteryLevel == 2:
self.checkBox_getBattery.setText ("Get battery (2 = full)")
else:
self.checkBox_getBattery.setText ("Get battery (invalid value)")
elif Network.CAMERA_IMAGE in s:
#print ("Image received")
#print ("Date received: " + s)
self.fps=self.fps+1
str_split = s.split(':')
try:
image_jpg= base64.b64decode(str_split[1])
img = QtGui.QImage.fromData(image_jpg, "jpg")
im_pixmap = QtGui.QPixmap(QtGui.QPixmap.fromImage(img))
#print ("Image size: " + str(im_pixmap.width()) + "x" + str(im_pixmap.height()))
self.label_Image.setPixmap(im_pixmap)
self.label_Image.setScaledContents(True)
self.label_Image.setSizePolicy(QtWidgets.QSizePolicy.Fixed,QtWidgets.QSizePolicy.Fixed)
except:
pass
#print ("Invalid image received")
elif Network.CAMERA_POSITION in s:
#CPOS:-1;0.000000;0.000000;0.000000;0.000000;0.000000
str_split = s.split(':')
values_split = str_split[1].split(';')
try:
robot_ID = int(values_split[0]) # Id of robot
except:
robot_ID = -1
try:
robot_Angle = float(values_split[1]) # angle of robot
except:
robot_Angle = 0.0
try:
robot_Coord_X = float(values_split[2]) # X coord of robot
except:
robot_Coord_X = 0.0
try:
robot_Coord_Y = float(values_split[3]) # Y coord of robot
except:
robot_Coord_Y = 0.0
try:
robot_Cap_X = float(values_split[4]) # X cap of robot
except:
robot_Cap_X = 0.0
try:
robot_Cap_Y = float(values_split[5]) # Y cap of robot
except:
robot_Cap_Y = 0.0
if robot_ID == -1:
self.label_RobotID.setText("No robot (-1)")
else:
self.label_RobotID.setText(values_split[0])
self.label_RobotAngle.setText("%.2f°" % (robot_Angle))
self.label_RobotPos.setText("(%.2f, %.2f)" % (robot_Coord_X, robot_Coord_Y))
self.label_RobotDirection.setText("(%.2f, %.2f)" % (robot_Cap_X, robot_Cap_Y))
# Callback for battery timeout
@QtCore.pyqtSlot()
def OnBatteryTimeout(self) -> None:
# Send a request for battery level. Answer will be done in OnReceptionEvent callback
self.networkThread.robotGetBattery()
# Callback for FPS timeout
@QtCore.pyqtSlot()
def OnFPSTimeout(self) -> None:
# Display current FPS
self.checkBox_enableFPS.setText("FPS (" + str(self.fps)+")")
self.fps=0
# Callback for connection/deconnection event from network manager
@QtCore.pyqtSlot(int)
def OnConnectionEvent(self, event) -> None:
if event == NetworkEvents.EVENT_CONNECTED:
GlobVar.connectedToPi = True
print ("Connected to server")
self.label_connectionStatus.setText("Connected")
self.EnableUIWidgets("Network")
elif event == NetworkEvents.EVENT_CONNECTION_LOST:
GlobVar.connectedToPi = False
print ("Disconnected from server")
self.label_connectionStatus.setText("Not connected")
self.pushButton_start.setText("Start r&obot")
self.DisableUIWidgets("Network")
self.DisableUIWidgets("Robot")
# Callback for answer event from network manager
@QtCore.pyqtSlot(int)
def OnAnswerEvent(self, ans) -> None:
if ans == NetworkAnswers.ACK:
self.label_lastAnswer.setText("Acknowledged (AACK)")
elif ans == NetworkAnswers.NACK:
self.label_lastAnswer.setText("Not acknowledged (ANAK)")
elif ans == NetworkAnswers.COM_ERROR:
self.label_lastAnswer.setText("Command error (ACER)")
elif ans == NetworkAnswers.TIMEOUT_ERROR:
self.label_lastAnswer.setText("Timeout - no answer")
elif ans == NetworkAnswers.CMD_REJECTED:
self.label_lastAnswer.setText("Command rejected (ACRJ)")
else:
self.label_lastAnswer.setText("Unknown answer")
# Callback for log event from network manager
@QtCore.pyqtSlot(str)
def OnLogEvent(self, txt) -> None:
self._msg_dialog.ui.plainTextEdit.textCursor().insertText(txt)
try:
if len(sys.argv)>=2:
GlobVar.address = sys.argv[1]
else:
print ("No target address specified: using localhost:5544")
print ("Usage: monitor-python.py address [port]")
#exit (-1) # Comment this line for connecting to localhost
if len(sys.argv)>=3:
GlobVar.port = int(sys.argv[2])
app = QtWidgets.QApplication(sys.argv)
window = Window()
window.show()
app.exec_()
except KeyboardInterrupt: # exception when pressing CTRL-C
print ("Bye bye")