Source code for NovoptelTCP

# Downloaded from https://www.novoptel.de/Home/Downloads_de.php on 07.02.2025
# Copyright Novoptel GmbH

import socket
import math
from struct import unpack
import numpy as np
import time

[docs] class NovoptelTCP(): #Parameters ip = '127.0.0.1' port = 5025 s = None debug = False def __init__(self, ip='127.0.0.1', port=5025, debug=False): self.ip = ip self.port = port self.debug = debug self.connect()
[docs] def connect(self): if (self.s!=None): self.close() self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.settimeout(2) try: self.s.connect((self.ip, self.port)) #print("connected: ", self.s) except socket.error as e: print("Error during socket connect: %s" % e)
[docs] def close(self): self.s.close() self.s = None
[docs] def reconnect(self): print("Reconnecting!") self.s.close() time.sleep(0.5) self.connect()
[docs] def socket_write(self, data: bytes): rxok = False tries = 0 while (rxok==False) and (tries<10): try: self.s.send(data) time.sleep(0.001) # sleep 1 ms rxok = True except: tries = tries + 1 if tries>5: self.reconnect() return
[docs] def socket_read(self, data: bytes): rxok = False tries = 0 while (rxok==False) and (tries<10): self.socket_write(data) try: ans = self.s.recv(2) res = int.from_bytes(ans, byteorder='big') rxok = True except: res = 0 tries = tries + 1 if tries>5: self.reconnect() return res
[docs] def read(self, addr: int): cmd=[0x52] # 'R' cmd.append((addr>>8)&0xFF) cmd.append(addr&0xFF) res = self.socket_read(bytes(cmd)) return res
[docs] def write(self, addr: int, data: int): cmd=[0x57] # 'W' cmd.append((addr>>8)&0xFF) cmd.append(addr&0xFF) cmd.append((data>>8)&0xFF) cmd.append(data&0xFF) self.socket_write(bytes(cmd))
[docs] def readsdram_sendrequest(self, startaddrseq: int, packetsinthissequence: int, cycles: int): # set transfer parameters in one command cmdlist = [ [512 + 105, startaddrseq & 0xFFFF], [512 + 106, int((startaddrseq >> 16) + math.log2(cycles) * 2**12)], [512 + 107, round(packetsinthissequence/cycles)], [512 + 104, 39294]] #print(cmdlist) #print("cycles: ", cycles) cmd = [] for i in range(len(cmdlist)): cmd.append(0x57) # 'W' for x in range(2): cmd.append((cmdlist[i][x]>>8)&0xFF) cmd.append(cmdlist[i][x]&0xFF) #print(cmd) self.socket_write(bytes(cmd))
[docs] def readsdram_getpackets_raw(self, startaddrseq: int, packetsinthissequence: int, cycles: int): try: self.readsdram_sendrequest(startaddrseq, packetsinthissequence, cycles) except: return b'' #print("packetsinthissequence %d" % (packetsinthissequence)) debug = False rx_len = 0 rxbytes = b'' while (rx_len<packetsinthissequence): newbytes = b'' try: newbytes = self.s.recv(2**14) rxbytes += newbytes if debug: print("newbytes length: %d" % len(newbytes)) if newbytes==0: return b'' except: #print("newbytes length: %d" % len(newbytes)) #print("rxbytes length: %d" % len(rxbytes)) #input("Exception in self.s.recv!") ## test communication #print("ATE: %d" % self.read(512+1)) #self.readsdram_sendrequest(startaddrseq, packetsinthissequence, cycles) #rxbytes = b'' #debug = True return b'' rx_len = len(rxbytes) / 8 #print("rx_len %d" % (rx_len)) #print("data length: %d" % len(rxbytes)) #print(rxbytes) return rxbytes
[docs] def readsdram_getpackets(self, startaddrseq: int, packetsinthissequence: int, cycles: int): packetsreceived = 0 rxbytes = b'' while packetsreceived<packetsinthissequence: try: rxbytes = self.readsdram_getpackets_raw(startaddrseq, packetsinthissequence, cycles) #print("readsdram_getpackets_raw length: %d" % len(rxbytes)) packetsreceived = len(rxbytes) / 8 if packetsreceived==0: #input("readsdram_getpackets_raw returned 0 bytes!") self.reconnect() except: print("Exception in readsdram_getpackets_raw!") self.reconnect() try: data = unpack('>'+'H'*(len(rxbytes)//2),rxbytes) # bytes to uint16 except: print("data length: %d" % len(rxbytes)) print("Exception in readsdram_getpackets_raw!") arr = np.array(list(data)) rows = arr.shape[0] // 4 if rows * 4 != arr.shape[0]: rows -= 1 arr = arr[0:rows * 4].reshape(rows, 4) arr = arr[0:rows * 4].reshape(rows, 4) return arr
[docs] def readsdram_raw(self, startaddr: int, numaddr: int): buffersize_bytes = 2**14; cycles = max(1, min(32, math.ceil(numaddr*8/buffersize_bytes))) cycles = 2**math.ceil(math.log2(cycles)) numaddr = math.ceil(numaddr/cycles)*cycles buffersize_addr = int(round(buffersize_bytes/8*cycles)) packets_transferred = 0 res = np.empty((0, 4)) while (packets_transferred<numaddr): packetsinthissequence = min(buffersize_addr, numaddr-packets_transferred) startaddrseq = startaddr + packets_transferred rx = self.readsdram_getpackets(startaddrseq, packetsinthissequence, cycles) packets_transferred = packets_transferred+packetsinthissequence; res = np.concatenate((res, rx), axis=0) return res