From 20e54b3cf198f17e05a3c7e8716138717447aa32 Mon Sep 17 00:00:00 2001 From: Nao Pross Date: Wed, 13 Oct 2021 18:12:08 +0200 Subject: Create QPKS simulation The current flow diagram does the following: - Generate a QPKS constellation from a random bit source. The constellation is parametrized by the 'Constellation Rect. Object' and the variables 'sps' (Samples Per Symbol) and 'excess_bw' (Excess Bandwidth). - Send the modulated signal through a AWGN channel. Noise channel taps are controlled with the 'taps' variable, the other paramters are given through the 'QT GUI Range' objects. - Display the constellation diagram as well as the time and frequency domain plots of the signal after the channel (receiver side). - Synchronize the clock using a polyphase clock sync with root raised cosine filters. The number of filters is stored in 'nfilts' and the filter themselves are in 'rcc_taps'. - Finally, display the constellation after the RCC filters. What is still missing: - Multipath fading taps in the channel model (for later) - Equalization on the receiver side. PROBLEM: The wiki tutorial targets GNU Radio 3.9, and uses 'Linear Equlizer' and 'Adaptive Algorithm' blocks which are not available in GNU Radio 3.8. We need to use 'CMA Equalizer' and 'LMS DD Equalizer' and figure out their parameters instead. Though this should not be hard. - Phase and Fine Frequency Correction. - Decode the message. --- tests/Simulation/QPKS/qpks.py | 460 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 460 insertions(+) create mode 100755 tests/Simulation/QPKS/qpks.py (limited to 'tests/Simulation/QPKS/qpks.py') diff --git a/tests/Simulation/QPKS/qpks.py b/tests/Simulation/QPKS/qpks.py new file mode 100755 index 0000000..4027eb3 --- /dev/null +++ b/tests/Simulation/QPKS/qpks.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# +# SPDX-License-Identifier: GPL-3.0 +# +# GNU Radio Python Flow Graph +# Title: QPSK +# GNU Radio version: 3.8.2.0 + +from distutils.version import StrictVersion + +if __name__ == '__main__': + import ctypes + import sys + if sys.platform.startswith('linux'): + try: + x11 = ctypes.cdll.LoadLibrary('libX11.so') + x11.XInitThreads() + except: + print("Warning: failed to XInitThreads()") + +from PyQt5 import Qt +from gnuradio import qtgui +from gnuradio.filter import firdes +import sip +from gnuradio import blocks +import numpy +from gnuradio import channels +from gnuradio import digital +from gnuradio import gr +import sys +import signal +from argparse import ArgumentParser +from gnuradio.eng_arg import eng_float, intx +from gnuradio import eng_notation +from gnuradio.qtgui import Range, RangeWidget + +from gnuradio import qtgui + +class qpks(gr.top_block, Qt.QWidget): + + def __init__(self): + gr.top_block.__init__(self, "QPSK") + Qt.QWidget.__init__(self) + self.setWindowTitle("QPSK") + qtgui.util.check_set_qss() + try: + self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) + except: + pass + self.top_scroll_layout = Qt.QVBoxLayout() + self.setLayout(self.top_scroll_layout) + self.top_scroll = Qt.QScrollArea() + self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) + self.top_scroll_layout.addWidget(self.top_scroll) + self.top_scroll.setWidgetResizable(True) + self.top_widget = Qt.QWidget() + self.top_scroll.setWidget(self.top_widget) + self.top_layout = Qt.QVBoxLayout(self.top_widget) + self.top_grid_layout = Qt.QGridLayout() + self.top_layout.addLayout(self.top_grid_layout) + + self.settings = Qt.QSettings("GNU Radio", "qpks") + + try: + if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): + self.restoreGeometry(self.settings.value("geometry").toByteArray()) + else: + self.restoreGeometry(self.settings.value("geometry")) + except: + pass + + ################################################## + # Variables + ################################################## + self.sps = sps = 4 + self.nfilts = nfilts = 32 + self.timing_loop_bw = timing_loop_bw = 2 * 3.141592653589793 / 100 + self.time_offset = time_offset = 1.0 + self.taps = taps = [1.0 + 0.0j, ] + self.samp_rate = samp_rate = 32000 + self.rrc_taps = rrc_taps = firdes.root_raised_cosine(nfilts, nfilts, 1.0/float(sps), 0.35, 45*nfilts) + self.qpsk_const = qpsk_const = digital.constellation_rect([0.707+0.707j, -0.707+0.707j, -0.707-0.707j, 0.707-0.707j], [0, 1, 3, 2], + 4, 2, 2, 1, 1).base() + self.noise_volt = noise_volt = 0.0001 + self.freq_offset = freq_offset = 0 + self.excess_bw = excess_bw = 350e-3 + + ################################################## + # Blocks + ################################################## + self._timing_loop_bw_range = Range(0, 200e-3, 10e-3, 2 * 3.141592653589793 / 100, 200) + self._timing_loop_bw_win = RangeWidget(self._timing_loop_bw_range, self.set_timing_loop_bw, 'Time: BW', "counter_slider", float) + self.top_grid_layout.addWidget(self._timing_loop_bw_win, 1, 1, 1, 1) + for r in range(1, 2): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(1, 2): + self.top_grid_layout.setColumnStretch(c, 1) + self._time_offset_range = Range(0.999, 1.001, 0.0001, 1.0, 200) + self._time_offset_win = RangeWidget(self._time_offset_range, self.set_time_offset, 'Timing Offset', "counter_slider", float) + self.top_grid_layout.addWidget(self._time_offset_win, 0, 1, 1, 1) + for r in range(0, 1): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(1, 2): + self.top_grid_layout.setColumnStretch(c, 1) + self._noise_volt_range = Range(0, 1, 0.01, 0.0001, 200) + self._noise_volt_win = RangeWidget(self._noise_volt_range, self.set_noise_volt, 'Noise Voltage', "counter_slider", float) + self.top_grid_layout.addWidget(self._noise_volt_win, 0, 0, 1, 1) + for r in range(0, 1): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(0, 1): + self.top_grid_layout.setColumnStretch(c, 1) + self._freq_offset_range = Range(-100e-3, 100e-3, 1e-3, 0, 200) + self._freq_offset_win = RangeWidget(self._freq_offset_range, self.set_freq_offset, 'Frequency Offset', "counter_slider", float) + self.top_grid_layout.addWidget(self._freq_offset_win, 1, 0, 1, 1) + for r in range(1, 2): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(0, 1): + self.top_grid_layout.setColumnStretch(c, 1) + self.qtgui_time_sink_x_0 = qtgui.time_sink_c( + 1024, #size + samp_rate, #samp_rate + "Channel Input Spectrum", #name + 1 #number of inputs + ) + self.qtgui_time_sink_x_0.set_update_time(0.10) + self.qtgui_time_sink_x_0.set_y_axis(-1, 1) + + self.qtgui_time_sink_x_0.set_y_label('Amplitude', "") + + self.qtgui_time_sink_x_0.enable_tags(True) + self.qtgui_time_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, 0, "") + self.qtgui_time_sink_x_0.enable_autoscale(False) + self.qtgui_time_sink_x_0.enable_grid(False) + self.qtgui_time_sink_x_0.enable_axis_labels(True) + self.qtgui_time_sink_x_0.enable_control_panel(False) + self.qtgui_time_sink_x_0.enable_stem_plot(False) + + + labels = ['Signal 1', 'Signal 2', 'Signal 3', 'Signal 4', 'Signal 5', + 'Signal 6', 'Signal 7', 'Signal 8', 'Signal 9', 'Signal 10'] + widths = [1, 1, 1, 1, 1, + 1, 1, 1, 1, 1] + colors = ['blue', 'red', 'green', 'black', 'cyan', + 'magenta', 'yellow', 'dark red', 'dark green', 'dark blue'] + alphas = [1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0] + styles = [1, 1, 1, 1, 1, + 1, 1, 1, 1, 1] + markers = [-1, -1, -1, -1, -1, + -1, -1, -1, -1, -1] + + + for i in range(2): + if len(labels[i]) == 0: + if (i % 2 == 0): + self.qtgui_time_sink_x_0.set_line_label(i, "Re{{Data {0}}}".format(i/2)) + else: + self.qtgui_time_sink_x_0.set_line_label(i, "Im{{Data {0}}}".format(i/2)) + else: + self.qtgui_time_sink_x_0.set_line_label(i, labels[i]) + self.qtgui_time_sink_x_0.set_line_width(i, widths[i]) + self.qtgui_time_sink_x_0.set_line_color(i, colors[i]) + self.qtgui_time_sink_x_0.set_line_style(i, styles[i]) + self.qtgui_time_sink_x_0.set_line_marker(i, markers[i]) + self.qtgui_time_sink_x_0.set_line_alpha(i, alphas[i]) + + self._qtgui_time_sink_x_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0.pyqwidget(), Qt.QWidget) + self.top_grid_layout.addWidget(self._qtgui_time_sink_x_0_win, 2, 0, 1, 1) + for r in range(2, 3): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(0, 1): + self.top_grid_layout.setColumnStretch(c, 1) + self.qtgui_freq_sink_x_0 = qtgui.freq_sink_c( + 1024, #size + firdes.WIN_BLACKMAN_hARRIS, #wintype + 0, #fc + samp_rate, #bw + "Channel Input", #name + 1 + ) + self.qtgui_freq_sink_x_0.set_update_time(0.10) + self.qtgui_freq_sink_x_0.set_y_axis(-140, 10) + self.qtgui_freq_sink_x_0.set_y_label('Relative Gain', 'dB') + self.qtgui_freq_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") + self.qtgui_freq_sink_x_0.enable_autoscale(False) + self.qtgui_freq_sink_x_0.enable_grid(False) + self.qtgui_freq_sink_x_0.set_fft_average(1.0) + self.qtgui_freq_sink_x_0.enable_axis_labels(True) + self.qtgui_freq_sink_x_0.enable_control_panel(False) + + + + labels = ['', '', '', '', '', + '', '', '', '', ''] + widths = [1, 1, 1, 1, 1, + 1, 1, 1, 1, 1] + colors = ["blue", "red", "green", "black", "cyan", + "magenta", "yellow", "dark red", "dark green", "dark blue"] + alphas = [1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0] + + for i in range(1): + if len(labels[i]) == 0: + self.qtgui_freq_sink_x_0.set_line_label(i, "Data {0}".format(i)) + else: + self.qtgui_freq_sink_x_0.set_line_label(i, labels[i]) + self.qtgui_freq_sink_x_0.set_line_width(i, widths[i]) + self.qtgui_freq_sink_x_0.set_line_color(i, colors[i]) + self.qtgui_freq_sink_x_0.set_line_alpha(i, alphas[i]) + + self._qtgui_freq_sink_x_0_win = sip.wrapinstance(self.qtgui_freq_sink_x_0.pyqwidget(), Qt.QWidget) + self.top_grid_layout.addWidget(self._qtgui_freq_sink_x_0_win, 3, 0, 1, 1) + for r in range(3, 4): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(0, 1): + self.top_grid_layout.setColumnStretch(c, 1) + self.qtgui_const_sink_x_0_0 = qtgui.const_sink_c( + 2048, #size + "After Clock Sync", #name + 1 #number of inputs + ) + self.qtgui_const_sink_x_0_0.set_update_time(0.10) + self.qtgui_const_sink_x_0_0.set_y_axis(-2, 2) + self.qtgui_const_sink_x_0_0.set_x_axis(-2, 2) + self.qtgui_const_sink_x_0_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, "") + self.qtgui_const_sink_x_0_0.enable_autoscale(False) + self.qtgui_const_sink_x_0_0.enable_grid(False) + self.qtgui_const_sink_x_0_0.enable_axis_labels(True) + + + labels = ['', '', '', '', '', + '', '', '', '', ''] + widths = [1, 1, 1, 1, 1, + 1, 1, 1, 1, 1] + colors = ["blue", "red", "red", "red", "red", + "red", "red", "red", "red", "red"] + styles = [0, 0, 0, 0, 0, + 0, 0, 0, 0, 0] + markers = [0, 0, 0, 0, 0, + 0, 0, 0, 0, 0] + alphas = [1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0] + + for i in range(1): + if len(labels[i]) == 0: + self.qtgui_const_sink_x_0_0.set_line_label(i, "Data {0}".format(i)) + else: + self.qtgui_const_sink_x_0_0.set_line_label(i, labels[i]) + self.qtgui_const_sink_x_0_0.set_line_width(i, widths[i]) + self.qtgui_const_sink_x_0_0.set_line_color(i, colors[i]) + self.qtgui_const_sink_x_0_0.set_line_style(i, styles[i]) + self.qtgui_const_sink_x_0_0.set_line_marker(i, markers[i]) + self.qtgui_const_sink_x_0_0.set_line_alpha(i, alphas[i]) + + self._qtgui_const_sink_x_0_0_win = sip.wrapinstance(self.qtgui_const_sink_x_0_0.pyqwidget(), Qt.QWidget) + self.top_grid_layout.addWidget(self._qtgui_const_sink_x_0_0_win, 3, 1, 1, 1) + for r in range(3, 4): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(1, 2): + self.top_grid_layout.setColumnStretch(c, 1) + self.qtgui_const_sink_x_0 = qtgui.const_sink_c( + 2048, #size + "After Channel", #name + 1 #number of inputs + ) + self.qtgui_const_sink_x_0.set_update_time(0.10) + self.qtgui_const_sink_x_0.set_y_axis(-2, 2) + self.qtgui_const_sink_x_0.set_x_axis(-2, 2) + self.qtgui_const_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, qtgui.TRIG_SLOPE_POS, 0.0, 0, "") + self.qtgui_const_sink_x_0.enable_autoscale(False) + self.qtgui_const_sink_x_0.enable_grid(False) + self.qtgui_const_sink_x_0.enable_axis_labels(True) + + + labels = ['', '', '', '', '', + '', '', '', '', ''] + widths = [1, 1, 1, 1, 1, + 1, 1, 1, 1, 1] + colors = ["blue", "red", "red", "red", "red", + "red", "red", "red", "red", "red"] + styles = [0, 0, 0, 0, 0, + 0, 0, 0, 0, 0] + markers = [0, 0, 0, 0, 0, + 0, 0, 0, 0, 0] + alphas = [1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0] + + for i in range(1): + if len(labels[i]) == 0: + self.qtgui_const_sink_x_0.set_line_label(i, "Data {0}".format(i)) + else: + self.qtgui_const_sink_x_0.set_line_label(i, labels[i]) + self.qtgui_const_sink_x_0.set_line_width(i, widths[i]) + self.qtgui_const_sink_x_0.set_line_color(i, colors[i]) + self.qtgui_const_sink_x_0.set_line_style(i, styles[i]) + self.qtgui_const_sink_x_0.set_line_marker(i, markers[i]) + self.qtgui_const_sink_x_0.set_line_alpha(i, alphas[i]) + + self._qtgui_const_sink_x_0_win = sip.wrapinstance(self.qtgui_const_sink_x_0.pyqwidget(), Qt.QWidget) + self.top_grid_layout.addWidget(self._qtgui_const_sink_x_0_win, 2, 1, 1, 1) + for r in range(2, 3): + self.top_grid_layout.setRowStretch(r, 1) + for c in range(1, 2): + self.top_grid_layout.setColumnStretch(c, 1) + self.digital_pfb_clock_sync_xxx_0 = digital.pfb_clock_sync_ccf(sps * 1.001, timing_loop_bw, rrc_taps, nfilts, nfilts/2, 1.5, 1) + self.digital_constellation_modulator_0 = digital.generic_mod( + constellation=qpsk_const, + differential=True, + samples_per_symbol=sps, + pre_diff_code=True, + excess_bw=excess_bw, + verbose=False, + log=False) + self.channels_channel_model_0 = channels.channel_model( + noise_voltage=noise_volt, + frequency_offset=freq_offset, + epsilon=time_offset, + taps=taps, + noise_seed=0, + block_tags=False) + self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, samp_rate,True) + self.analog_random_source_x_0 = blocks.vector_source_b(list(map(int, numpy.random.randint(0, 2, 1000))), True) + + + + ################################################## + # Connections + ################################################## + self.connect((self.analog_random_source_x_0, 0), (self.digital_constellation_modulator_0, 0)) + self.connect((self.blocks_throttle_0, 0), (self.channels_channel_model_0, 0)) + self.connect((self.channels_channel_model_0, 0), (self.digital_pfb_clock_sync_xxx_0, 0)) + self.connect((self.channels_channel_model_0, 0), (self.qtgui_const_sink_x_0, 0)) + self.connect((self.channels_channel_model_0, 0), (self.qtgui_freq_sink_x_0, 0)) + self.connect((self.channels_channel_model_0, 0), (self.qtgui_time_sink_x_0, 0)) + self.connect((self.digital_constellation_modulator_0, 0), (self.blocks_throttle_0, 0)) + self.connect((self.digital_pfb_clock_sync_xxx_0, 0), (self.qtgui_const_sink_x_0_0, 0)) + + + def closeEvent(self, event): + self.settings = Qt.QSettings("GNU Radio", "qpks") + self.settings.setValue("geometry", self.saveGeometry()) + event.accept() + + def get_sps(self): + return self.sps + + def set_sps(self, sps): + self.sps = sps + self.set_rrc_taps(firdes.root_raised_cosine(self.nfilts, self.nfilts, 1.0/float(self.sps), 0.35, 45*self.nfilts)) + + def get_nfilts(self): + return self.nfilts + + def set_nfilts(self, nfilts): + self.nfilts = nfilts + self.set_rrc_taps(firdes.root_raised_cosine(self.nfilts, self.nfilts, 1.0/float(self.sps), 0.35, 45*self.nfilts)) + + def get_timing_loop_bw(self): + return self.timing_loop_bw + + def set_timing_loop_bw(self, timing_loop_bw): + self.timing_loop_bw = timing_loop_bw + self.digital_pfb_clock_sync_xxx_0.set_loop_bandwidth(self.timing_loop_bw) + + def get_time_offset(self): + return self.time_offset + + def set_time_offset(self, time_offset): + self.time_offset = time_offset + self.channels_channel_model_0.set_timing_offset(self.time_offset) + + def get_taps(self): + return self.taps + + def set_taps(self, taps): + self.taps = taps + self.channels_channel_model_0.set_taps(self.taps) + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + self.blocks_throttle_0.set_sample_rate(self.samp_rate) + self.qtgui_freq_sink_x_0.set_frequency_range(0, self.samp_rate) + self.qtgui_time_sink_x_0.set_samp_rate(self.samp_rate) + + def get_rrc_taps(self): + return self.rrc_taps + + def set_rrc_taps(self, rrc_taps): + self.rrc_taps = rrc_taps + self.digital_pfb_clock_sync_xxx_0.update_taps(self.rrc_taps) + + def get_qpsk_const(self): + return self.qpsk_const + + def set_qpsk_const(self, qpsk_const): + self.qpsk_const = qpsk_const + + def get_noise_volt(self): + return self.noise_volt + + def set_noise_volt(self, noise_volt): + self.noise_volt = noise_volt + self.channels_channel_model_0.set_noise_voltage(self.noise_volt) + + def get_freq_offset(self): + return self.freq_offset + + def set_freq_offset(self, freq_offset): + self.freq_offset = freq_offset + self.channels_channel_model_0.set_frequency_offset(self.freq_offset) + + def get_excess_bw(self): + return self.excess_bw + + def set_excess_bw(self, excess_bw): + self.excess_bw = excess_bw + + + + + +def main(top_block_cls=qpks, options=None): + if gr.enable_realtime_scheduling() != gr.RT_OK: + print("Error: failed to enable real-time scheduling.") + + if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): + style = gr.prefs().get_string('qtgui', 'style', 'raster') + Qt.QApplication.setGraphicsSystem(style) + qapp = Qt.QApplication(sys.argv) + + tb = top_block_cls() + + tb.start() + + tb.show() + + def sig_handler(sig=None, frame=None): + Qt.QApplication.quit() + + signal.signal(signal.SIGINT, sig_handler) + signal.signal(signal.SIGTERM, sig_handler) + + timer = Qt.QTimer() + timer.start(500) + timer.timeout.connect(lambda: None) + + def quitting(): + tb.stop() + tb.wait() + + qapp.aboutToQuit.connect(quitting) + qapp.exec_() + +if __name__ == '__main__': + main() -- cgit v1.2.1