#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: Simulation of a fading channel # Author: Naoki Pross # GNU Radio version: 3.8.2.0 from distutils.version import StrictVersion if __name__ == '__main__': import ctypes import sys if sys.platform.startswith('linux'): try: x11 = ctypes.cdll.LoadLibrary('libX11.so') x11.XInitThreads() except: print("Warning: failed to XInitThreads()") from PyQt5 import Qt from gnuradio import qtgui from gnuradio.filter import firdes import sip from gnuradio import analog from gnuradio import blocks from gnuradio import fosphor from gnuradio.fft import window from gnuradio import gr import sys import signal from argparse import ArgumentParser from gnuradio.eng_arg import eng_float, intx from gnuradio import eng_notation from gnuradio import qtgui class Simluation(gr.top_block, Qt.QWidget): def __init__(self): gr.top_block.__init__(self, "Simulation of a fading channel") Qt.QWidget.__init__(self) self.setWindowTitle("Simulation of a fading channel") qtgui.util.check_set_qss() try: self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) except: pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) self.top_scroll_layout.addWidget(self.top_scroll) self.top_scroll.setWidgetResizable(True) self.top_widget = Qt.QWidget() self.top_scroll.setWidget(self.top_widget) self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_grid_layout = Qt.QGridLayout() self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "Simluation") try: if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): self.restoreGeometry(self.settings.value("geometry").toByteArray()) else: self.restoreGeometry(self.settings.value("geometry")) except: pass ################################################## # Variables ################################################## self.samp_rate = samp_rate = 128e3 self.modulation_factor = modulation_factor = .7 self.carrier_freq = carrier_freq = 5e3 ################################################## # Blocks ################################################## self.qtgui_time_sink_x_0 = qtgui.time_sink_f( 1024, #size samp_rate, #samp_rate "Demodulated AM signal", #name 2 #number of inputs ) self.qtgui_time_sink_x_0.set_update_time(1e-1) self.qtgui_time_sink_x_0.set_y_axis(-1, 1) self.qtgui_time_sink_x_0.set_y_label('Amplitude', "") self.qtgui_time_sink_x_0.enable_tags(True) self.qtgui_time_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_NORM, qtgui.TRIG_SLOPE_POS, 0.0, 1e-3, 0, "") self.qtgui_time_sink_x_0.enable_autoscale(True) self.qtgui_time_sink_x_0.enable_grid(True) self.qtgui_time_sink_x_0.enable_axis_labels(True) self.qtgui_time_sink_x_0.enable_control_panel(False) self.qtgui_time_sink_x_0.enable_stem_plot(False) labels = ['Original', 'Demodulated', 'Signal 3', 'Signal 4', 'Signal 5', 'Signal 6', 'Signal 7', 'Signal 8', 'Signal 9', 'Signal 10'] widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] colors = ['blue', 'red', 'green', 'black', 'cyan', 'magenta', 'yellow', 'dark red', 'dark green', 'dark blue'] alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] styles = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] markers = [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1] for i in range(2): if len(labels[i]) == 0: self.qtgui_time_sink_x_0.set_line_label(i, "Data {0}".format(i)) else: self.qtgui_time_sink_x_0.set_line_label(i, labels[i]) self.qtgui_time_sink_x_0.set_line_width(i, widths[i]) self.qtgui_time_sink_x_0.set_line_color(i, colors[i]) self.qtgui_time_sink_x_0.set_line_style(i, styles[i]) self.qtgui_time_sink_x_0.set_line_marker(i, markers[i]) self.qtgui_time_sink_x_0.set_line_alpha(i, alphas[i]) self._qtgui_time_sink_x_0_win = sip.wrapinstance(self.qtgui_time_sink_x_0.pyqwidget(), Qt.QWidget) self.top_grid_layout.addWidget(self._qtgui_time_sink_x_0_win) self.fosphor_glfw_sink_c_0 = fosphor.glfw_sink_c() self.fosphor_glfw_sink_c_0.set_fft_window(firdes.WIN_BLACKMAN_hARRIS) self.fosphor_glfw_sink_c_0.set_frequency_range(0, samp_rate) self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, samp_rate * 2,True) self.blocks_multiply_xx_1 = blocks.multiply_vcc(1) self.blocks_multiply_xx_0 = blocks.multiply_vcc(1) self.blocks_complex_to_real_0 = blocks.complex_to_real(1) self.blocks_add_xx_0 = blocks.add_vcc(1) self.analog_sig_source_x_1 = analog.sig_source_c(samp_rate, analog.GR_SQR_WAVE, 50, 1, 0, 0) self.analog_sig_source_x_0 = analog.sig_source_c(samp_rate, analog.GR_COS_WAVE, carrier_freq, 1, 0, 0) self.analog_const_source_x_1 = analog.sig_source_c(0, analog.GR_CONST_WAVE, 0, 0, modulation_factor) self.analog_const_source_x_0 = analog.sig_source_c(0, analog.GR_CONST_WAVE, 0, 0, 1) self.analog_am_demod_cf_0 = analog.am_demod_cf( channel_rate=samp_rate, audio_decim=1, audio_pass=5000, audio_stop=5500, ) ################################################## # Connections ################################################## self.connect((self.analog_am_demod_cf_0, 0), (self.qtgui_time_sink_x_0, 1)) self.connect((self.analog_const_source_x_0, 0), (self.blocks_add_xx_0, 1)) self.connect((self.analog_const_source_x_1, 0), (self.blocks_multiply_xx_1, 0)) self.connect((self.analog_sig_source_x_0, 0), (self.blocks_multiply_xx_0, 1)) self.connect((self.analog_sig_source_x_1, 0), (self.blocks_multiply_xx_1, 1)) self.connect((self.blocks_add_xx_0, 0), (self.blocks_complex_to_real_0, 0)) self.connect((self.blocks_add_xx_0, 0), (self.blocks_multiply_xx_0, 0)) self.connect((self.blocks_complex_to_real_0, 0), (self.qtgui_time_sink_x_0, 0)) self.connect((self.blocks_multiply_xx_0, 0), (self.blocks_throttle_0, 0)) self.connect((self.blocks_multiply_xx_1, 0), (self.blocks_add_xx_0, 0)) self.connect((self.blocks_throttle_0, 0), (self.analog_am_demod_cf_0, 0)) self.connect((self.blocks_throttle_0, 0), (self.fosphor_glfw_sink_c_0, 0)) def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "Simluation") self.settings.setValue("geometry", self.saveGeometry()) event.accept() def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.analog_sig_source_x_0.set_sampling_freq(self.samp_rate) self.analog_sig_source_x_1.set_sampling_freq(self.samp_rate) self.blocks_throttle_0.set_sample_rate(self.samp_rate * 2) self.fosphor_glfw_sink_c_0.set_frequency_range(0, self.samp_rate) self.qtgui_time_sink_x_0.set_samp_rate(self.samp_rate) def get_modulation_factor(self): return self.modulation_factor def set_modulation_factor(self, modulation_factor): self.modulation_factor = modulation_factor self.analog_const_source_x_1.set_offset(self.modulation_factor) def get_carrier_freq(self): return self.carrier_freq def set_carrier_freq(self, carrier_freq): self.carrier_freq = carrier_freq self.analog_sig_source_x_0.set_frequency(self.carrier_freq) def main(top_block_cls=Simluation, options=None): if gr.enable_realtime_scheduling() != gr.RT_OK: print("Error: failed to enable real-time scheduling.") if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): style = gr.prefs().get_string('qtgui', 'style', 'raster') Qt.QApplication.setGraphicsSystem(style) qapp = Qt.QApplication(sys.argv) tb = top_block_cls() tb.start() tb.show() def sig_handler(sig=None, frame=None): Qt.QApplication.quit() signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) timer = Qt.QTimer() timer.start(500) timer.timeout.connect(lambda: None) def quitting(): tb.stop() tb.wait() qapp.aboutToQuit.connect(quitting) qapp.exec_() if __name__ == '__main__': main()