class Register(object): def __init__(self): self.wert = None # int 0..255 self.OnChange = None # Callback(art 0..2,wert int) self.bus = None self.name = None self.enable = 0 # int 0..1 self.load = 0 # int 0..1 def getWert(self): return self.wert def setWert(self, w): if self.wert != w: self.wert = w if (self.OnChange != None): self.OnChange(0,self.wert) if (self.enable == 1) and (self.bus != None): self.bus.setWert(self.wert) def setEnable(self,w): # einschalten if (self.enable == 0) and (w == 1): self.enable = 1 if self.OnChange != None: self.OnChange(1,1) if self.bus != None: self.bus.anmeldenS(self) # self.bus.setWert(self.wert) # ausschalten if (self.enable == 1) and (w == 0): self.enable = 0 if self.OnChange != None: self.OnChange(1,0) if self.bus != None: self.bus.abmeldenS(self) def setLoad(self,w): # einschalten if (self.load == 0) and (w == 1): self.load = 1 if self.OnChange != None: self.OnChange(2,1) if self.bus != None: self.bus.anmeldenL(self) # ausschalten if (self.load == 1) and (w == 0): self.load = 0 if self.OnChange != None: self.OnChange(2,0) if self.bus != None: self.bus.abmeldenL(self) class BusError(Exception): pass class Bus(object): def __init__(self): self.wert = None self.OnChange = None # Callback(wert int) self.OnError = None # Callback(Schreiberliste) self.leser = [] self.schreiber = [] def setWert(self, w): if self.wert != w: # Bus-Wert setzen self.wert = w # Callback aufrufen if self.OnChange != None: self.OnChange(self.wert) # Leser benachrichtigen for k in self.leser: k.setWert(self.wert) def anmeldenL(self, reg): self.leser = self.leser + [reg] reg.setWert(self.wert) def abmeldenL(self, reg): self.leser.remove(reg) def anmeldenS(self, reg): self.schreiber = self.schreiber + [reg] if len(self.schreiber) > 1: # raise BusError # print('Fehler: Zu viele Schreiber!') if self.OnError != None: self.OnError(self.schreiber) else: self.setWert(reg.wert) def abmeldenS(self, reg): self.schreiber.remove(reg) if len(self.schreiber) == 0: self.setWert(None) if __name__ == "__main__": # Test der Klasse Register allein # Test-Callback für Konsole def a_debug(art,wert): print('a: art: ',art,', wert: ',wert) print('Tests zu Register allein') a = Register() a.wert = 17 a.OnChange = a_debug a.setWert(42) a.setWert(42) a.setEnable(0) a.setEnable(0) a.setEnable(1) a.setEnable(1) a.setEnable(0) a.setLoad(0) a.setLoad(0) a.setLoad(1) a.setLoad(1) a.setLoad(0) del(a) print() # Tests von Transfers über den Bus # weitere Test-Callbacks für Konsole def b_debug(art,wert): print('b: art: ',art,', wert: ',wert) def c_debug(art,wert): print('c: art: ',art,', wert: ',wert) def bus_debug(wert): print('Bus: ',wert) print('Tests von Transfers über den Bus') bus = Bus() bus.OnChange = bus_debug a = Register() a.wert = 17 a.bus = bus a.OnChange = a_debug # a.bus = bus b = Register() b.OnChange = b_debug b.bus = bus c = Register() c.bus = bus c.OnChange = c_debug # c.bus = bus print('Transfer von a nach c') a.setEnable(1) c.setLoad(1) c.setLoad(0) a.setEnable(0) print('c: ',c.wert,'\n') print('b liest vom leeren Bus') b.setLoad(1) b.setLoad(0) print('b: ',b.wert,'\n') print('Transfer von a nach b mit verzögertem Schreiben') b.setLoad(1) print('b: ',b.wert) a.setEnable(1) b.setLoad(0) a.setEnable(0) print('b: ',b.wert,'\n') print('Buserror') try: a.setEnable(1) b.setEnable(1) except BusError: print('Buskonflikt: a und b versuchen auf Bus zu schreiben!')