diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/correlator/correlator.grc | 104 |
1 files changed, 52 insertions, 52 deletions
diff --git a/tests/correlator/correlator.grc b/tests/correlator/correlator.grc index 555ad15..c545532 100644 --- a/tests/correlator/correlator.grc +++ b/tests/correlator/correlator.grc @@ -472,57 +472,57 @@ blocks: - name: epy_block_0 id: epy_block parameters: - _source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\nprint\ - \ = lambda x: None\n\nclass blk(gr.sync_block):\n \"\"\"\n Apply phase\ - \ and frequency correction where there is a correlation peak tag.\n\n The\ - \ correlation peak tags are NOT propagated, and instead replaced with a\n \ - \ frame_start tag.\n \"\"\"\n def __init__(self):\n gr.sync_block.__init__(\n\ - \ self,\n name='Phase and Frequency Correction',\n \ - \ in_sig=[np.complex64],\n out_sig=[np.complex64]\n \ - \ )\n\n # tags should not be propagated, we then output our own tags\n\ - \ self.set_tag_propagation_policy(gr.TPP_DONT)\n\n # because we\ - \ do block processing, we need to keep track of the last tag\n # of the\ - \ previous block to correct the first values of the next block\n self.last\ - \ = None\n self.lastfreq = 0\n self.lastnback = 0\n self.lastnsamples\ - \ = 0\n\n def block_phase(self, start, end):\n \"\"\"\n Compute\ - \ a vector for the phase and frequency correction for the samples\n between\ - \ two tags (start and end).\n\n @param start Tag where the samples should\ - \ start to be corrected\n @param end Tag where to stop correcting\n\ - \n @return A vector of phase values for each sample. To correct the samples\n\ - \ the data should be multiplied with np.exp(-1j * phase)\n \ - \ \"\"\"\n # compute number of samples between tags\n nsamples\ - \ = end.offset - start.offset\n\n # debugging, see last lines of self.work()\n\ - \ self.lastnsamples = nsamples\n\n # unpack pmt values into start\ - \ and end phase\n sphase = pmt.to_python(start.value)\n ephase\ - \ = pmt.to_python(end.value)\n\n # compute frequency offset between start\ - \ and end\n phasediff = (ephase - sphase) % (2 * np.pi)\n freq\ - \ = phasediff / nsamples\n\n # save this one for the last block (see\ - \ variable `end' in self.work)\n self.lastfreq = freq\n\n # debugging\n\ - \ print(f\"Correction for chunk of {nsamples:2d} samples is \" \\\n \ - \ f\"sphase={sphase: .4f} rad and freq={freq*1e3: .4f}e-3 rad /\ - \ sample\")\n\n # compute chunk values\n return sphase * np.ones(nsamples)\ - \ + freq * np.arange(0, nsamples)\n\n def work(self, input_items, output_items):\n\ - \ counter = self.nitems_written(0)\n\n # nicer aliases\n \ - \ inp = input_items[0]\n out = output_items[0]\n\n # read phase\ - \ tags\n is_phase = lambda tag: pmt.to_python(tag.key) == \"phase_est\"\ - \n tags = list(filter(is_phase, self.get_tags_in_window(0, 0, len(inp))))\n\ - \n if not tags:\n print(f\"There were no tags in {len(inp)}\ - \ samples!\")\n out[:] = inp\n return len(out)\n\n \ - \ # debugging\n print(f\"Processing {len(tags)} tags = {tags[-1].offset\ - \ - tags[0].offset} \" \\\n f\"samples out of {len(inp)} input\ - \ samples\")\n\n # compute \"the middle\"\n enough_samples = lambda\ - \ pair: ((pair[1].offset - pair[0].offset) > 0)\n pairs = list(filter(enough_samples,\ - \ zip(tags, tags[1:])))\n chunks = [ self.block_phase(start, end) for\ - \ (start, end) in pairs ]\n middle = np.concatenate(chunks) if chunks\ - \ else []\n\n # compute values at the end, we do not have informations\ - \ about the future\n # but we can use the frequency of the last tag to\ - \ approximate\n nback = len(inp) - (tags[-1].offset - counter)\n \ - \ print(f\"Processing {nback} samples at the back of the buffer\")\n \ - \ end = np.ones(nback) * pmt.to_python(tags[-1].value) \\\n \ - \ + self.lastfreq * np.arange(0, nback)\n\n\n # compute the \"start\"\ - , using the last tag from the previous call\n nfront = tags[0].offset\ - \ - counter\n print(f\"Processing {nfront} samples at the front of the\ - \ buffer\")\n start = self.block_phase(self.last, tags[0])[-nfront:]\ + _source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\n#\ + \ hide debugging print statements for the moment\nprint = lambda x: None\n\n\ + class blk(gr.sync_block):\n \"\"\"\n Apply phase and frequency correction\ + \ where there is a correlation peak tag.\n\n The correlation peak tags are\ + \ NOT propagated, and instead replaced with a\n frame_start tag.\n \"\"\ + \"\n def __init__(self):\n gr.sync_block.__init__(\n self,\n\ + \ name='Phase and Frequency Correction',\n in_sig=[np.complex64],\n\ + \ out_sig=[np.complex64]\n )\n\n # tags should not\ + \ be propagated, we then output our own tags\n self.set_tag_propagation_policy(gr.TPP_DONT)\n\ + \n # because we do block processing, we need to keep track of the last\ + \ tag\n # of the previous block to correct the first values of the next\ + \ block\n self.last = None\n self.lastfreq = 0\n self.lastnback\ + \ = 0\n self.lastnsamples = 0\n\n def block_phase(self, start, end):\n\ + \ \"\"\"\n Compute a vector for the phase and frequency correction\ + \ for the samples\n between two tags (start and end).\n\n @param\ + \ start Tag where the samples should start to be corrected\n @param end\ + \ Tag where to stop correcting\n\n @return A vector of phase values\ + \ for each sample. To correct the samples\n the data should be\ + \ multiplied with np.exp(-1j * phase)\n \"\"\"\n # compute number\ + \ of samples between tags\n nsamples = end.offset - start.offset\n\n\ + \ # debugging, see last lines of self.work()\n self.lastnsamples\ + \ = nsamples\n\n # unpack pmt values into start and end phase\n \ + \ sphase = pmt.to_python(start.value)\n ephase = pmt.to_python(end.value)\n\ + \n # compute frequency offset between start and end\n phasediff\ + \ = (ephase - sphase) % (2 * np.pi)\n freq = phasediff / nsamples\n\n\ + \ # save this one for the last block (see variable `end' in self.work)\n\ + \ self.lastfreq = freq\n\n # debugging\n print(f\"Correction\ + \ for chunk of {nsamples:2d} samples is \" \\\n f\"sphase={sphase:\ + \ .4f} rad and freq={freq*1e3: .4f}e-3 rad / sample\")\n\n # compute\ + \ chunk values\n return sphase * np.ones(nsamples) + freq * np.arange(0,\ + \ nsamples)\n\n def work(self, input_items, output_items):\n counter\ + \ = self.nitems_written(0)\n\n # nicer aliases\n inp = input_items[0]\n\ + \ out = output_items[0]\n\n # read phase tags\n is_phase\ + \ = lambda tag: pmt.to_python(tag.key) == \"phase_est\"\n tags = list(filter(is_phase,\ + \ self.get_tags_in_window(0, 0, len(inp))))\n\n if not tags:\n \ + \ print(f\"There were no tags in {len(inp)} samples!\")\n out[:]\ + \ = inp\n return len(out)\n\n # debugging\n print(f\"\ + Processing {len(tags)} tags = {tags[-1].offset - tags[0].offset} \" \\\n \ + \ f\"samples out of {len(inp)} input samples\")\n\n # compute\ + \ \"the middle\"\n enough_samples = lambda pair: ((pair[1].offset - pair[0].offset)\ + \ > 0)\n pairs = list(filter(enough_samples, zip(tags, tags[1:])))\n\ + \ chunks = [ self.block_phase(start, end) for (start, end) in pairs ]\n\ + \ middle = np.concatenate(chunks) if chunks else []\n\n # compute\ + \ values at the end, we do not have informations about the future\n #\ + \ but we can use the frequency of the last tag to approximate\n nback\ + \ = len(inp) - (tags[-1].offset - counter)\n print(f\"Processing {nback}\ + \ samples at the back of the buffer\")\n end = np.ones(nback) * pmt.to_python(tags[-1].value)\ + \ \\\n + self.lastfreq * np.arange(0, nback)\n\n\n # compute\ + \ the \"start\", using the last tag from the previous call\n nfront =\ + \ tags[0].offset - counter\n print(f\"Processing {nfront} samples at\ + \ the front of the buffer\")\n start = self.block_phase(self.last, tags[0])[-nfront:]\ \ \\\n if self.last and nfront else np.zeros(nfront)\n\n \ \ # debugging\n if nfront + self.lastnback != self.lastnsamples:\n\ \ print(f\"Something went wrong: {self.lastnback + nfront} != self.lastnsamples\"\ @@ -585,7 +585,7 @@ blocks: \ == self.tag\n tags = filter(is_frame_start, self.get_tags_in_window(0,\ \ 0, len(inp)))\n\n counter = self.nitems_written(0)\n offsets\ \ = map(lambda t: t.offset - counter, tags)\n\n print(list(offsets))\n\ - \n output_items[0][:] = inp\n return len(output_items[0])\n" + \n output_items[0][:] = inp.reshape(())\n return len(output_items[0])\n" affinity: '' alias: '' comment: '' |