diff options
author | Nao Pross <np@0hm.ch> | 2021-04-07 13:12:55 +0200 |
---|---|---|
committer | Nao Pross <np@0hm.ch> | 2021-04-07 13:12:55 +0200 |
commit | d87ceb11a43fba4a67ff3d6edcef404520718a7b (patch) | |
tree | 9b9f61db9f161698f1ca55a8d7b5253d07683797 /doc/usbtmc | |
parent | Test lua function mapping (diff) | |
download | testbench-ui-d87ceb11a43fba4a67ff3d6edcef404520718a7b.tar.gz testbench-ui-d87ceb11a43fba4a67ff3d6edcef404520718a7b.zip |
Add USBTMC doc
Diffstat (limited to '')
-rw-r--r-- | doc/usbtmc/USBTMC_1_00.pdf | bin | 0 -> 412058 bytes | |||
-rw-r--r-- | doc/usbtmc/USBTMC_usb488_subclass_1_00.pdf | bin | 0 -> 321908 bytes | |||
-rw-r--r-- | doc/usbtmc/open_close.pcapng | bin | 0 -> 1572 bytes | |||
-rw-r--r-- | doc/usbtmc/usbtmc.py | 997 |
4 files changed, 997 insertions, 0 deletions
diff --git a/doc/usbtmc/USBTMC_1_00.pdf b/doc/usbtmc/USBTMC_1_00.pdf Binary files differnew file mode 100644 index 0000000..201142e --- /dev/null +++ b/doc/usbtmc/USBTMC_1_00.pdf diff --git a/doc/usbtmc/USBTMC_usb488_subclass_1_00.pdf b/doc/usbtmc/USBTMC_usb488_subclass_1_00.pdf Binary files differnew file mode 100644 index 0000000..c1bd6fc --- /dev/null +++ b/doc/usbtmc/USBTMC_usb488_subclass_1_00.pdf diff --git a/doc/usbtmc/open_close.pcapng b/doc/usbtmc/open_close.pcapng Binary files differnew file mode 100644 index 0000000..d6ac13c --- /dev/null +++ b/doc/usbtmc/open_close.pcapng diff --git a/doc/usbtmc/usbtmc.py b/doc/usbtmc/usbtmc.py new file mode 100644 index 0000000..93b7d55 --- /dev/null +++ b/doc/usbtmc/usbtmc.py @@ -0,0 +1,997 @@ +""" + +Python USBTMC driver + +Copyright (c) 2012-2017 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +""" + +import usb.core +import usb.util +import struct +import time +import os +import re +import sys + +# constants +USBTMC_bInterfaceClass = 0xFE +USBTMC_bInterfaceSubClass = 3 +USBTMC_bInterfaceProtocol = 0 +USB488_bInterfaceProtocol = 1 + +USBTMC_MSGID_DEV_DEP_MSG_OUT = 1 +USBTMC_MSGID_REQUEST_DEV_DEP_MSG_IN = 2 +USBTMC_MSGID_DEV_DEP_MSG_IN = 2 +USBTMC_MSGID_VENDOR_SPECIFIC_OUT = 126 +USBTMC_MSGID_REQUEST_VENDOR_SPECIFIC_IN = 127 +USBTMC_MSGID_VENDOR_SPECIFIC_IN = 127 +USB488_MSGID_TRIGGER = 128 + +USBTMC_STATUS_SUCCESS = 0x01 +USBTMC_STATUS_PENDING = 0x02 +USBTMC_STATUS_FAILED = 0x80 +USBTMC_STATUS_TRANSFER_NOT_IN_PROGRESS = 0x81 +USBTMC_STATUS_SPLIT_NOT_IN_PROGRESS = 0x82 +USBTMC_STATUS_SPLIT_IN_PROGRESS = 0x83 +USB488_STATUS_INTERRUPT_IN_BUSY = 0x20 + +USBTMC_REQUEST_INITIATE_ABORT_BULK_OUT = 1 +USBTMC_REQUEST_CHECK_ABORT_BULK_OUT_STATUS = 2 +USBTMC_REQUEST_INITIATE_ABORT_BULK_IN = 3 +USBTMC_REQUEST_CHECK_ABORT_BULK_IN_STATUS = 4 +USBTMC_REQUEST_INITIATE_CLEAR = 5 +USBTMC_REQUEST_CHECK_CLEAR_STATUS = 6 +USBTMC_REQUEST_GET_CAPABILITIES = 7 +USBTMC_REQUEST_INDICATOR_PULSE = 64 + +USB488_READ_STATUS_BYTE = 128 +USB488_REN_CONTROL = 160 +USB488_GOTO_LOCAL = 161 +USB488_LOCAL_LOCKOUT = 162 + +USBTMC_HEADER_SIZE = 12 + +RIGOL_QUIRK_PIDS = [0x04ce, 0x0588] + + +def parse_visa_resource_string(resource_string): + # valid resource strings: + # USB::1234::5678::INSTR + # USB::1234::5678::SERIAL::INSTR + # USB0::0x1234::0x5678::INSTR + # USB0::0x1234::0x5678::SERIAL::INSTR + m = re.match('^(?P<prefix>(?P<type>USB)\d*)(::(?P<arg1>[^\s:]+))' + '(::(?P<arg2>[^\s:]+(\[.+\])?))(::(?P<arg3>[^\s:]+))?' + '(::(?P<suffix>INSTR))$', resource_string, re.I) + + if m is not None: + return dict( + type=m.group('type').upper(), + prefix=m.group('prefix'), + arg1=m.group('arg1'), + arg2=m.group('arg2'), + arg3=m.group('arg3'), + suffix=m.group('suffix') + ) + + +# Exceptions +class UsbtmcException(Exception): + em = {0: "No error"} + + def __init__(self, err=None, note=None): + self.err = err + self.note = note + self.msg = '' + + if err is None: + self.msg = note + else: + if type(err) is int: + if err in self.em: + self.msg = "%d: %s" % (err, self.em[err]) + else: + self.msg = "%d: Unknown error" % err + else: + self.msg = err + if note is not None: + self.msg = "%s [%s]" % (self.msg, note) + + def __str__(self): + return self.msg + + +def list_devices(): + "List all connected USBTMC devices" + + def is_usbtmc_device(dev): + for cfg in dev: + d = usb.util.find_descriptor(cfg, bInterfaceClass=USBTMC_bInterfaceClass, + bInterfaceSubClass=USBTMC_bInterfaceSubClass) + if d is not None: + return True + + if dev.idVendor == 0x1334: + # Advantest + return True + + if dev.idVendor == 0x0957: + # Agilent + if dev.idProduct in [0x2818, 0x4218, 0x4418]: + # Agilent U27xx modular devices in firmware update mode + # 0x2818 for U2701A/U2702A (firmware update mode on power up) + # 0x4218 for U2722A (firmware update mode on power up) + # 0x4418 for U2723A (firmware update mode on power up) + return True + + return False + + return list(usb.core.find(find_all=True, custom_match=is_usbtmc_device)) + + +def list_resources(): + "List resource strings for all connected USBTMC devices" + + res = [] + + for dev in list_devices(): + idVendor = dev.idVendor + idProduct = dev.idProduct + + # "fix" IDs for devices in firmware update mode + if idVendor == 0x0957 and idProduct == 0x2818: + # Agilent U2701A/U2702A firmware update mode + idProduct = 0x2918 + + if idVendor == 0x0957 and idProduct == 0x4218: + # Agilent U2722A firmware update mode + idProduct = 0x4118 + + if idVendor == 0x0957 and idProduct == 0x4418: + # Agilent U2723A firmware update mode + idProduct = 0x4318 + + # attempt to read serial number + iSerial = None + try: + iSerial = dev.serial_number + except: + pass + + # append formatted resource string to list + if iSerial is None: + res.append("USB::%d::%d::INSTR" % (idVendor, idProduct)) + else: + res.append("USB::%d::%d::%s::INSTR" % (idVendor, idProduct, iSerial)) + + return res + + +def find_device(idVendor=None, idProduct=None, iSerial=None): + "Find USBTMC instrument" + + devs = list_devices() + + if len(devs) == 0: + return None + + for dev in devs: + # match VID and PID + found = dev.idVendor == idVendor and dev.idProduct == idProduct + + if idVendor == 0x0957 and idProduct == 0x2918: + # Agilent U2701A/U2702A firmware update mode + if dev.idVendor == idVendor and dev.idProduct == 0x2818: + found = True + + if idVendor == 0x0957 and idProduct == 0x4118: + # Agilent U2722A firmware update mode + if dev.idVendor == idVendor and dev.idProduct == 0x4218: + found = True + + if idVendor == 0x0957 and idProduct == 0x4318: + # Agilent U2723A firmware update mode + if dev.idVendor == idVendor and dev.idProduct == 0x4418: + found = True + + if not found: + continue + + if iSerial is None: + return dev + else: + s = '' + + # try reading serial number + try: + s = dev.serial_number + except: + pass + + if iSerial == s: + return dev + + return None + + +class Instrument(object): + "USBTMC instrument interface client" + def __init__(self, *args, **kwargs): + "Create new USBTMC instrument object" + self.idVendor = 0 + self.idProduct = 0 + self.iSerial = None + self.device = None + self.cfg = None + self.iface = None + self.term_char = None + + self.bcdUSBTMC = 0 + self.support_pulse = False + self.support_talk_only = False + self.support_listen_only = False + self.support_term_char = False + + self.bcdUSB488 = 0 + self.support_USB4882 = False + self.support_remote_local = False + self.support_trigger = False + self.support_scpi = False + self.support_SR = False + self.support_RL = False + self.support_DT = False + + self.max_transfer_size = 1024*1024 + + self.timeout = 5.0 + + self.bulk_in_ep = None + self.bulk_out_ep = None + self.interrupt_in_ep = None + + self.last_btag = 0 + self.last_rstb_btag = 0 + + self.connected = False + self.reattach = [] + self.old_cfg = None + + # quirks + self.advantest_quirk = False + self.advantest_locked = False + + self.rigol_quirk = False + self.rigol_quirk_ieee_block = False + + resource = None + + # process arguments + if len(args) == 1: + if type(args[0]) == str: + resource = args[0] + else: + self.device = args[0] + if len(args) >= 2: + self.idVendor = args[0] + self.idProduct = args[1] + if len(args) >= 3: + self.iSerial = args[2] + + for op in kwargs: + val = kwargs[op] + if op == 'idVendor': + self.idVendor = val + elif op == 'idProduct': + self.idProduct = val + elif op == 'iSerial': + self.iSerial = val + elif op == 'device': + self.device = val + elif op == 'dev': + self.device = val + elif op == 'term_char': + self.term_char = val + elif op == 'resource': + resource = val + + if resource is not None: + res = parse_visa_resource_string(resource) + + if res is None: + raise UsbtmcException("Invalid resource string", 'init') + + if res['arg1'] is None and res['arg2'] is None: + raise UsbtmcException("Invalid resource string", 'init') + + self.idVendor = int(res['arg1'], 0) + self.idProduct = int(res['arg2'], 0) + self.iSerial = res['arg3'] + + # find device + if self.device is None: + if self.idVendor is None or self.idProduct is None: + raise UsbtmcException("No device specified", 'init') + else: + self.device = find_device(self.idVendor, self.idProduct, self.iSerial) + if self.device is None: + raise UsbtmcException("Device not found", 'init') + + def __del__(self): + if self.connected: + self.close() + + @property + def timeout(self): + return self._timeout + + @timeout.setter + def timeout(self, val): + self._timeout = val + self._timeout_ms = int(val * 1000) + + def open(self): + if self.connected: + return + + # initialize device + + if self.device.idVendor == 0x0957 and self.device.idProduct in [0x2818, 0x4218, 0x4418]: + # Agilent U27xx modular devices + # U2701A/U2702A, U2722A/U2723A + # These devices require a short initialization sequence, presumably + # to take them out of 'firmware update' mode after confirming + # that the firmware version is correct. This is required once + # on every power-on before the device can be used. + # Note that the device will reset and the product ID will change. + # U2701A/U2702A boot 0x2818, usbtmc 0x2918 + # U2722A boot 0x4218, usbtmc 0x4118 + # U2723A boot 0x4418, usbtmc 0x4318 + + serial = self.device.serial_number + + new_id = 0 + + if self.device.idProduct == 0x2818: + # U2701A/U2702A + new_id = 0x2918 + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047E, data_or_wLength=0x0001) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047D, data_or_wLength=0x0006) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x0484, data_or_wLength=0x0005) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x0472, data_or_wLength=0x000C) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047A, data_or_wLength=0x0001) + self.device.ctrl_transfer(bmRequestType=0x40, bRequest=0x0C, wValue=0x0000, wIndex=0x0475, data_or_wLength=b'\x00\x00\x01\x01\x00\x00\x08\x01') + + if self.device.idProduct in [0x4218, 0x4418]: + # U2722A/U2723A + if self.device.idProduct == 0x4218: + # U2722A + new_id = 0x4118 + elif self.device.idProduct == 0x4418: + # U2723A + new_id = 0x4318 + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047E, data_or_wLength=0x0001) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047D, data_or_wLength=0x0006) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x0487, data_or_wLength=0x0005) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x0472, data_or_wLength=0x000C) + self.device.ctrl_transfer(bmRequestType=0xC0, bRequest=0x0C, wValue=0x0000, wIndex=0x047A, data_or_wLength=0x0001) + self.device.ctrl_transfer(bmRequestType=0x40, bRequest=0x0C, wValue=0x0000, wIndex=0x0475, data_or_wLength=b'\x00\x00\x01\x01\x00\x00\x08\x01') + + usb.util.dispose_resources(self.device) + self.device = None + + for i in range(40): + self.device = find_device(0x0957, new_id, serial) + if self.device is not None: + break + time.sleep(0.5) + + if self.device is None: + print("Agilent U27xx modular device initialization failed") + + # find first USBTMC interface + for cfg in self.device: + for iface in cfg: + if (iface.bInterfaceClass == USBTMC_bInterfaceClass and + iface.bInterfaceSubClass == USBTMC_bInterfaceSubClass): + # USBTMC device + self.cfg = cfg + self.iface = iface + break + elif (self.device.idVendor == 0x1334): + # Advantest + self.cfg = cfg + self.iface = iface + break + else: + continue + break + + if self.iface is None: + raise UsbtmcException("Not a USBTMC device", 'init') + + try: + self.old_cfg = self.device.get_active_configuration() + except usb.core.USBError: + # ignore exception if configuration is not set + pass + + if self.old_cfg is not None and self.old_cfg.bConfigurationValue == self.cfg.bConfigurationValue: + # already set to correct configuration + + # release kernel driver on USBTMC interface + self._release_kernel_driver(self.iface.bInterfaceNumber) + else: + # wrong configuration or configuration not set + + # release all kernel drivers + if self.old_cfg is not None: + for iface in self.old_cfg: + self._release_kernel_driver(iface.bInterfaceNumber) + + # set proper configuration + self.device.set_configuration(self.cfg) + + # claim interface + usb.util.claim_interface(self.device, self.iface) + + # don't need to set altsetting - USBTMC devices have 1 altsetting as per the spec + + # find endpoints + for ep in self.iface: + ep_dir = usb.util.endpoint_direction(ep.bEndpointAddress) + ep_type = usb.util.endpoint_type(ep.bmAttributes) + + if (ep_type == usb.util.ENDPOINT_TYPE_BULK): + if (ep_dir == usb.util.ENDPOINT_IN): + self.bulk_in_ep = ep + elif (ep_dir == usb.util.ENDPOINT_OUT): + self.bulk_out_ep = ep + elif (ep_type == usb.util.ENDPOINT_TYPE_INTR): + if (ep_dir == usb.util.ENDPOINT_IN): + self.interrupt_in_ep = ep + + if self.bulk_in_ep is None or self.bulk_out_ep is None: + raise UsbtmcException("Invalid endpoint configuration", 'init') + + # set quirk flags if necessary + if self.device.idVendor == 0x1334: + # Advantest/ADCMT devices have a very odd USBTMC implementation + # which requires max 63 byte reads and never signals EOI on read + self.max_transfer_size = 63 + self.advantest_quirk = True + + if self.device.idVendor == 0x1ab1 and self.device.idProduct in RIGOL_QUIRK_PIDS: + self.rigol_quirk = True + + if self.device.idProduct == 0x04ce: + self.rigol_quirk_ieee_block = True + + self.connected = True + + self.clear() + + self.get_capabilities() + + def close(self): + if not self.connected: + return + + usb.util.dispose_resources(self.device) + + try: + # reset configuration + if self.cfg.bConfigurationValue != self.old_cfg.bConfigurationValue: + self.device.set_configuration(self.old_cfg) + + # try to reattach kernel driver + for iface in self.reattach: + try: + self.device.attach_kernel_driver(iface) + except: + pass + except: + pass + + self.reattach = [] + + self.connected = False + + def is_usb488(self): + return self.iface.bInterfaceProtocol == USB488_bInterfaceProtocol + + def get_capabilities(self): + + if not self.connected: + self.open() + + b = self.device.ctrl_transfer( + usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), + USBTMC_REQUEST_GET_CAPABILITIES, + 0x0000, + self.iface.index, + 0x0018, + timeout=self._timeout_ms) + if (b[0] == USBTMC_STATUS_SUCCESS): + self.bcdUSBTMC = (b[3] << 8) + b[2] + self.support_pulse = b[4] & 4 != 0 + self.support_talk_only = b[4] & 2 != 0 + self.support_listen_only = b[4] & 1 != 0 + self.support_term_char = b[5] & 1 != 0 + + if self.is_usb488(): + self.bcdUSB488 = (b[13] << 8) + b[12] + self.support_USB4882 = b[4] & 4 != 0 + self.support_remote_local = b[4] & 2 != 0 + self.support_trigger = b[4] & 1 != 0 + self.support_scpi = b[4] & 8 != 0 + self.support_SR = b[4] & 4 != 0 + self.support_RL = b[4] & 2 != 0 + self.support_DT = b[4] & 1 != 0 + else: + raise UsbtmcException("Get capabilities failed", 'get_capabilities') + + def pulse(self): + """ + Send a pulse indicator request, this should blink a light + for 500-1000ms and then turn off again. (Only if supported) + """ + if not self.connected: + self.open() + + if self.support_pulse: + b = self.device.ctrl_transfer( + usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), + USBTMC_REQUEST_INDICATOR_PULSE, + 0x0000, + self.iface.index, + 0x0001, + timeout=self._timeout_ms) + if (b[0] != USBTMC_STATUS_SUCCESS): + raise UsbtmcException("Pulse failed", 'pulse') + + # message header management + def pack_bulk_out_header(self, msgid): + self.last_btag = btag = (self.last_btag % 255) + 1 + return struct.pack('BBBx', msgid, btag, ~btag & 0xFF) + + def pack_dev_dep_msg_out_header(self, transfer_size, eom = True): + hdr = self.pack_bulk_out_header(USBTMC_MSGID_DEV_DEP_MSG_OUT) + return hdr+struct.pack("<LBxxx", transfer_size, eom) + + def pack_dev_dep_msg_in_header(self, transfer_size, term_char = None): + hdr = self.pack_bulk_out_header(USBTMC_MSGID_DEV_DEP_MSG_IN) + transfer_attributes = 0 + if term_char is None: + term_char = 0 + else: + transfer_attributes = 2 + term_char = self.term_char + return hdr+struct.pack("<LBBxx", transfer_size, transfer_attributes, term_char) + + def pack_vendor_specific_out_header(self, transfer_size): + hdr = self.pack_bulk_out_header(USBTMC_MSGID_VENDOR_SPECIFIC_OUT) + return hdr+struct.pack("<Lxxxx", transfer_size) + + def pack_vendor_specific_in_header(self, transfer_size): + hdr = self.pack_bulk_out_header(USBTMC_MSGID_VENDOR_SPECIFIC_IN) + return hdr+struct.pack("<Lxxxx", transfer_size) + + def pack_usb488_trigger(self): + hdr = self.pack_bulk_out_header(USB488_MSGID_TRIGGER) + return hdr+b'\x00'*8 + + def unpack_bulk_in_header(self, data): + msgid, btag, btaginverse = struct.unpack_from('BBBx', data) + return (msgid, btag, btaginverse) + + def unpack_dev_dep_resp_header(self, data): + msgid, btag, btaginverse = self.unpack_bulk_in_header(data) + transfer_size, transfer_attributes = struct.unpack_from('<LBxxx', data, 4) + data = data[USBTMC_HEADER_SIZE:transfer_size+USBTMC_HEADER_SIZE] + return (msgid, btag, btaginverse, transfer_size, transfer_attributes, data) + + def write_raw(self, data): + "Write binary data to instrument" + + if not self.connected: + self.open() + + eom = False + + num = len(data) + + offset = 0 + + try: + while num > 0: + if num <= self.max_transfer_size: + eom = True + + block = data[offset:offset+self.max_transfer_size] + size = len(block) + + req = self.pack_dev_dep_msg_out_header(size, eom) + block + b'\0'*((4 - (size % 4)) % 4) + self.bulk_out_ep.write(req, timeout=self._timeout_ms) + + offset += size + num -= size + except usb.core.USBError: + exc = sys.exc_info()[1] + if exc.errno == 110: + # timeout, abort transfer + self._abort_bulk_out() + raise + + def read_raw(self, num=-1): + "Read binary data from instrument" + + if not self.connected: + self.open() + + read_len = self.max_transfer_size + if 0 < num < read_len: + read_len = num + + eom = False + + term_char = None + + if self.term_char is not None: + term_char = self.term_char + + read_data = b'' + + try: + while not eom: + if not self.rigol_quirk or read_data == b'': + + # if the rigol sees this again, it will restart the transfer + # so only send it the first time + + req = self.pack_dev_dep_msg_in_header(read_len, term_char) + self.bulk_out_ep.write(req, timeout=self._timeout_ms) + + resp = self.bulk_in_ep.read(read_len+USBTMC_HEADER_SIZE+3, timeout=self._timeout_ms) + + if sys.version_info >= (3, 2): + resp = resp.tobytes() + else: + resp = resp.tostring() + + if self.rigol_quirk and read_data: + pass # do nothing, the packet has no header if it isn't the first + else: + msgid, btag, btaginverse, transfer_size, transfer_attributes, data = self.unpack_dev_dep_resp_header(resp) + + + if self.rigol_quirk: + # rigol devices only send the header in the first packet, and they lie about whether the transaction is complete + if read_data: + read_data += resp + else: + if self.rigol_quirk_ieee_block and data.startswith(b"#"): + + # ieee block incoming, the transfer_size usbtmc header is lying about the transaction size + l = int(chr(data[1])) + n = int(data[2:l+2]) + + transfer_size = n + (l+2) # account for ieee header + + read_data += data + + if len(read_data) >= transfer_size: + read_data = read_data[:transfer_size] # as per usbtmc spec section 3.2 note 2 + eom = True + else: + eom = False + else: + eom = transfer_attributes & 1 + read_data += data + + # Advantest devices never signal EOI and may only send one read packet + if self.advantest_quirk: + break + + if num > 0: + num = num - len(data) + if num <= 0: + break + if num < read_len: + read_len = num + except usb.core.USBError: + exc = sys.exc_info()[1] + if exc.errno == 110: + # timeout, abort transfer + self._abort_bulk_in() + raise + + return read_data + + def ask_raw(self, data, num=-1): + "Write then read binary data" + # Advantest/ADCMT hardware won't respond to a command unless it's in Local Lockout mode + was_locked = self.advantest_locked + try: + if self.advantest_quirk and not was_locked: + self.lock() + self.write_raw(data) + return self.read_raw(num) + finally: + if self.advantest_quirk and not was_locked: + self.unlock() + + def write(self, message, encoding='utf-8'): + "Write string to instrument" + if type(message) is tuple or type(message) is list: + # recursive call for a list of commands + for message_i in message: + self.write(message_i, encoding) + return + + self.write_raw(str(message).encode(encoding)) + + def read(self, num=-1, encoding='utf-8'): + "Read string from instrument" + return self.read_raw(num).decode(encoding).rstrip('\r\n') + + def ask(self, message, num=-1, encoding='utf-8'): + "Write then read string" + if type(message) is tuple or type(message) is list: + # recursive call for a list of commands + val = list() + for message_i in message: + val.append(self.ask(message_i, num, encoding)) + return val + + # Advantest/ADCMT hardware won't respond to a command unless it's in Local Lockout mode + was_locked = self.advantest_locked + try: + if self.advantest_quirk and not was_locked: + self.lock() + self.write(message, encoding) + return self.read(num, encoding) + finally: + if self.advantest_quirk and not was_locked: + self.unlock() + + def read_stb(self): + "Read status byte" + + if not self.connected: + self.open() + + if self.is_usb488(): + rstb_btag = (self.last_rstb_btag % 128) + 1 + if rstb_btag < 2: + rstb_btag = 2 + self.last_rstb_btag = rstb_btag + + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), + bRequest=USB488_READ_STATUS_BYTE, + wValue=rstb_btag, + wIndex=self.iface.index, + data_or_wLength=0x0003, + timeout=self._timeout_ms + ) + if (b[0] == USBTMC_STATUS_SUCCESS): + # check btag + if rstb_btag != b[1]: + raise UsbtmcException("Read status byte btag mismatch", 'read_stb') + if self.interrupt_in_ep is None: + # no interrupt channel, value is here + return b[2] + else: + # read response from interrupt channel + resp = self.interrupt_in_ep.read(2, timeout=self._timeout_ms) + if resp[0] != rstb_btag + 128: + raise UsbtmcException("Read status byte btag mismatch", 'read_stb') + else: + return resp[1] + else: + raise UsbtmcException("Read status failed", 'read_stb') + else: + return int(self.ask("*STB?")) + + def trigger(self): + "Send trigger command" + + if not self.connected: + self.open() + + if self.support_trigger: + data = self.pack_usb488_trigger() + print(repr(data)) + self.bulk_out_ep.write(data, timeout=self._timeout_ms) + else: + self.write("*TRG") + + def clear(self): + "Send clear command" + + if not self.connected: + self.open() + + # Send INITIATE_CLEAR + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), + bRequest=USBTMC_REQUEST_INITIATE_CLEAR, + wValue=0x0000, + wIndex=self.iface.index, + data_or_wLength=0x0001, + timeout=self._timeout_ms + ) + if (b[0] == USBTMC_STATUS_SUCCESS): + # Initiate clear succeeded, wait for completion + while True: + # Check status + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), + bRequest=USBTMC_REQUEST_CHECK_CLEAR_STATUS, + wValue=0x0000, + wIndex=self.iface.index, + data_or_wLength=0x0002, + timeout=self._timeout_ms + ) + time.sleep(0.1) + if (b[0] != USBTMC_STATUS_PENDING): + break + # Clear halt condition + self.bulk_out_ep.clear_halt() + else: + raise UsbtmcException("Clear failed", 'clear') + + def _abort_bulk_out(self, btag=None): + "Abort bulk out" + + if not self.connected: + return + + if btag is None: + btag = self.last_btag + + # Send INITIATE_ABORT_BULK_OUT + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_ENDPOINT), + bRequest=USBTMC_REQUEST_INITIATE_ABORT_BULK_OUT, + wValue=btag, + wIndex=self.bulk_out_ep.bEndpointAddress, + data_or_wLength=0x0002, + timeout=self._timeout_ms + ) + if (b[0] == USBTMC_STATUS_SUCCESS): + # Initiate abort bulk out succeeded, wait for completion + while True: + # Check status + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_ENDPOINT), + bRequest=USBTMC_REQUEST_CHECK_ABORT_BULK_OUT_STATUS, + wValue=0x0000, + wIndex=self.bulk_out_ep.bEndpointAddress, + data_or_wLength=0x0008, + timeout=self._timeout_ms + ) + time.sleep(0.1) + if (b[0] != USBTMC_STATUS_PENDING): + break + else: + # no transfer in progress; nothing to do + pass + + def _abort_bulk_in(self, btag=None): + "Abort bulk in" + + if not self.connected: + return + + if btag is None: + btag = self.last_btag + + # Send INITIATE_ABORT_BULK_IN + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_ENDPOINT), + bRequest=USBTMC_REQUEST_INITIATE_ABORT_BULK_IN, + wValue=btag, + wIndex=self.bulk_in_ep.bEndpointAddress, + data_or_wLength=0x0002, + timeout=self._timeout_ms + ) + if (b[0] == USBTMC_STATUS_SUCCESS): + # Initiate abort bulk in succeeded, wait for completion + while True: + # Check status + b = self.device.ctrl_transfer( + bmRequestType=usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_ENDPOINT), + bRequest=USBTMC_REQUEST_CHECK_ABORT_BULK_IN_STATUS, + wValue=0x0000, + wIndex=self.bulk_in_ep.bEndpointAddress, + data_or_wLength=0x0008, + timeout=self._timeout_ms + ) + time.sleep(0.1) + if (b[0] != USBTMC_STATUS_PENDING): + break + else: + # no transfer in progress; nothing to do + pass + + def remote(self): + "Send remote command" + raise NotImplementedError() + + def local(self): + "Send local command" + raise NotImplementedError() + + def lock(self): + "Send lock command" + + if not self.connected: + self.open() + + if self.advantest_quirk: + # This Advantest/ADCMT vendor-specific control command enables remote control and must be sent before any commands are exchanged + # (otherwise READ commands will only retrieve the latest measurement) + self.advantest_locked = True + self.device.ctrl_transfer(bmRequestType=0xA1, bRequest=0xA0, wValue=0x0001, wIndex=0x0000, data_or_wLength=1) + else: + raise NotImplementedError() + + def unlock(self): + "Send unlock command" + + if not self.connected: + self.open() + + if self.advantest_quirk: + # This Advantest/ADCMT vendor-specific control command enables remote control and must be sent before any commands are exchanged + # (otherwise READ commands will only retrieve the latest measurement) + self.advantest_locked = False + self.device.ctrl_transfer(bmRequestType=0xA1, bRequest=0xA0, wValue=0x0000, wIndex=0x0000, data_or_wLength=1) + else: + raise NotImplementedError() + + def advantest_read_myid(self): + + if not self.connected: + self.open() + + "Read MyID value from Advantest and ADCMT devices" + if self.advantest_quirk: + # This Advantest/ADCMT vendor-specific control command reads the "MyID" identifier + try: + return int(self.device.ctrl_transfer(bmRequestType=0xC1, bRequest=0xF5, wValue=0x0000, wIndex=0x0000, data_or_wLength=1)[0]) + except: + return None + else: + raise NotImplementedError() + + def _release_kernel_driver(self, interface_number): + if os.name == 'posix': + if self.device.is_kernel_driver_active(interface_number): + self.reattach.append(interface_number) + try: + self.device.detach_kernel_driver(interface_number) + except usb.core.USBError as e: + sys.exit( + "Could not detach kernel driver from interface({0}): {1}".format(interface_number, + str(e))) |