]> mj.ucw.cz Git - home-hw.git/blob - test-modbus/py/test.py
Modbus: Tests with python-modbus
[home-hw.git] / test-modbus / py / test.py
1 #!/usr/bin/env python3
2 """
3 Pymodbus Synchronous Client Extended Examples
4 --------------------------------------------------------------------------
5
6 The following is an example of how to use the synchronous modbus client
7 implementation from pymodbus to perform the extended portions of the
8 modbus protocol.
9 """
10 # --------------------------------------------------------------------------- #
11 # import the various server implementations
12 # --------------------------------------------------------------------------- #
13 # from pymodbus.client.sync import ModbusTcpClient as ModbusClient
14 # from pymodbus.client.sync import ModbusUdpClient as ModbusClient
15 from pymodbus.client.sync import ModbusSerialClient as ModbusClient
16
17
18 # --------------------------------------------------------------------------- # 
19 # import the extended messages to perform
20 # --------------------------------------------------------------------------- #
21 from pymodbus.diag_message import *
22 from pymodbus.file_message import *
23 from pymodbus.other_message import *
24 from pymodbus.mei_message import *
25 from pymodbus.register_read_message import *
26
27 import time
28
29 # --------------------------------------------------------------------------- #
30 # configure the client logging
31 # --------------------------------------------------------------------------- #
32 import logging
33 FORMAT = ('%(asctime)-15s %(threadName)-15s '
34           '%(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
35 logging.basicConfig(format=FORMAT)
36 log = logging.getLogger()
37 log.setLevel(logging.DEBUG)
38
39 UNIT = 42
40
41
42 def execute_extended_requests():
43     # ------------------------------------------------------------------------#
44     # choose the client you want
45     # ------------------------------------------------------------------------#
46     # make sure to start an implementation to hit against. For this
47     # you can use an existing device, the reference implementation in the tools
48     # directory, or start a pymodbus server.
49     #
50     # It should be noted that you can supply an ipv4 or an ipv6 host address
51     # for both the UDP and TCP clients.
52     # ------------------------------------------------------------------------# 
53     client = ModbusClient(method='rtu', port="/dev/ttyUSB0", baudrate=19200, parity='E')
54     # client = ModbusClient(method='ascii', port="/dev/ptyp0")
55     # client = ModbusClient(method='binary', port="/dev/ptyp0")
56     # client = ModbusClient('127.0.0.1', port=5020)
57     # from pymodbus.transaction import ModbusRtuFramer
58     # client = ModbusClient('127.0.0.1', port=5020, framer=ModbusRtuFramer)
59     client.connect()
60
61     #log.debug("Running ReadDeviceInformationRequest")
62     #rq = ReadHoldingRegistersRequest(0, 1, unit=UNIT)
63     #rr = client.execute(rq)
64     #log.debug(rr)
65
66     # ----------------------------------------------------------------------- #
67     # extra requests
68     # ----------------------------------------------------------------------- #
69     # If you are performing a request that is not available in the client
70     # mixin, you have to perform the request like this instead::
71     #
72     # from pymodbus.diag_message import ClearCountersRequest
73     # from pymodbus.diag_message import ClearCountersResponse
74     #
75     # request  = ClearCountersRequest()
76     # response = client.execute(request)
77     # if isinstance(response, ClearCountersResponse):
78     #     ... do something with the response
79     #
80     #
81     # What follows is a listing of all the supported methods. Feel free to
82     # comment, uncomment, or modify each result set to match with your ref.
83     # ----------------------------------------------------------------------- #
84
85     # ----------------------------------------------------------------------- #
86     # information requests
87     # ----------------------------------------------------------------------- #
88     log.debug("Running ReadDeviceInformationRequest")
89     rq = ReadDeviceInformationRequest(unit=UNIT)
90     rq.get_response_pdu_size = lambda: 100
91     rr = client.execute(rq)
92     log.debug(rr)
93     # assert(rr == None)              # not supported by reference
94     # assert (not rr.isError())  # test that we are not an error
95     # assert (rr.information[0] == b'Pymodbus')  # test the vendor name
96     # assert (rr.information[1] == b'PM')  # test the product code
97     # assert (rr.information[2] == b'1.0')  # test the code revision
98
99     # log.debug("Running ReportSlaveIdRequest")
100     # rq = ReportSlaveIdRequest(unit=UNIT)
101     # rr = client.execute(rq)
102     # log.debug(rr)
103     # # assert(rr == None)                        # not supported by reference
104     # # assert(not rr.isError())           # test that we are not an error
105     # # assert(rr.identifier  == 0x00)            # test the slave identifier
106     # # assert(rr.status  == 0x00)                # test that the status is ok
107
108     # log.debug("Running ReadExceptionStatusRequest")
109     # rq = ReadExceptionStatusRequest(unit=UNIT)
110     # rr = client.execute(rq)
111     # log.debug(rr)
112     # # assert(rr == None)                        # not supported by reference
113     # # assert(not rr.isError())           # test that we are not an error
114     # # assert(rr.status == 0x55)                 # test the status code
115
116     # log.debug("Running GetCommEventCounterRequest")
117     # rq = GetCommEventCounterRequest(unit=UNIT)
118     # rr = client.execute(rq)
119     # log.debug(rr)
120     # # assert(rr == None)                       # not supported by reference
121     # # assert(not rr.isError())          # test that we are not an error
122     # # assert(rr.status == True)                # test the status code
123     # # assert(rr.count == 0x00)                 # test the status code
124
125     # log.debug("Running GetCommEventLogRequest")
126     # rq = GetCommEventLogRequest(unit=UNIT)
127     # rr = client.execute(rq)
128     # log.debug(rr)
129     # # assert(rr == None)                       # not supported by reference
130     # # assert(not rr.isError())          # test that we are not an error
131     # # assert(rr.status == True)                # test the status code
132     # # assert(rr.event_count == 0x00)           # test the number of events
133     # # assert(rr.message_count == 0x00)         # test the number of messages
134     # # assert(len(rr.events) == 0x00)           # test the number of events
135
136     # # ------------------------------------------------------------------------#
137     # # diagnostic requests
138     # # ------------------------------------------------------------------------#
139     # log.debug("Running ReturnQueryDataRequest")
140     # rq = ReturnQueryDataRequest(unit=UNIT)
141     # rr = client.execute(rq)
142     # log.debug(rr)
143     # # assert(rr == None)                      # not supported by reference
144     # # assert(rr.message[0] == 0x0000)         # test the resulting message
145
146     # log.debug("Running RestartCommunicationsOptionRequest")
147     # rq = RestartCommunicationsOptionRequest(unit=UNIT)
148     # rr = client.execute(rq)
149     # log.debug(rr)
150     # # assert(rr == None)                     # not supported by reference
151     # # assert(rr.message == 0x0000)           # test the resulting message
152
153     # log.debug("Running ReturnDiagnosticRegisterRequest")
154     # rq = ReturnDiagnosticRegisterRequest(unit=UNIT)
155     # rr = client.execute(rq)
156     # log.debug(rr)
157     # # assert(rr == None)                     # not supported by reference
158
159     # log.debug("Running ChangeAsciiInputDelimiterRequest")
160     # rq = ChangeAsciiInputDelimiterRequest(unit=UNIT)
161     # rr = client.execute(rq)
162     # log.debug(rr)
163     # # assert(rr == None)                    # not supported by reference
164
165     # log.debug("Running ForceListenOnlyModeRequest")
166     # rq = ForceListenOnlyModeRequest(unit=UNIT)
167     # rr = client.execute(rq)  # does not send a response
168     # log.debug(rr)
169
170     # log.debug("Running ClearCountersRequest")
171     # rq = ClearCountersRequest()
172     # rr = client.execute(rq)
173     # log.debug(rr)
174     # # assert(rr == None)                   # not supported by reference
175
176     # log.debug("Running ReturnBusCommunicationErrorCountRequest")
177     # rq = ReturnBusCommunicationErrorCountRequest(unit=UNIT)
178     # rr = client.execute(rq)
179     # log.debug(rr)
180     # # assert(rr == None)                    # not supported by reference
181
182     # log.debug("Running ReturnBusExceptionErrorCountRequest")
183     # rq = ReturnBusExceptionErrorCountRequest(unit=UNIT)
184     # rr = client.execute(rq)
185     # log.debug(rr)
186     # # assert(rr == None)                   # not supported by reference
187
188     # log.debug("Running ReturnSlaveMessageCountRequest")
189     # rq = ReturnSlaveMessageCountRequest(unit=UNIT)
190     # rr = client.execute(rq)
191     # log.debug(rr)
192     # # assert(rr == None)                  # not supported by reference
193
194     # log.debug("Running ReturnSlaveNoResponseCountRequest")
195     # rq = ReturnSlaveNoResponseCountRequest(unit=UNIT)
196     # rr = client.execute(rq)
197     # log.debug(rr)
198     # # assert(rr == None)                  # not supported by reference
199
200     # log.debug("Running ReturnSlaveNAKCountRequest")
201     # rq = ReturnSlaveNAKCountRequest(unit=UNIT)
202     # rr = client.execute(rq)
203     # log.debug(rr)
204     # # assert(rr == None)               # not supported by reference
205
206     # log.debug("Running ReturnSlaveBusyCountRequest")
207     # rq = ReturnSlaveBusyCountRequest(unit=UNIT)
208     # rr = client.execute(rq)
209     # log.debug(rr)
210     # # assert(rr == None)               # not supported by reference
211
212     # log.debug("Running ReturnSlaveBusCharacterOverrunCountRequest")
213     # rq = ReturnSlaveBusCharacterOverrunCountRequest(unit=UNIT)
214     # rr = client.execute(rq)
215     # log.debug(rr)
216     # # assert(rr == None)               # not supported by reference
217
218     # log.debug("Running ReturnIopOverrunCountRequest")
219     # rq = ReturnIopOverrunCountRequest(unit=UNIT)
220     # rr = client.execute(rq)
221     # log.debug(rr)
222     # # assert(rr == None)               # not supported by reference
223
224     # log.debug("Running ClearOverrunCountRequest")
225     # rq = ClearOverrunCountRequest(unit=UNIT)
226     # rr = client.execute(rq)
227     # log.debug(rr)
228     # # assert(rr == None)               # not supported by reference
229
230     # log.debug("Running GetClearModbusPlusRequest")
231     # rq = GetClearModbusPlusRequest(unit=UNIT)
232     # rr = client.execute(rq)
233     # log.debug(rr)
234     # # assert(rr == None)               # not supported by reference
235
236     # ------------------------------------------------------------------------#
237     # close the client
238     # ------------------------------------------------------------------------#
239     client.close()
240
241
242 if __name__ == "__main__":
243     execute_extended_requests()