1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import select
import socket
from urllib.parse import urlparse
import numpy as np
from numpy_ringbuffer import RingBuffer
import dearpygui.dearpygui as dpg
class udpsource:
    """
    Non-blocking UDP listening socket addressed by a URL.

    The host and port are taken from the parsed URL (for example
    ``udp://127.0.0.1:9000``); the URL scheme itself is not inspected.
    """

    def __init__(self, url):
        """Create the datagram socket and parse ``url``; nothing is bound yet."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.url = urlparse(url)

    def __del__(self):
        # The attribute is absent when __init__ failed before the socket was
        # created, and __del__ still runs in that case.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def bind(self):
        """Make the socket non-blocking and bind it to the URL's host and port."""
        self.sock.setblocking(False)
        self.sock.bind((self.url.hostname, self.url.port))
        # No listen() call here: datagram sockets are connectionless.

    def read(self, nbytes, timeout=1):
        """
        Wait for a datagram and return its payload.

        Args:
            nbytes: Maximum number of bytes to receive.
            timeout: Seconds to wait for data, passed to ``select.select``.
                Defaults to 1; use 0 to poll without waiting.

        Returns:
            The received bytes, or ``None`` when nothing arrived within
            ``timeout``.
        """
        readable, _, _ = select.select([self.sock], [], [], timeout)
        if not readable:
            return None
        # Must go through self.sock: there is no module-level ``sock``.
        return self.sock.recv(nbytes)
class network_plot(udpsource):
    """
    UDP source combined with a Dear PyGui plot and a fixed-capacity buffer.

    The buffer holds ``nsamples`` rows of two floats each; column 0 is exposed
    as ``x_data`` and column 1 as ``y_data``. Used as a context manager, the
    instance delegates entering and leaving to the wrapped ``dpg.plot``
    object.
    """

    def __init__(self, url, nsamples, **kwargs):
        """
        Set up the socket, the plot and the pre-filled sample buffer.

        Args:
            url: URL handed to ``udpsource.__init__``.
            nsamples: Capacity of the ring buffer, and the number of
                placeholder rows written into it.
            **kwargs: Forwarded unchanged to ``dpg.plot``.
        """
        udpsource.__init__(self, url)
        self.nsamples = nsamples
        self.plot = dpg.plot(**kwargs)

        # Two-column float buffer, filled to capacity right away.
        self.buffer = RingBuffer(capacity=nsamples, dtype=(float, 2))
        for sample_index in range(nsamples):
            # TODO: remove random data used for testing
            placeholder_y = 1 + np.random.rand() / 5
            row = np.array([sample_index, placeholder_y])
            self.buffer.append(row)

        self.bind()

    def __enter__(self):
        # Hands back whatever the wrapped plot's __enter__ yields, not self.
        entered = self.plot.__enter__()
        return entered

    def __exit__(self, t, val, tb):
        # The plot's __exit__ result is deliberately not returned.
        self.plot.__exit__(t, val, tb)

    def _column(self, index):
        """Return a fresh array holding column ``index`` of the buffer."""
        selected = self.buffer[:, index]
        return np.array(selected)

    @property
    def x_data(self):
        """First column of the buffered samples."""
        return self._column(0)

    @property
    def y_data(self):
        """Second column of the buffered samples."""
        return self._column(1)

    def refresh_series(self, tag):
        """Push the current x/y columns to the Dear PyGui item ``tag``."""
        series = [self.x_data, self.y_data]
        dpg.set_value(tag, series)
|