]> mj.ucw.cz Git - home-hw.git/commitdiff
Modbus: Tests with python-modbus
authorMartin Mares <mj@ucw.cz>
Wed, 10 Jul 2019 08:17:08 +0000 (10:17 +0200)
committerMartin Mares <mj@ucw.cz>
Wed, 10 Jul 2019 08:17:08 +0000 (10:17 +0200)
test-modbus/py/test.py [new file with mode: 0755]

diff --git a/test-modbus/py/test.py b/test-modbus/py/test.py
new file mode 100755 (executable)
index 0000000..85faa0c
--- /dev/null
@@ -0,0 +1,243 @@
+#!/usr/bin/env python3
+"""
+Pymodbus Synchronous Client Extended Examples
+--------------------------------------------------------------------------
+
+The following is an example of how to use the synchronous modbus client
+implementation from pymodbus to perform the extended portions of the
+modbus protocol.
+"""
+# --------------------------------------------------------------------------- #
+# import the various server implementations
+# --------------------------------------------------------------------------- #
+# from pymodbus.client.sync import ModbusTcpClient as ModbusClient
+# from pymodbus.client.sync import ModbusUdpClient as ModbusClient
+from pymodbus.client.sync import ModbusSerialClient as ModbusClient
+
+
+# --------------------------------------------------------------------------- # 
+# import the extended messages to perform
+# --------------------------------------------------------------------------- #
+from pymodbus.diag_message import *
+from pymodbus.file_message import *
+from pymodbus.other_message import *
+from pymodbus.mei_message import *
+from pymodbus.register_read_message import *
+
+import time
+
+# --------------------------------------------------------------------------- #
+# configure the client logging
+# --------------------------------------------------------------------------- #
+import logging
+FORMAT = ('%(asctime)-15s %(threadName)-15s '
+          '%(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
+logging.basicConfig(format=FORMAT)
+log = logging.getLogger()
+log.setLevel(logging.DEBUG)
+
+UNIT = 42
+
+
+def execute_extended_requests():
+    # ------------------------------------------------------------------------#
+    # choose the client you want
+    # ------------------------------------------------------------------------#
+    # make sure to start an implementation to hit against. For this
+    # you can use an existing device, the reference implementation in the tools
+    # directory, or start a pymodbus server.
+    #
+    # It should be noted that you can supply an ipv4 or an ipv6 host address
+    # for both the UDP and TCP clients.
+    # ------------------------------------------------------------------------# 
+    client = ModbusClient(method='rtu', port="/dev/ttyUSB0", baudrate=19200, parity='E')
+    # client = ModbusClient(method='ascii', port="/dev/ptyp0")
+    # client = ModbusClient(method='binary', port="/dev/ptyp0")
+    # client = ModbusClient('127.0.0.1', port=5020)
+    # from pymodbus.transaction import ModbusRtuFramer
+    # client = ModbusClient('127.0.0.1', port=5020, framer=ModbusRtuFramer)
+    client.connect()
+
+    #log.debug("Running ReadDeviceInformationRequest")
+    #rq = ReadHoldingRegistersRequest(0, 1, unit=UNIT)
+    #rr = client.execute(rq)
+    #log.debug(rr)
+
+    # ----------------------------------------------------------------------- #
+    # extra requests
+    # ----------------------------------------------------------------------- #
+    # If you are performing a request that is not available in the client
+    # mixin, you have to perform the request like this instead::
+    #
+    # from pymodbus.diag_message import ClearCountersRequest
+    # from pymodbus.diag_message import ClearCountersResponse
+    #
+    # request  = ClearCountersRequest()
+    # response = client.execute(request)
+    # if isinstance(response, ClearCountersResponse):
+    #     ... do something with the response
+    #
+    #
+    # What follows is a listing of all the supported methods. Feel free to
+    # comment, uncomment, or modify each result set to match with your ref.
+    # ----------------------------------------------------------------------- #
+
+    # ----------------------------------------------------------------------- #
+    # information requests
+    # ----------------------------------------------------------------------- #
+    log.debug("Running ReadDeviceInformationRequest")
+    rq = ReadDeviceInformationRequest(unit=UNIT)
+    rq.get_response_pdu_size = lambda: 100
+    rr = client.execute(rq)
+    log.debug(rr)
+    # assert(rr == None)              # not supported by reference
+    # assert (not rr.isError())  # test that we are not an error
+    # assert (rr.information[0] == b'Pymodbus')  # test the vendor name
+    # assert (rr.information[1] == b'PM')  # test the product code
+    # assert (rr.information[2] == b'1.0')  # test the code revision
+
+    # log.debug("Running ReportSlaveIdRequest")
+    # rq = ReportSlaveIdRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                        # not supported by reference
+    # # assert(not rr.isError())           # test that we are not an error
+    # # assert(rr.identifier  == 0x00)            # test the slave identifier
+    # # assert(rr.status  == 0x00)                # test that the status is ok
+
+    # log.debug("Running ReadExceptionStatusRequest")
+    # rq = ReadExceptionStatusRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                        # not supported by reference
+    # # assert(not rr.isError())           # test that we are not an error
+    # # assert(rr.status == 0x55)                 # test the status code
+
+    # log.debug("Running GetCommEventCounterRequest")
+    # rq = GetCommEventCounterRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                       # not supported by reference
+    # # assert(not rr.isError())          # test that we are not an error
+    # # assert(rr.status == True)                # test the status code
+    # # assert(rr.count == 0x00)                 # test the status code
+
+    # log.debug("Running GetCommEventLogRequest")
+    # rq = GetCommEventLogRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                       # not supported by reference
+    # # assert(not rr.isError())          # test that we are not an error
+    # # assert(rr.status == True)                # test the status code
+    # # assert(rr.event_count == 0x00)           # test the number of events
+    # # assert(rr.message_count == 0x00)         # test the number of messages
+    # # assert(len(rr.events) == 0x00)           # test the number of events
+
+    # # ------------------------------------------------------------------------#
+    # # diagnostic requests
+    # # ------------------------------------------------------------------------#
+    # log.debug("Running ReturnQueryDataRequest")
+    # rq = ReturnQueryDataRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                      # not supported by reference
+    # # assert(rr.message[0] == 0x0000)         # test the resulting message
+
+    # log.debug("Running RestartCommunicationsOptionRequest")
+    # rq = RestartCommunicationsOptionRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                     # not supported by reference
+    # # assert(rr.message == 0x0000)           # test the resulting message
+
+    # log.debug("Running ReturnDiagnosticRegisterRequest")
+    # rq = ReturnDiagnosticRegisterRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                     # not supported by reference
+
+    # log.debug("Running ChangeAsciiInputDelimiterRequest")
+    # rq = ChangeAsciiInputDelimiterRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                    # not supported by reference
+
+    # log.debug("Running ForceListenOnlyModeRequest")
+    # rq = ForceListenOnlyModeRequest(unit=UNIT)
+    # rr = client.execute(rq)  # does not send a response
+    # log.debug(rr)
+
+    # log.debug("Running ClearCountersRequest")
+    # rq = ClearCountersRequest()
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                   # not supported by reference
+
+    # log.debug("Running ReturnBusCommunicationErrorCountRequest")
+    # rq = ReturnBusCommunicationErrorCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                    # not supported by reference
+
+    # log.debug("Running ReturnBusExceptionErrorCountRequest")
+    # rq = ReturnBusExceptionErrorCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                   # not supported by reference
+
+    # log.debug("Running ReturnSlaveMessageCountRequest")
+    # rq = ReturnSlaveMessageCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                  # not supported by reference
+
+    # log.debug("Running ReturnSlaveNoResponseCountRequest")
+    # rq = ReturnSlaveNoResponseCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)                  # not supported by reference
+
+    # log.debug("Running ReturnSlaveNAKCountRequest")
+    # rq = ReturnSlaveNAKCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # log.debug("Running ReturnSlaveBusyCountRequest")
+    # rq = ReturnSlaveBusyCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # log.debug("Running ReturnSlaveBusCharacterOverrunCountRequest")
+    # rq = ReturnSlaveBusCharacterOverrunCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # log.debug("Running ReturnIopOverrunCountRequest")
+    # rq = ReturnIopOverrunCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # log.debug("Running ClearOverrunCountRequest")
+    # rq = ClearOverrunCountRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # log.debug("Running GetClearModbusPlusRequest")
+    # rq = GetClearModbusPlusRequest(unit=UNIT)
+    # rr = client.execute(rq)
+    # log.debug(rr)
+    # # assert(rr == None)               # not supported by reference
+
+    # ------------------------------------------------------------------------#
+    # close the client
+    # ------------------------------------------------------------------------#
+    client.close()
+
+
+if __name__ == "__main__":
+    execute_extended_requests()