# redpitaya_scpi.py (3.70 KiB)
"""SCPI access to Red Pitaya."""
import socket
__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"
class scpi:
    """SCPI client used to access a Red Pitaya over an IP network.

    Commands are sent as UTF-8 text terminated by ``delimiter``; replies are
    either delimiter-terminated text (``rx_txt``) or ``#<n><length><data>``
    binary blocks (``rx_arb``).  Instances can be used as context managers,
    in which case the connection is closed when the ``with`` block exits.
    """

    # Terminator appended to every command and expected after every text reply.
    delimiter = '\r\n'

    # Upper bound for a single recv() call while reading fixed-size payloads,
    # so that a huge announced block length does not allocate one huge buffer.
    _MAX_RECV = 65536

    def __init__(self, host, timeout=None, port=5000):
        """Initialize the object and open the IP connection.

        Args:
            host: Host name or IP address as a string in quotes,
                like '192.168.1.100'.
            timeout: Socket timeout in seconds; ``None`` leaves the socket
                in its default (blocking) mode.
            port: TCP port of the SCPI server.

        A failed connection attempt is reported on stdout and is not
        re-raised; later transfers on the unconnected socket will fail.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # Define the attribute up front so close()/__del__ stay safe even if
        # creating the socket itself fails.
        self._socket = None
        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            if timeout is not None:
                self._socket.settimeout(timeout)
            self._socket.connect((host, port))
        except socket.error as e:
            # '{}' instead of '{:s}': exception objects reject a non-empty
            # format spec, which made this handler raise TypeError itself.
            print('SCPI >> connect({}:{}) failed: {}'.format(host, port, e))

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        self.close()

    def close(self):
        """Close the IP connection; calling it more than once is harmless."""
        # getattr guards instances on which __init__ never ran to completion.
        sock = getattr(self, '_socket', None)
        if sock is not None:
            self._socket = None
            sock.close()

    def _rx_exact(self, count):
        """Receive exactly ``count`` bytes and return them as ``bytes``.

        Raises:
            ConnectionError: if the peer closes the connection first.
        """
        parts = []
        remaining = count
        while remaining > 0:
            chunk = self._socket.recv(min(remaining, self._MAX_RECV))
            if not chunk:
                # recv() returning b'' means the peer closed the connection;
                # looping on it would spin forever.
                raise ConnectionError(
                    'SCPI >> connection closed while receiving data')
            parts.append(chunk)
            remaining -= len(chunk)
        return b''.join(parts)

    def rx_txt(self, chunksize=4096):
        """Receive a text string and return it without the delimiter.

        Args:
            chunksize: Number of payload bytes requested per ``recv`` call
                (preferably a power of two).

        Returns:
            The decoded reply with the trailing delimiter removed.

        Raises:
            ConnectionError: if the peer closes the connection before a
                delimiter arrives.
        """
        terminator = self.delimiter.encode('utf-8')
        received = bytearray()
        # Test the accumulated buffer, not only the latest chunk, so that a
        # delimiter split across two recv() calls is still recognized.
        while not received.endswith(terminator):
            chunk = self._socket.recv(chunksize + len(terminator))
            if not chunk:
                raise ConnectionError(
                    'SCPI >> connection closed while waiting for a reply')
            received += chunk
        # Decode once at the end so a multi-byte character split across
        # chunks cannot break decoding.
        return received[:len(received) - len(terminator)].decode('utf-8')

    def rx_arb(self):
        """Receive binary data from the SCPI server.

        The expected layout is ``#<n><length><data>``: a ``#`` marker, one
        digit ``n`` giving the number of length digits, ``n`` decimal digits
        giving the payload length, then the payload itself.

        Returns:
            The payload as ``bytes``, or ``False`` when the header is
            malformed (no ``#`` marker, ``n`` not a digit or zero, or a
            non-numeric length field).

        Raises:
            ConnectionError: if the peer closes the connection mid-block.
        """
        if self._rx_exact(1) != b'#':
            return False
        digit_count_field = self._rx_exact(1)
        if not digit_count_field.isdigit():
            return False
        digit_count = int(digit_count_field.decode('ascii'))
        if digit_count <= 0:
            return False
        length_field = self._rx_exact(digit_count)
        if not length_field.isdigit():
            return False
        return self._rx_exact(int(length_field.decode('ascii')))

    def tx_txt(self, msg):
        """Send a text string with the delimiter appended.

        Returns:
            The number of bytes transmitted (message plus delimiter).
        """
        payload = (msg + self.delimiter).encode('utf-8')
        # sendall() retries until everything is written; plain send() may
        # transmit only part of the command.
        self._socket.sendall(payload)
        return len(payload)

    def txrx_txt(self, msg):
        """Send a text string and return the text reply."""
        self.tx_txt(msg)
        return self.rx_txt()

    # IEEE Mandated Commands

    def cls(self):
        """Clear Status Command"""
        return self.tx_txt('*CLS')

    def ese(self, value: int):
        """Standard Event Status Enable Command"""
        return self.tx_txt('*ESE {}'.format(value))

    def ese_q(self):
        """Standard Event Status Enable Query"""
        return self.txrx_txt('*ESE?')

    def esr_q(self):
        """Standard Event Status Register Query"""
        return self.txrx_txt('*ESR?')

    def idn_q(self):
        """Identification Query"""
        return self.txrx_txt('*IDN?')

    def opc(self):
        """Operation Complete Command"""
        return self.tx_txt('*OPC')

    def opc_q(self):
        """Operation Complete Query"""
        return self.txrx_txt('*OPC?')

    def rst(self):
        """Reset Command"""
        return self.tx_txt('*RST')

    def sre(self, value=None):
        """Service Request Enable Command

        Args:
            value: Optional value appended to the command, mirroring
                ``ese``.  When omitted, the bare ``*SRE`` command is sent,
                as earlier versions of this method did.
        """
        if value is None:
            return self.tx_txt('*SRE')
        return self.tx_txt('*SRE {}'.format(value))

    def sre_q(self):
        """Service Request Enable Query"""
        return self.txrx_txt('*SRE?')

    def stb_q(self):
        """Read Status Byte Query"""
        return self.txrx_txt('*STB?')

    # :SYSTem

    def err_c(self):
        """Error count."""
        return self.txrx_txt('SYST:ERR:COUN?')

    def err_n(self):
        """Error next."""
        return self.txrx_txt('SYST:ERR:NEXT?')