#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: QAM # Author: Pross Naoki, Halter Sara Cinzia # GNU Radio version: 3.8.2.0 from distutils.version import StrictVersion if __name__ == '__main__': import ctypes import sys if sys.platform.startswith('linux'): try: x11 = ctypes.cdll.LoadLibrary('libX11.so') x11.XInitThreads() except: print("Warning: failed to XInitThreads()") from gnuradio import blocks from gnuradio import channels from gnuradio.filter import firdes from gnuradio import digital from gnuradio import gr import sys import signal from PyQt5 import Qt from argparse import ArgumentParser from gnuradio.eng_arg import eng_float, intx from gnuradio import eng_notation import fadingui from gnuradio import qtgui class qam_nogui(gr.top_block, Qt.QWidget): def __init__(self): gr.top_block.__init__(self, "QAM") Qt.QWidget.__init__(self) self.setWindowTitle("QAM") qtgui.util.check_set_qss() try: self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) except: pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) self.top_scroll_layout.addWidget(self.top_scroll) self.top_scroll.setWidgetResizable(True) self.top_widget = Qt.QWidget() self.top_scroll.setWidget(self.top_widget) self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_grid_layout = Qt.QGridLayout() self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "qam_nogui") try: if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): self.restoreGeometry(self.settings.value("geometry").toByteArray()) else: self.restoreGeometry(self.settings.value("geometry")) except: pass ################################################## # Variables ################################################## self.sps = sps = 4 self.nfilts = nfilts = 32 self.excess_bw = excess_bw = 350e-3 self.timing_loop_bw = timing_loop_bw = 2 * 3.141592653589793 / 100 self.time_offset = time_offset = 1 self.samp_rate = samp_rate = 32000 self.rrc_taps = rrc_taps = firdes.root_raised_cosine(nfilts, nfilts, 1.0/float(sps), excess_bw, 45*nfilts) self.phase_bw = phase_bw = .01 self.noise_volt = noise_volt = 0 self.freq_offset = freq_offset = 0 self.frame = frame = fadingui.frame_obj(preamble=[190, 239], payload_len=32768) self.eq_ntaps = eq_ntaps = 15 self.eq_mod = eq_mod = 1 self.eq_gain = eq_gain = .002 self.const = const = digital.constellation_16qam().base() self.chn_taps = chn_taps = [1.0 + 0.0j, ] ################################################## # Blocks ################################################## self.fadingui_xor_frame_sync_0 = fadingui.xor_frame_sync(sync_pattern=frame.preamble, buffer_size=frame.length * 5) self.fadingui_deframer_0 = fadingui.deframer(frame_obj=frame) self.fadingui_datasource_0 = fadingui.datasource(frame_obj=frame, filename='/home/god/Documents/Fading/tests/fadingui/QAM/lena512color.tiff') self.digital_pfb_clock_sync_xxx_0 = digital.pfb_clock_sync_ccf(sps, timing_loop_bw, rrc_taps, nfilts, nfilts/2, 1.5, 1) self.digital_diff_decoder_bb_0 = digital.diff_decoder_bb(4) self.digital_costas_loop_cc_0 = digital.costas_loop_cc(phase_bw, 4, False) self.digital_constellation_modulator_0 = digital.generic_mod( constellation=const, differential=True, samples_per_symbol=sps, pre_diff_code=True, excess_bw=excess_bw, verbose=False, log=False) self.digital_constellation_decoder_cb_0 = digital.constellation_decoder_cb(const) self.digital_cma_equalizer_cc_0 = digital.cma_equalizer_cc(eq_ntaps, eq_mod, eq_gain, 1) self.channels_channel_model_0 = channels.channel_model( noise_voltage=noise_volt, frequency_offset=freq_offset, epsilon=time_offset, taps=chn_taps, noise_seed=0, block_tags=False) self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex*1, samp_rate,True) self.blocks_null_sink_0 = blocks.null_sink(gr.sizeof_char*1) ################################################## # Connections ################################################## self.connect((self.blocks_throttle_0, 0), (self.channels_channel_model_0, 0)) self.connect((self.channels_channel_model_0, 0), (self.digital_pfb_clock_sync_xxx_0, 0)) self.connect((self.digital_cma_equalizer_cc_0, 0), (self.digital_costas_loop_cc_0, 0)) self.connect((self.digital_constellation_decoder_cb_0, 0), (self.digital_diff_decoder_bb_0, 0)) self.connect((self.digital_constellation_modulator_0, 0), (self.blocks_throttle_0, 0)) self.connect((self.digital_costas_loop_cc_0, 0), (self.digital_constellation_decoder_cb_0, 0)) self.connect((self.digital_diff_decoder_bb_0, 0), (self.fadingui_xor_frame_sync_0, 0)) self.connect((self.digital_pfb_clock_sync_xxx_0, 0), (self.digital_cma_equalizer_cc_0, 0)) self.connect((self.fadingui_datasource_0, 0), (self.digital_constellation_modulator_0, 0)) self.connect((self.fadingui_deframer_0, 0), (self.blocks_null_sink_0, 0)) self.connect((self.fadingui_xor_frame_sync_0, 0), (self.fadingui_deframer_0, 0)) def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "qam_nogui") self.settings.setValue("geometry", self.saveGeometry()) event.accept() def get_sps(self): return self.sps def set_sps(self, sps): self.sps = sps self.set_rrc_taps(firdes.root_raised_cosine(self.nfilts, self.nfilts, 1.0/float(self.sps), self.excess_bw, 45*self.nfilts)) def get_nfilts(self): return self.nfilts def set_nfilts(self, nfilts): self.nfilts = nfilts self.set_rrc_taps(firdes.root_raised_cosine(self.nfilts, self.nfilts, 1.0/float(self.sps), self.excess_bw, 45*self.nfilts)) def get_excess_bw(self): return self.excess_bw def set_excess_bw(self, excess_bw): self.excess_bw = excess_bw self.set_rrc_taps(firdes.root_raised_cosine(self.nfilts, self.nfilts, 1.0/float(self.sps), self.excess_bw, 45*self.nfilts)) def get_timing_loop_bw(self): return self.timing_loop_bw def set_timing_loop_bw(self, timing_loop_bw): self.timing_loop_bw = timing_loop_bw self.digital_pfb_clock_sync_xxx_0.set_loop_bandwidth(self.timing_loop_bw) def get_time_offset(self): return self.time_offset def set_time_offset(self, time_offset): self.time_offset = time_offset self.channels_channel_model_0.set_timing_offset(self.time_offset) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.blocks_throttle_0.set_sample_rate(self.samp_rate) def get_rrc_taps(self): return self.rrc_taps def set_rrc_taps(self, rrc_taps): self.rrc_taps = rrc_taps self.digital_pfb_clock_sync_xxx_0.update_taps(self.rrc_taps) def get_phase_bw(self): return self.phase_bw def set_phase_bw(self, phase_bw): self.phase_bw = phase_bw self.digital_costas_loop_cc_0.set_loop_bandwidth(self.phase_bw) def get_noise_volt(self): return self.noise_volt def set_noise_volt(self, noise_volt): self.noise_volt = noise_volt self.channels_channel_model_0.set_noise_voltage(self.noise_volt) def get_freq_offset(self): return self.freq_offset def set_freq_offset(self, freq_offset): self.freq_offset = freq_offset self.channels_channel_model_0.set_frequency_offset(self.freq_offset) def get_frame(self): return self.frame def set_frame(self, frame): self.frame = frame def get_eq_ntaps(self): return self.eq_ntaps def set_eq_ntaps(self, eq_ntaps): self.eq_ntaps = eq_ntaps def get_eq_mod(self): return self.eq_mod def set_eq_mod(self, eq_mod): self.eq_mod = eq_mod self.digital_cma_equalizer_cc_0.set_modulus(self.eq_mod) def get_eq_gain(self): return self.eq_gain def set_eq_gain(self, eq_gain): self.eq_gain = eq_gain self.digital_cma_equalizer_cc_0.set_gain(self.eq_gain) def get_const(self): return self.const def set_const(self, const): self.const = const def get_chn_taps(self): return self.chn_taps def set_chn_taps(self, chn_taps): self.chn_taps = chn_taps self.channels_channel_model_0.set_taps(self.chn_taps) def main(top_block_cls=qam_nogui, options=None): if gr.enable_realtime_scheduling() != gr.RT_OK: print("Error: failed to enable real-time scheduling.") if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): style = gr.prefs().get_string('qtgui', 'style', 'raster') Qt.QApplication.setGraphicsSystem(style) qapp = Qt.QApplication(sys.argv) tb = top_block_cls() tb.start() tb.show() def sig_handler(sig=None, frame=None): Qt.QApplication.quit() signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) timer = Qt.QTimer() timer.start(500) timer.timeout.connect(lambda: None) def quitting(): tb.stop() tb.wait() qapp.aboutToQuit.connect(quitting) qapp.exec_() if __name__ == '__main__': main()