# model import serial def scan(): """ gibt eine Liste von Strings der Namen der verfuegbaren Ports zurueck """ L = [] for i in range(10): try: s = serial.Serial('/dev/ttyS'+str(i)) L.append(s.portstr) s.close() except serial.SerialException: pass for i in range(10): try: s = serial.Serial('/dev/ttyUSB'+str(i)) L.append(s.portstr) s.close() except serial.SerialException: pass for i in range(10): try: s = serial.Serial('COM'+str(i)) L.append(s.portstr) s.close() except serial.SerialException: pass for i in range(10): try: s = serial.Serial(i) L.append(i) s.close() except serial.SerialException: pass return L def ByteToBin(b): """ gibt einen String der Länge 8 zurück, der das Byte b darstellt, Beispiel: 65 --> '01000001' """ return bin(b)[2:].zfill(8) def BinToByte(s): """ gibt ein Byte zurück, das aus dem String s ermiitelt wurde """ return int(s,2) import time def warteFlanke01(getLeitung,stopflag): """ die Funktion blockiert bis über 'getLeitung' ein 0-1-Flankenwechsel entdeckt wurde, die Blockade wird vorzeitig abgebrochen, wenn enabled gleich False wird """ alt = getLeitung() neu = getLeitung() while (alt or not neu) and (not stopflag.is_set()): time.sleep(0.0005) alt = neu neu = getLeitung() # print('Flanke entdeckt bzw. Schleife abgebrochen') # DEBUG def receiveByte(bitzeit,getLeitung): """ im miniRS232-Protokoll wird mit der Bitzeit bitzeit in s und der Funktion getLeitung zum Abtasten der Leitung ein Bitmuster bm (str) zurueckgegeben getLeitung hat den Typ bool """ bm = '' # Anfang Startbit time.sleep(1.5*bitzeit) # mitten im ersten Datenbit # Datenbits abtasten for i in range(8): if getLeitung(): bm = bm + '1' else: bm = bm + '0' time.sleep(bitzeit) # mitten im Stoppbit return bm # Bitmuster --> Byte def receive(bitzeit,getLeitung,puffer,stopflag): """ die Funktion haengt empfangene Bytes dem bytearray puffer an, dabei wird das miniRS232-Protokoll mit der Bitzeit bitzeit in s und die Funktion getLeitung (Rueckgabe bool) zum Abtasten der Leitung verwendet, die Funktion arbeitet solange enabled True ist """ print('receive gestartet') # DEBUG while not stopflag.is_set(): warteFlanke01(getLeitung,stopflag) if not stopflag.is_set(): bm = receiveByte(bitzeit,getLeitung) b = int(bm,2) print('empfangen:',bm,chr(b),b) # DEBUG puffer.append(b) print('receive gestoppt') # DEBUG import threading class Model(object): def __init__(self): self.scan = scan() self.s = serial.Serial(self.scan[0]) self.s.setRTS(False) self.puffer = bytearray([]) self.stopflag = threading.Event() self.bitzeit = 0.01 self.empfThread = None def stop(self): self.stopflag.set() def start(self,bitzeit): self.stopflag.set() # zuerst alten Thread sicher stoppen """ if (self.empfThread != None) and self.empfThread.is_alive(): self.empfThread.join() """ self.stopflag.clear() self.bitzeit = bitzeit self.empfThread = threading.Thread(target=receive,args=(self.bitzeit,self.s.getCTS,self.puffer,self.stopflag)) self.empfThread.start() def __del__(self): # schliesst sicher die Schnittstelle self.s.close() # view def decodiert(bf): """ wandelt eine Bytefolge bf in einen Pythonstring um, dabei wird aus einem Byte b das Zeichen chr(b) """ return ''.join([chr(b) for b in bf]) import tkinter import time class View(tkinter.Tk): def __init__(self,cbSetPort,cbStart,cbStop,cbGet,cbClear,cbHalt): tkinter.Tk.__init__(self) # Callbacks self.cbSetPort = cbSetPort # setzt Port self.cbStart = cbStart # startet Empfang self.cbStop = cbStop # stoppt Empfang self.cbGetNachricht = cbGet # gibt Nachricht als bytes zurueck self.cbClear = cbClear # loescht Nachricht self.protocol("WM_DELETE_WINDOW",cbHalt) # cbHalt wird aufgerufen, wenn das 'X' gedrueckt wird # Fenster self.title("Empfänger 1") self.geometry('325x200+600+200') # Frame oben ---------------------------------------------- self.fo = tkinter.Frame(self) self.fo.pack(fill='x',padx=10,ipady=10) # Label Port self.lP = tkinter.Label(self.fo, text='Port:') self.lP.pack(side='left') # Optionmenu self.vS = tkinter.StringVar(self.fo) self.scan = scan() self.vS.set(self.scan[0]) self.oS = tkinter.OptionMenu(self.fo,self.vS,*self.scan,command=self.cbOption) self.oS.configure(width=7) self.oS.pack(side='left') # Label Einheit self.lS = tkinter.Label(self.fo, text='s') self.lS.pack(side='right') # Entry self.eA = tkinter.Entry(self.fo,width=5) self.eA.insert(0, '0.05') self.eA.pack(side='right') self.eA.bind('',self.cbEntry) # Label Bitzeit self.lB = tkinter.Label(self.fo, text='Bitzeit:') self.lB.pack(side='right') # Ende Frame oben ----------------------------------------- # Frame unten --------------------------------------------- self.fu = tkinter.Frame(self) self.fu.pack(side='bottom',fill='x',padx=10,ipady=10) # Buttons self.bStart = tkinter.Button(self.fu, text="start", command=self.cbHilf) self.bStart.pack(side='left') self.bStop = tkinter.Button(self.fu, text="stop", command=self.cbStop) self.bStop.pack(side='left',padx=5) self.bClear = tkinter.Button(self.fu, text="clear", command=self.cbClear) self.bClear.pack(side='left') # Ende Frame unten ---------------------------------------- # Text self.tA = tkinter.Text(self) self.tA.delete('1.0',tkinter.END) self.tA.pack(padx=10) # Empfang starten self.cbEntry(None) # Polling starten self.after(0,self.poll) def poll(self): s = decodiert(self.cbGetNachricht()) # s = 'Test' # DEBUG self.tA.delete('1.0',tkinter.END) self.tA.insert(tkinter.END,s) self.after(20,self.poll) def cbOption(self, wert): # print(wert) # DEBUG self.cbSetPort(wert) def cbEntry(self,event): self.focus_force() # print('Return im Entry-Widget, String:',self.eA.get()) # DEBUG self.cbStop() try: bitzeit = float(self.eA.get()) self.cbStart(bitzeit) except: self.eA.delete(0,tkinter.END) self.eA.insert(0,'0.01') def cbHilf(self): self.cbEntry(None) def cbClear(self): self.cbClear() # controller class Controller(object): def __init__(self): self.model = Model() self.view = View(self.setPort,self.model.start,self.model.stop,self.Get,self.Clear,self.Halt) self.view.mainloop() def setPort(self,port): self.model.s.setPort(port) self.model.s.setRTS(0) def Get(self): return self.model.puffer def Clear(self): while len(self.model.puffer) > 0: self.model.puffer.pop() def Halt(self): # Aufraeumarbeiten self.model.stop() # Thread stoppen time.sleep(0.01) """ th = self.model.empfThread if (th != None) and th.is_alive(): # falls Thread noch lebt th.join() # auf Ende warten """ self.model.s.close() # Schnittstelle schliessen self.view.quit() # mainloop beenden self.view.destroy() # Fenster beseitigen # Hauptprogramm c = Controller()