aboutsummaryrefslogtreecommitdiffstats
path: root/tests/zmq/zmqtest.py
diff options
context:
space:
mode:
authorNao Pross <np@0hm.ch>2021-11-11 19:45:58 +0100
committerNao Pross <np@0hm.ch>2021-11-11 19:45:58 +0100
commitbf41ed4934742edf85e9973286b04f023f660c2f (patch)
tree0603ea6257033258f2efeaaa1a90f6961321a006 /tests/zmq/zmqtest.py
parentCreate QAM without gui (diff)
downloadFading-bf41ed4934742edf85e9973286b04f023f660c2f.tar.gz
Fading-bf41ed4934742edf85e9973286b04f023f660c2f.zip
Create ZMQ test
Diffstat (limited to '')
-rwxr-xr-xtests/zmq/zmqtest.py85
1 files changed, 85 insertions, 0 deletions
diff --git a/tests/zmq/zmqtest.py b/tests/zmq/zmqtest.py
new file mode 100755
index 0000000..a046c13
--- /dev/null
+++ b/tests/zmq/zmqtest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+# GNU Radio Python Flow Graph
+# Title: ZMQ test
+# Author: Naoki Pross
+# GNU Radio version: 3.8.2.0
+
+from gnuradio import blocks
+from gnuradio import gr
+from gnuradio.filter import firdes
+import sys
+import signal
+from argparse import ArgumentParser
+from gnuradio.eng_arg import eng_float, intx
+from gnuradio import eng_notation
+from gnuradio import zeromq
+
+
+class zmqtest(gr.top_block):
+
+ def __init__(self):
+ gr.top_block.__init__(self, "ZMQ test")
+
+ ##################################################
+ # Variables
+ ##################################################
+ self.samp_rate = samp_rate = 32000
+
+ ##################################################
+ # Blocks
+ ##################################################
+ self.zeromq_req_source_0 = zeromq.req_source(gr.sizeof_gr_complex, 1, '', 100, False, -1)
+ self.zeromq_rep_sink_0 = zeromq.rep_sink(gr.sizeof_gr_complex, 1, '', 100, False, -1)
+ self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, samp_rate,True)
+
+
+
+ ##################################################
+ # Connections
+ ##################################################
+ self.connect((self.blocks_throttle_0, 0), (self.zeromq_rep_sink_0, 0))
+ self.connect((self.zeromq_req_source_0, 0), (self.blocks_throttle_0, 0))
+
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+ self.blocks_throttle_0.set_sample_rate(self.samp_rate)
+
+
+
+
+
+def main(top_block_cls=zmqtest, options=None):
+ if gr.enable_realtime_scheduling() != gr.RT_OK:
+ print("Error: failed to enable real-time scheduling.")
+ tb = top_block_cls()
+
+ def sig_handler(sig=None, frame=None):
+ tb.stop()
+ tb.wait()
+
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, sig_handler)
+ signal.signal(signal.SIGTERM, sig_handler)
+
+ tb.start()
+
+ try:
+ input('Press Enter to quit: ')
+ except EOFError:
+ pass
+ tb.stop()
+ tb.wait()
+
+
+if __name__ == '__main__':
+ main()