aboutsummaryrefslogtreecommitdiffstats
path: root/tests/correlator/correlator.grc
diff options
context:
space:
mode:
Diffstat (limited to 'tests/correlator/correlator.grc')
-rw-r--r--tests/correlator/correlator.grc82
1 files changed, 39 insertions, 43 deletions
diff --git a/tests/correlator/correlator.grc b/tests/correlator/correlator.grc
index f54430c..57c1762 100644
--- a/tests/correlator/correlator.grc
+++ b/tests/correlator/correlator.grc
@@ -526,49 +526,45 @@ blocks:
_source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\n\n\
class blk(gr.sync_block):\n def __init__(self):\n gr.sync_block.__init__(\n\
\ self,\n name='Phase Lock',\n in_sig=[np.complex64],\n\
- \ out_sig=[np.complex64]\n )\n\n # keep track of how\
- \ many samples were processed,\n # because tags have an absolute offset\n\
- \ self.sampnr: np.complex64 = 0\n\n # because of block processing\
- \ a tagged block could\n # be split in half so we need to keep track\
- \ of the\n # \"previous\" values\n self.last_tag = None\n\n \
- \ def work(self, input_items, output_items):\n # nicer aliases\n \
- \ out = output_items[0]\n inp = input_items[0]\n\n def print_phase_correction(start,\
- \ end, phase, freq):\n print(f\"Correction for block {start:3d} to\
- \ {end:3d} is phase = {phase: 2.4f} rad, freq = {freq * 1e3: 2.4f} milli rad/samp\"\
- )\n\n # read only phase tags\n tags = self.get_tags_in_window(0,\
- \ 0, len(inp))\n\n is_phase = lambda tag: pmt.to_python(tag.key) == \"\
- phase_est\"\n phase_tags = list(filter(is_phase, tags))\n\n #\
- \ FIXME: what if there are no tags? check that!\n\n print(f\"Processing\
- \ {len(inp)} samples, with {len(phase_tags)} tags\")\n\n # create a phase\
- \ correction vector\n phase = np.zeros(len(inp), dtype=np.float64)\n\n\
- \ # compute correction from previous block (if present)\n if self.last_tag:\n\
- \ # variables for first and last phase values\n lval =\
- \ pmt.to_python(self.last_tag.value)\n fval = pmt.to_python(phase_tags[0].value)\n\
- \n # compute index for phase vector\n fidx = phase_tags[0].offset\
- \ - self.sampnr\n\n # compute frequency offset\n nsamples\
- \ = phase_tags[0].offset - self.last_tag.offset\n freq = (fval -\
- \ lval) / nsamples\n\n # total phase correction is: phase + freq\
- \ * time\n phase[:fidx] = lval * np.ones(fidx) + freq * np.arange(0,\
- \ fidx)\n\n # compute correction\n print_phase_correction(0,\
- \ fidx, lval, freq)\n\n # iterate phase tags \"in the middle\"\n \
- \ # FIXME: what if there are less than 2 tags?\n # the code\
- \ below will probably crash\n for prev_tag, next_tag in zip(phase_tags,\
- \ phase_tags[1:]):\n # unpack pmt values\n pval = pmt.to_python(prev_tag.value)\n\
- \ nval = pmt.to_python(next_tag.value)\n\n # compute indexes\
- \ in phase vector\n pidx = prev_tag.offset - self.sampnr\n \
- \ nidx = next_tag.offset - self.sampnr\n\n # compute frquency\
- \ correction for block by linearly interpolating\n # frame values\n\
- \ nsamples = nidx - pidx\n freq = (nval - pval) / nsamples\n\
- \n # total correction is: phase + freq * time\n phase[pidx:nidx]\
- \ = pval * np.ones(nsamples) + freq * np.arange(0, nsamples)\n print_phase_correction(pidx,\
- \ nidx, pval, freq)\n\n # for the last block because the next tag is\
- \ unknown (in the future) we\n # cannot predict the frequency offset.\
- \ Thus we save the last tag for\n # the next call.\n self.last_tag\
- \ = phase_tags[-1]\n val = pmt.to_python(self.last_tag.value)\n \
- \ idx = self.last_tag.offset - self.sampnr\n\n phase[idx:] = val\n\n\
- \ # compute correction\n out[:] = inp * np.exp(-1j * phase)\n\n\
- \ # increment processed samples counter\n self.sampnr += len(inp)\n\
- \ return len(phase)\n"
+ \ out_sig=[np.complex64]\n )\n\n # we need to keep\
+ \ track of the aboslute number of samples that have\n # been processed,\
+ \ because tags have an absolute offset\n self.counter: np.uint64 = 0\n\
+ \n # because we do block processing, we need to keep track of the last\
+ \ tag\n # of the previous block to correct the first values of the next\
+ \ block\n self.last = None\n\n def block_phase(self, start, end):\n\
+ \ # compute number of samples in block\n nsamples = end.offset\
+ \ - start.offset\n\n # unpack pmt values into start and end phase\n \
+ \ sphase = pmt.to_python(start.value)\n ephase = pmt.to_python(end.value)\n\
+ \n # compute frequency offset between start and end\n freq = (sphase\
+ \ - ephase) / nsamples\n\n # debugging\n print(f\"Correction for\
+ \ block of {nsamples:2d} samples is \" \\\n f\"phase={sphase: .4f}\
+ \ rad and freq={freq*1e3: .4f} milli rad / sample\")\n\n # compute block\
+ \ values\n return sphase * np.ones(nsamples) + freq * np.arange(0, nsamples)\n\
+ \n def work(self, input_items, output_items):\n # FIXME: replace class\
+ \ counter with local variable\n # self.counter = self.nitems_written(0)\n\
+ \n # nicer aliases\n inp = input_items[0]\n out = output_items[0]\n\
+ \n # read phase tags\n is_phase = lambda tag: pmt.to_python(tag.key)\
+ \ == \"phase_est\"\n tags = list(filter(is_phase, self.get_tags_in_window(0,\
+ \ 0, len(inp))))\n\n # debugging\n print(f\"Processing {len(tags)}\
+ \ tags = {tags[-1].offset - tags[0].offset} \" \\\n f\"samples\
+ \ out of {len(inp)} input samples\")\n\n # compute \"the middle\"\n \
+ \ enough_samples = lambda pair: ((pair[1].offset - pair[0].offset) > 0)\n\
+ \ pairs = list(filter(enough_samples, zip(tags, tags[1:])))\n \
+ \ blocks = [ self.block_phase(start, end) for (start, end) in pairs ]\n \
+ \ middle = np.concatenate(blocks) if blocks else []\n\n # compute\
+ \ the remainder from the previous call\n nfront = tags[0].offset - self.counter\n\
+ \ print(f\"Processing {nfront} samples at the front of the buffer\")\n\
+ \ start = self.block_phase(self.last, tags[0])[-nfront:] \\\n \
+ \ if self.last else np.zeros(nfront)\n\n # compute values at\
+ \ the end\n nback = len(inp) - (tags[-1].offset - self.counter)\n \
+ \ print(f\"Processing {nback} samples at the back of the buffer\")\n \
+ \ end = np.ones(nback) * pmt.to_python(tags[-1].value)\n\n # compute\
+ \ correction\n correction = np.exp(-1j * np.concatenate([start, middle,\
+ \ end]))\n length = len(correction)\n\n # write outputs\n \
+ \ out[:length] = inp[:length] * correction\n self.counter += len(inp)\n\
+ \n # save last tag for next call\n self.last = tags[-1]\n\n \
+ \ # FIXME: should return `length' but then the last sample is not\n \
+ \ # included and self.last does something weird\n return len(out)\n"
affinity: ''
alias: ''
comment: ''