Diffstat:
-rw-r--r--  tests/correlator/correlator.grc  104
1 file changed, 52 insertions(+), 52 deletions(-)
diff --git a/tests/correlator/correlator.grc b/tests/correlator/correlator.grc
index 555ad15..c545532 100644
--- a/tests/correlator/correlator.grc
+++ b/tests/correlator/correlator.grc
@@ -472,57 +472,57 @@ blocks:
- name: epy_block_0
id: epy_block
parameters:
- _source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\nprint\
- \ = lambda x: None\n\nclass blk(gr.sync_block):\n \"\"\"\n Apply phase\
- \ and frequency correction where there is a correlation peak tag.\n\n The\
- \ correlation peak tags are NOT propagated, and instead replaced with a\n \
- \ frame_start tag.\n \"\"\"\n def __init__(self):\n gr.sync_block.__init__(\n\
- \ self,\n name='Phase and Frequency Correction',\n \
- \ in_sig=[np.complex64],\n out_sig=[np.complex64]\n \
- \ )\n\n # tags should not be propagated, we then output our own tags\n\
- \ self.set_tag_propagation_policy(gr.TPP_DONT)\n\n # because we\
- \ do block processing, we need to keep track of the last tag\n # of the\
- \ previous block to correct the first values of the next block\n self.last\
- \ = None\n self.lastfreq = 0\n self.lastnback = 0\n self.lastnsamples\
- \ = 0\n\n def block_phase(self, start, end):\n \"\"\"\n Compute\
- \ a vector for the phase and frequency correction for the samples\n between\
- \ two tags (start and end).\n\n @param start Tag where the samples should\
- \ start to be corrected\n @param end Tag where to stop correcting\n\
- \n @return A vector of phase values for each sample. To correct the samples\n\
- \ the data should be multiplied with np.exp(-1j * phase)\n \
- \ \"\"\"\n # compute number of samples between tags\n nsamples\
- \ = end.offset - start.offset\n\n # debugging, see last lines of self.work()\n\
- \ self.lastnsamples = nsamples\n\n # unpack pmt values into start\
- \ and end phase\n sphase = pmt.to_python(start.value)\n ephase\
- \ = pmt.to_python(end.value)\n\n # compute frequency offset between start\
- \ and end\n phasediff = (ephase - sphase) % (2 * np.pi)\n freq\
- \ = phasediff / nsamples\n\n # save this one for the last block (see\
- \ variable `end' in self.work)\n self.lastfreq = freq\n\n # debugging\n\
- \ print(f\"Correction for chunk of {nsamples:2d} samples is \" \\\n \
- \ f\"sphase={sphase: .4f} rad and freq={freq*1e3: .4f}e-3 rad /\
- \ sample\")\n\n # compute chunk values\n return sphase * np.ones(nsamples)\
- \ + freq * np.arange(0, nsamples)\n\n def work(self, input_items, output_items):\n\
- \ counter = self.nitems_written(0)\n\n # nicer aliases\n \
- \ inp = input_items[0]\n out = output_items[0]\n\n # read phase\
- \ tags\n is_phase = lambda tag: pmt.to_python(tag.key) == \"phase_est\"\
- \n tags = list(filter(is_phase, self.get_tags_in_window(0, 0, len(inp))))\n\
- \n if not tags:\n print(f\"There were no tags in {len(inp)}\
- \ samples!\")\n out[:] = inp\n return len(out)\n\n \
- \ # debugging\n print(f\"Processing {len(tags)} tags = {tags[-1].offset\
- \ - tags[0].offset} \" \\\n f\"samples out of {len(inp)} input\
- \ samples\")\n\n # compute \"the middle\"\n enough_samples = lambda\
- \ pair: ((pair[1].offset - pair[0].offset) > 0)\n pairs = list(filter(enough_samples,\
- \ zip(tags, tags[1:])))\n chunks = [ self.block_phase(start, end) for\
- \ (start, end) in pairs ]\n middle = np.concatenate(chunks) if chunks\
- \ else []\n\n # compute values at the end, we do not have informations\
- \ about the future\n # but we can use the frequency of the last tag to\
- \ approximate\n nback = len(inp) - (tags[-1].offset - counter)\n \
- \ print(f\"Processing {nback} samples at the back of the buffer\")\n \
- \ end = np.ones(nback) * pmt.to_python(tags[-1].value) \\\n \
- \ + self.lastfreq * np.arange(0, nback)\n\n\n # compute the \"start\"\
- , using the last tag from the previous call\n nfront = tags[0].offset\
- \ - counter\n print(f\"Processing {nfront} samples at the front of the\
- \ buffer\")\n start = self.block_phase(self.last, tags[0])[-nfront:]\
+ _source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\n#\
+ \ hide debugging print statements for the moment\nprint = lambda x: None\n\n\
+ class blk(gr.sync_block):\n \"\"\"\n Apply phase and frequency correction\
+ \ where there is a correlation peak tag.\n\n The correlation peak tags are\
+ \ NOT propagated, and instead replaced with a\n frame_start tag.\n \"\"\
+ \"\n def __init__(self):\n gr.sync_block.__init__(\n self,\n\
+ \ name='Phase and Frequency Correction',\n in_sig=[np.complex64],\n\
+ \ out_sig=[np.complex64]\n )\n\n # tags should not\
+ \ be propagated, we then output our own tags\n self.set_tag_propagation_policy(gr.TPP_DONT)\n\
+ \n # because we do block processing, we need to keep track of the last\
+ \ tag\n # of the previous block to correct the first values of the next\
+ \ block\n self.last = None\n self.lastfreq = 0\n self.lastnback\
+ \ = 0\n self.lastnsamples = 0\n\n def block_phase(self, start, end):\n\
+ \ \"\"\"\n Compute a vector for the phase and frequency correction\
+ \ for the samples\n between two tags (start and end).\n\n @param\
+ \ start Tag where the samples should start to be corrected\n @param end\
+ \ Tag where to stop correcting\n\n @return A vector of phase values\
+ \ for each sample. To correct the samples\n the data should be\
+ \ multiplied with np.exp(-1j * phase)\n \"\"\"\n # compute number\
+ \ of samples between tags\n nsamples = end.offset - start.offset\n\n\
+ \ # debugging, see last lines of self.work()\n self.lastnsamples\
+ \ = nsamples\n\n # unpack pmt values into start and end phase\n \
+ \ sphase = pmt.to_python(start.value)\n ephase = pmt.to_python(end.value)\n\
+ \n # compute frequency offset between start and end\n phasediff\
+ \ = (ephase - sphase) % (2 * np.pi)\n freq = phasediff / nsamples\n\n\
+ \ # save this one for the last block (see variable `end' in self.work)\n\
+ \ self.lastfreq = freq\n\n # debugging\n print(f\"Correction\
+ \ for chunk of {nsamples:2d} samples is \" \\\n f\"sphase={sphase:\
+ \ .4f} rad and freq={freq*1e3: .4f}e-3 rad / sample\")\n\n # compute\
+ \ chunk values\n return sphase * np.ones(nsamples) + freq * np.arange(0,\
+ \ nsamples)\n\n def work(self, input_items, output_items):\n counter\
+ \ = self.nitems_written(0)\n\n # nicer aliases\n inp = input_items[0]\n\
+ \ out = output_items[0]\n\n # read phase tags\n is_phase\
+ \ = lambda tag: pmt.to_python(tag.key) == \"phase_est\"\n tags = list(filter(is_phase,\
+ \ self.get_tags_in_window(0, 0, len(inp))))\n\n if not tags:\n \
+ \ print(f\"There were no tags in {len(inp)} samples!\")\n out[:]\
+ \ = inp\n return len(out)\n\n # debugging\n print(f\"\
+ Processing {len(tags)} tags = {tags[-1].offset - tags[0].offset} \" \\\n \
+ \ f\"samples out of {len(inp)} input samples\")\n\n # compute\
+ \ \"the middle\"\n enough_samples = lambda pair: ((pair[1].offset - pair[0].offset)\
+ \ > 0)\n pairs = list(filter(enough_samples, zip(tags, tags[1:])))\n\
+ \ chunks = [ self.block_phase(start, end) for (start, end) in pairs ]\n\
+ \ middle = np.concatenate(chunks) if chunks else []\n\n # compute\
+ \ values at the end, we do not have informations about the future\n #\
+ \ but we can use the frequency of the last tag to approximate\n nback\
+ \ = len(inp) - (tags[-1].offset - counter)\n print(f\"Processing {nback}\
+ \ samples at the back of the buffer\")\n end = np.ones(nback) * pmt.to_python(tags[-1].value)\
+ \ \\\n + self.lastfreq * np.arange(0, nback)\n\n\n # compute\
+ \ the \"start\", using the last tag from the previous call\n nfront =\
+ \ tags[0].offset - counter\n print(f\"Processing {nfront} samples at\
+ \ the front of the buffer\")\n start = self.block_phase(self.last, tags[0])[-nfront:]\
\ \\\n if self.last and nfront else np.zeros(nfront)\n\n \
\ # debugging\n if nfront + self.lastnback != self.lastnsamples:\n\
\ print(f\"Something went wrong: {self.lastnback + nfront} != self.lastnsamples\"\
@@ -585,7 +585,7 @@ blocks:
\ == self.tag\n tags = filter(is_frame_start, self.get_tags_in_window(0,\
\ 0, len(inp)))\n\n counter = self.nitems_written(0)\n offsets\
\ = map(lambda t: t.offset - counter, tags)\n\n print(list(offsets))\n\
- \n output_items[0][:] = inp\n return len(output_items[0])\n"
+ \n output_items[0][:] = inp.reshape(())\n return len(output_items[0])\n"
affinity: ''
alias: ''
comment: ''
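For completeness, a minimal sketch of how work() in the first hunk appears to stitch the per-buffer phase vector together: samples before the first tag continue the ramp started at the previous call's last tag (self.last), samples between tags use block_phase() pairwise, and samples after the last tag are extrapolated with the last measured frequency (self.lastfreq), since nothing is known about the future yet. The name assemble_phase, the Tag namedtuple, and the example numbers are mine, not part of the block; the real code reads the tags with get_tags_in_window() and pmt.to_python().

import numpy as np
from collections import namedtuple

Tag = namedtuple("Tag", ["offset", "value"])   # offset in samples, value in rad

def block_phase(start, end):
    """Linear phase ramp between two tags, as in the block above."""
    nsamples = end.offset - start.offset
    freq = ((end.value - start.value) % (2 * np.pi)) / nsamples
    return start.value + freq * np.arange(nsamples)

def assemble_phase(nitems, counter, tags, last_tag, last_freq):
    """Sketch of how the per-call phase vector is put together.

    counter   absolute index of the first sample in this buffer
    tags      phase_est tags that fall inside the buffer
    last_tag  last tag of the previous call (self.last), or None
    last_freq frequency measured at that tag (self.lastfreq)
    """
    # front: continue the ramp started at the previous call's last tag
    nfront = tags[0].offset - counter
    front = (block_phase(last_tag, tags[0])[-nfront:]
             if last_tag is not None and nfront else np.zeros(nfront))

    # middle: one ramp per pair of consecutive tags in this buffer
    pairs = [(a, b) for a, b in zip(tags, tags[1:]) if b.offset > a.offset]
    middle = (np.concatenate([block_phase(a, b) for a, b in pairs])
              if pairs else np.zeros(0))

    # back: no tag after the last one yet, so extrapolate with last_freq
    nback = nitems - (tags[-1].offset - counter)
    back = tags[-1].value + last_freq * np.arange(nback)

    return np.concatenate([front, middle, back])   # multiply inp by np.exp(-1j * this)

# Example: a 64-sample buffer starting at absolute index 1000 with two tags inside
tags = [Tag(1010, 0.2), Tag(1040, 0.7)]
phase = assemble_phase(64, 1000, tags, Tag(990, -0.1), 0.01)
print(len(phase))   # 64, one phase value per input sample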