aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/correlator/correlator.grc82
-rw-r--r--tests/correlator/epy_block_0.py124
2 files changed, 95 insertions, 111 deletions
diff --git a/tests/correlator/correlator.grc b/tests/correlator/correlator.grc
index f54430c..57c1762 100644
--- a/tests/correlator/correlator.grc
+++ b/tests/correlator/correlator.grc
@@ -526,49 +526,45 @@ blocks:
_source_code: "import pmt\n\nimport numpy as np\nfrom gnuradio import gr\n\n\n\
class blk(gr.sync_block):\n def __init__(self):\n gr.sync_block.__init__(\n\
\ self,\n name='Phase Lock',\n in_sig=[np.complex64],\n\
- \ out_sig=[np.complex64]\n )\n\n # keep track of how\
- \ many samples were processed,\n # because tags have an absolute offset\n\
- \ self.sampnr: np.complex64 = 0\n\n # because of block processing\
- \ a tagged block could\n # be split in half so we need to keep track\
- \ of the\n # \"previous\" values\n self.last_tag = None\n\n \
- \ def work(self, input_items, output_items):\n # nicer aliases\n \
- \ out = output_items[0]\n inp = input_items[0]\n\n def print_phase_correction(start,\
- \ end, phase, freq):\n print(f\"Correction for block {start:3d} to\
- \ {end:3d} is phase = {phase: 2.4f} rad, freq = {freq * 1e3: 2.4f} milli rad/samp\"\
- )\n\n # read only phase tags\n tags = self.get_tags_in_window(0,\
- \ 0, len(inp))\n\n is_phase = lambda tag: pmt.to_python(tag.key) == \"\
- phase_est\"\n phase_tags = list(filter(is_phase, tags))\n\n #\
- \ FIXME: what if there are no tags? check that!\n\n print(f\"Processing\
- \ {len(inp)} samples, with {len(phase_tags)} tags\")\n\n # create a phase\
- \ correction vector\n phase = np.zeros(len(inp), dtype=np.float64)\n\n\
- \ # compute correction from previous block (if present)\n if self.last_tag:\n\
- \ # variables for first and last phase values\n lval =\
- \ pmt.to_python(self.last_tag.value)\n fval = pmt.to_python(phase_tags[0].value)\n\
- \n # compute index for phase vector\n fidx = phase_tags[0].offset\
- \ - self.sampnr\n\n # compute frequency offset\n nsamples\
- \ = phase_tags[0].offset - self.last_tag.offset\n freq = (fval -\
- \ lval) / nsamples\n\n # total phase correction is: phase + freq\
- \ * time\n phase[:fidx] = lval * np.ones(fidx) + freq * np.arange(0,\
- \ fidx)\n\n # compute correction\n print_phase_correction(0,\
- \ fidx, lval, freq)\n\n # iterate phase tags \"in the middle\"\n \
- \ # FIXME: what if there are less than 2 tags?\n # the code\
- \ below will probably crash\n for prev_tag, next_tag in zip(phase_tags,\
- \ phase_tags[1:]):\n # unpack pmt values\n pval = pmt.to_python(prev_tag.value)\n\
- \ nval = pmt.to_python(next_tag.value)\n\n # compute indexes\
- \ in phase vector\n pidx = prev_tag.offset - self.sampnr\n \
- \ nidx = next_tag.offset - self.sampnr\n\n # compute frquency\
- \ correction for block by linearly interpolating\n # frame values\n\
- \ nsamples = nidx - pidx\n freq = (nval - pval) / nsamples\n\
- \n # total correction is: phase + freq * time\n phase[pidx:nidx]\
- \ = pval * np.ones(nsamples) + freq * np.arange(0, nsamples)\n print_phase_correction(pidx,\
- \ nidx, pval, freq)\n\n # for the last block because the next tag is\
- \ unknown (in the future) we\n # cannot predict the frequency offset.\
- \ Thus we save the last tag for\n # the next call.\n self.last_tag\
- \ = phase_tags[-1]\n val = pmt.to_python(self.last_tag.value)\n \
- \ idx = self.last_tag.offset - self.sampnr\n\n phase[idx:] = val\n\n\
- \ # compute correction\n out[:] = inp * np.exp(-1j * phase)\n\n\
- \ # increment processed samples counter\n self.sampnr += len(inp)\n\
- \ return len(phase)\n"
+ \ out_sig=[np.complex64]\n )\n\n # we need to keep\
+ \ track of the aboslute number of samples that have\n # been processed,\
+ \ because tags have an absolute offset\n self.counter: np.uint64 = 0\n\
+ \n # because we do block processing, we need to keep track of the last\
+ \ tag\n # of the previous block to correct the first values of the next\
+ \ block\n self.last = None\n\n def block_phase(self, start, end):\n\
+ \ # compute number of samples in block\n nsamples = end.offset\
+ \ - start.offset\n\n # unpack pmt values into start and end phase\n \
+ \ sphase = pmt.to_python(start.value)\n ephase = pmt.to_python(end.value)\n\
+ \n # compute frequency offset between start and end\n freq = (sphase\
+ \ - ephase) / nsamples\n\n # debugging\n print(f\"Correction for\
+ \ block of {nsamples:2d} samples is \" \\\n f\"phase={sphase: .4f}\
+ \ rad and freq={freq*1e3: .4f} milli rad / sample\")\n\n # compute block\
+ \ values\n return sphase * np.ones(nsamples) + freq * np.arange(0, nsamples)\n\
+ \n def work(self, input_items, output_items):\n # FIXME: replace class\
+ \ counter with local variable\n # self.counter = self.nitems_written(0)\n\
+ \n # nicer aliases\n inp = input_items[0]\n out = output_items[0]\n\
+ \n # read phase tags\n is_phase = lambda tag: pmt.to_python(tag.key)\
+ \ == \"phase_est\"\n tags = list(filter(is_phase, self.get_tags_in_window(0,\
+ \ 0, len(inp))))\n\n # debugging\n print(f\"Processing {len(tags)}\
+ \ tags = {tags[-1].offset - tags[0].offset} \" \\\n f\"samples\
+ \ out of {len(inp)} input samples\")\n\n # compute \"the middle\"\n \
+ \ enough_samples = lambda pair: ((pair[1].offset - pair[0].offset) > 0)\n\
+ \ pairs = list(filter(enough_samples, zip(tags, tags[1:])))\n \
+ \ blocks = [ self.block_phase(start, end) for (start, end) in pairs ]\n \
+ \ middle = np.concatenate(blocks) if blocks else []\n\n # compute\
+ \ the remainder from the previous call\n nfront = tags[0].offset - self.counter\n\
+ \ print(f\"Processing {nfront} samples at the front of the buffer\")\n\
+ \ start = self.block_phase(self.last, tags[0])[-nfront:] \\\n \
+ \ if self.last else np.zeros(nfront)\n\n # compute values at\
+ \ the end\n nback = len(inp) - (tags[-1].offset - self.counter)\n \
+ \ print(f\"Processing {nback} samples at the back of the buffer\")\n \
+ \ end = np.ones(nback) * pmt.to_python(tags[-1].value)\n\n # compute\
+ \ correction\n correction = np.exp(-1j * np.concatenate([start, middle,\
+ \ end]))\n length = len(correction)\n\n # write outputs\n \
+ \ out[:length] = inp[:length] * correction\n self.counter += len(inp)\n\
+ \n # save last tag for next call\n self.last = tags[-1]\n\n \
+ \ # FIXME: should return `length' but then the last sample is not\n \
+ \ # included and self.last does something weird\n return len(out)\n"
affinity: ''
alias: ''
comment: ''
diff --git a/tests/correlator/epy_block_0.py b/tests/correlator/epy_block_0.py
index 90ad75e..9cb25bb 100644
--- a/tests/correlator/epy_block_0.py
+++ b/tests/correlator/epy_block_0.py
@@ -13,88 +13,76 @@ class blk(gr.sync_block):
out_sig=[np.complex64]
)
- # keep track of how many samples were processed,
- # because tags have an absolute offset
- self.sampnr: np.complex64 = 0
+ # we need to keep track of the aboslute number of samples that have
+ # been processed, because tags have an absolute offset
+ self.counter: np.uint64 = 0
- # because of block processing a tagged block could
- # be split in half so we need to keep track of the
- # "previous" values
- self.last_tag = None
+ # because we do block processing, we need to keep track of the last tag
+ # of the previous block to correct the first values of the next block
+ self.last = None
- def work(self, input_items, output_items):
- # nicer aliases
- out = output_items[0]
- inp = input_items[0]
-
- def print_phase_correction(start, end, phase, freq):
- print(f"Correction for block {start:3d} to {end:3d} is phase = {phase: 2.4f} rad, freq = {freq * 1e3: 2.4f} milli rad/samp")
-
- # read only phase tags
- tags = self.get_tags_in_window(0, 0, len(inp))
-
- is_phase = lambda tag: pmt.to_python(tag.key) == "phase_est"
- phase_tags = list(filter(is_phase, tags))
+ def block_phase(self, start, end):
+ # compute number of samples in block
+ nsamples = end.offset - start.offset
- # FIXME: what if there are no tags? check that!
+ # unpack pmt values into start and end phase
+ sphase = pmt.to_python(start.value)
+ ephase = pmt.to_python(end.value)
- print(f"Processing {len(inp)} samples, with {len(phase_tags)} tags")
+ # compute frequency offset between start and end
+ freq = (sphase - ephase) / nsamples
- # create a phase correction vector
- phase = np.zeros(len(inp), dtype=np.float64)
+ # debugging
+ print(f"Correction for block of {nsamples:2d} samples is " \
+ f"phase={sphase: .4f} rad and freq={freq*1e3: .4f} milli rad / sample")
- # compute correction from previous block (if present)
- if self.last_tag:
- # variables for first and last phase values
- lval = pmt.to_python(self.last_tag.value)
- fval = pmt.to_python(phase_tags[0].value)
+ # compute block values
+ return sphase * np.ones(nsamples) + freq * np.arange(0, nsamples)
- # compute index for phase vector
- fidx = phase_tags[0].offset - self.sampnr
-
- # compute frequency offset
- nsamples = phase_tags[0].offset - self.last_tag.offset
- freq = (fval - lval) / nsamples
+ def work(self, input_items, output_items):
+ # FIXME: replace class counter with local variable
+ # self.counter = self.nitems_written(0)
- # total phase correction is: phase + freq * time
- phase[:fidx] = lval * np.ones(fidx) + freq * np.arange(0, fidx)
+ # nicer aliases
+ inp = input_items[0]
+ out = output_items[0]
- # compute correction
- print_phase_correction(0, fidx, lval, freq)
+ # read phase tags
+ is_phase = lambda tag: pmt.to_python(tag.key) == "phase_est"
+ tags = list(filter(is_phase, self.get_tags_in_window(0, 0, len(inp))))
- # iterate phase tags "in the middle"
- # FIXME: what if there are less than 2 tags?
- # the code below will probably crash
- for prev_tag, next_tag in zip(phase_tags, phase_tags[1:]):
- # unpack pmt values
- pval = pmt.to_python(prev_tag.value)
- nval = pmt.to_python(next_tag.value)
+ # debugging
+ print(f"Processing {len(tags)} tags = {tags[-1].offset - tags[0].offset} " \
+ f"samples out of {len(inp)} input samples")
- # compute indexes in phase vector
- pidx = prev_tag.offset - self.sampnr
- nidx = next_tag.offset - self.sampnr
+ # compute "the middle"
+ enough_samples = lambda pair: ((pair[1].offset - pair[0].offset) > 0)
+ pairs = list(filter(enough_samples, zip(tags, tags[1:])))
+ blocks = [ self.block_phase(start, end) for (start, end) in pairs ]
+ middle = np.concatenate(blocks) if blocks else []
- # compute frquency correction for block by linearly interpolating
- # frame values
- nsamples = nidx - pidx
- freq = (nval - pval) / nsamples
+ # compute the remainder from the previous call
+ nfront = tags[0].offset - self.counter
+ print(f"Processing {nfront} samples at the front of the buffer")
+ start = self.block_phase(self.last, tags[0])[-nfront:] \
+ if self.last else np.zeros(nfront)
- # total correction is: phase + freq * time
- phase[pidx:nidx] = pval * np.ones(nsamples) + freq * np.arange(0, nsamples)
- print_phase_correction(pidx, nidx, pval, freq)
+ # compute values at the end
+ nback = len(inp) - (tags[-1].offset - self.counter)
+ print(f"Processing {nback} samples at the back of the buffer")
+ end = np.ones(nback) * pmt.to_python(tags[-1].value)
- # for the last block because the next tag is unknown (in the future) we
- # cannot predict the frequency offset. Thus we save the last tag for
- # the next call.
- self.last_tag = phase_tags[-1]
- val = pmt.to_python(self.last_tag.value)
- idx = self.last_tag.offset - self.sampnr
+ # compute correction
+ correction = np.exp(-1j * np.concatenate([start, middle, end]))
+ length = len(correction)
- phase[idx:] = val
+ # write outputs
+ out[:length] = inp[:length] * correction
+ self.counter += len(inp)
- # compute correction
- out[:] = inp * np.exp(-1j * phase)
+ # save last tag for next call
+ self.last = tags[-1]
- # increment processed samples counter
- self.sampnr += len(inp)
- return len(phase)
+ # FIXME: should return `length' but then the last sample is not
+ # included and self.last does something weird
+ return len(out)