reoc/simu-lora/simlib/radio.py

122 lines
5.6 KiB
Python

#!/usr/bin/env python3
from simlib.defaults import *
from simlib.eventscheduler import *
from simlib.receptioncount import *
class Radio(object):
def __init__(self, channels, device, duty_cycle=DEFAULT.ENDDEVICE.duty_cycle, txonly=False):
if not isinstance(channels, tuple) or channels == ():
raise TypeError('channels must be arranged in a tuple and their number cannot be null')
self.__state = DEFAULT.RADIO.STATE.SLEEP
self.__outgoing_packet = None
self.__incoming_packet = None
self.__receptions = dict([(channel, ReceptionCount(self, channel)) for channel in channels])
self.__txonly = txonly
self.__current_channel = None
self.__device = device
self.__duty_cycle = duty_cycle
self.__duty_cycle_active = False
self.eventscheduler = EventScheduler()
def is_channel_available(self, channel):
return channel in self.__receptions.keys()
def get_fixed_channel(self):
if len(self.__receptions.keys()) == 1:
return list(self.__receptions.keys())[0]
def __set_channel(self, channel):
assert self.__current_channel is None
assert channel in self.__receptions.keys() or (len(self.__receptions) == 1 and channel is None)
self.__current_channel = channel if len(self.__receptions) > 1 else list(self.__receptions.keys())[0]
def __unset_channel(self):
assert self.__current_channel is not None
self.__current_channel = None
def is_sleep(self):
return self.__state == DEFAULT.RADIO.STATE.SLEEP
def can_transmit(self):
return self.is_sleep() and not self.__duty_cycle_active
def transmit(self, packet, channel=None):
assert self.__state == DEFAULT.RADIO.STATE.SLEEP
assert self.__outgoing_packet is None
assert not self.__duty_cycle_active
self.__outgoing_packet = packet
self.__set_channel(channel)
self.eventscheduler.schedule_event(packet.get_toa(), self.transmit_completed_cb)
packet.start_transmission(self.__current_channel)
self.__state = DEFAULT.RADIO.STATE.TRANSMITTING
def transmit_completed_cb(self):
assert self.__state == DEFAULT.RADIO.STATE.TRANSMITTING
assert self.__outgoing_packet is not None
packet = self.__outgoing_packet
self.__outgoing_packet = None
channel = self.__current_channel
self.__unset_channel()
packet.stop_transmission()
self.__device.transmit_completed_cb(packet, channel)
self.__duty_cycle_active = True
self.eventscheduler.schedule_event(packet.get_toa() * (100 / self.__duty_cycle - 1), self.__start_availability)
self.__state = DEFAULT.RADIO.STATE.SLEEP
def start_listening(self, channel=None):
if self.__txonly:
raise Exception('starting listening on a txonly channel should never happen')
assert self.__state == DEFAULT.RADIO.STATE.SLEEP
self.__set_channel(channel)
self.__state = (DEFAULT.RADIO.STATE.LISTENING
if self.__receptions[self.__current_channel].get() == 0
else DEFAULT.RADIO.STATE.COLLISION)
def stop_listening(self):
if self.__txonly:
raise Exception('stopping listening on a txonly channel should never happen')
assert self.__state not in [DEFAULT.RADIO.STATE.SLEEP, DEFAULT.RADIO.STATE.TRANSMITTING]
self.__incoming_packet = None
self.__unset_channel()
self.__state = DEFAULT.RADIO.STATE.SLEEP
def receive(self, packet, channel):
if self.__txonly:
raise Exception('starting receiveing on a txonly channel should never happen')
self.__receptions[channel].increment(packet.get_toa())
if self.__current_channel != channel or self.__state == DEFAULT.RADIO.STATE.TRANSMITTING:
return
device_id = self.__device.get_attributes().id_device
packet.start_reception(device_id=device_id)
if self.__state == DEFAULT.RADIO.STATE.LISTENING:
assert self.__incoming_packet is None
self.__incoming_packet = packet
self.__state = DEFAULT.RADIO.STATE.RECEIVING
else:
assert (self.__incoming_packet is not None) == (self.__state == DEFAULT.RADIO.STATE.RECEIVING)
if self.__incoming_packet is not None:
self.__incoming_packet.stop_reception(device_id=device_id, outcome=False)
self.__incoming_packet = None
packet.stop_reception(device_id=device_id, outcome=False)
self.__state = DEFAULT.RADIO.STATE.COLLISION
def receive_completed_cb(self, channel):
if self.__txonly:
raise Exception('receiving cannot be completed on a txonly channel')
if self.__current_channel != channel or self.__state == DEFAULT.RADIO.STATE.TRANSMITTING:
return
assert self.__state != DEFAULT.RADIO.STATE.LISTENING
if self.__state == DEFAULT.RADIO.STATE.RECEIVING:
assert self.__receptions[channel].get() == 0 and self.__incoming_packet is not None
self.__incoming_packet.stop_reception(device_id=self.__device.get_attributes().id_device, outcome=True)
self.__device.receive_completed_cb(self.__incoming_packet, channel)
self.__incoming_packet = None
self.__state = DEFAULT.RADIO.STATE.LISTENING if self.__receptions[
channel].get() == 0 else DEFAULT.RADIO.STATE.COLLISION
def __start_availability(self):
assert self.__duty_cycle_active
self.__duty_cycle_active = False
self.__device.start_availability_cb()