diff options
Diffstat (limited to 'tests/correlator/epy_block_0.py')
-rw-r--r-- | tests/correlator/epy_block_0.py | 124 |
1 files changed, 56 insertions, 68 deletions
diff --git a/tests/correlator/epy_block_0.py b/tests/correlator/epy_block_0.py index 90ad75e..9cb25bb 100644 --- a/tests/correlator/epy_block_0.py +++ b/tests/correlator/epy_block_0.py @@ -13,88 +13,76 @@ class blk(gr.sync_block): out_sig=[np.complex64] ) - # keep track of how many samples were processed, - # because tags have an absolute offset - self.sampnr: np.complex64 = 0 + # we need to keep track of the aboslute number of samples that have + # been processed, because tags have an absolute offset + self.counter: np.uint64 = 0 - # because of block processing a tagged block could - # be split in half so we need to keep track of the - # "previous" values - self.last_tag = None + # because we do block processing, we need to keep track of the last tag + # of the previous block to correct the first values of the next block + self.last = None - def work(self, input_items, output_items): - # nicer aliases - out = output_items[0] - inp = input_items[0] - - def print_phase_correction(start, end, phase, freq): - print(f"Correction for block {start:3d} to {end:3d} is phase = {phase: 2.4f} rad, freq = {freq * 1e3: 2.4f} milli rad/samp") - - # read only phase tags - tags = self.get_tags_in_window(0, 0, len(inp)) - - is_phase = lambda tag: pmt.to_python(tag.key) == "phase_est" - phase_tags = list(filter(is_phase, tags)) + def block_phase(self, start, end): + # compute number of samples in block + nsamples = end.offset - start.offset - # FIXME: what if there are no tags? check that! + # unpack pmt values into start and end phase + sphase = pmt.to_python(start.value) + ephase = pmt.to_python(end.value) - print(f"Processing {len(inp)} samples, with {len(phase_tags)} tags") + # compute frequency offset between start and end + freq = (sphase - ephase) / nsamples - # create a phase correction vector - phase = np.zeros(len(inp), dtype=np.float64) + # debugging + print(f"Correction for block of {nsamples:2d} samples is " \ + f"phase={sphase: .4f} rad and freq={freq*1e3: .4f} milli rad / sample") - # compute correction from previous block (if present) - if self.last_tag: - # variables for first and last phase values - lval = pmt.to_python(self.last_tag.value) - fval = pmt.to_python(phase_tags[0].value) + # compute block values + return sphase * np.ones(nsamples) + freq * np.arange(0, nsamples) - # compute index for phase vector - fidx = phase_tags[0].offset - self.sampnr - - # compute frequency offset - nsamples = phase_tags[0].offset - self.last_tag.offset - freq = (fval - lval) / nsamples + def work(self, input_items, output_items): + # FIXME: replace class counter with local variable + # self.counter = self.nitems_written(0) - # total phase correction is: phase + freq * time - phase[:fidx] = lval * np.ones(fidx) + freq * np.arange(0, fidx) + # nicer aliases + inp = input_items[0] + out = output_items[0] - # compute correction - print_phase_correction(0, fidx, lval, freq) + # read phase tags + is_phase = lambda tag: pmt.to_python(tag.key) == "phase_est" + tags = list(filter(is_phase, self.get_tags_in_window(0, 0, len(inp)))) - # iterate phase tags "in the middle" - # FIXME: what if there are less than 2 tags? - # the code below will probably crash - for prev_tag, next_tag in zip(phase_tags, phase_tags[1:]): - # unpack pmt values - pval = pmt.to_python(prev_tag.value) - nval = pmt.to_python(next_tag.value) + # debugging + print(f"Processing {len(tags)} tags = {tags[-1].offset - tags[0].offset} " \ + f"samples out of {len(inp)} input samples") - # compute indexes in phase vector - pidx = prev_tag.offset - self.sampnr - nidx = next_tag.offset - self.sampnr + # compute "the middle" + enough_samples = lambda pair: ((pair[1].offset - pair[0].offset) > 0) + pairs = list(filter(enough_samples, zip(tags, tags[1:]))) + blocks = [ self.block_phase(start, end) for (start, end) in pairs ] + middle = np.concatenate(blocks) if blocks else [] - # compute frquency correction for block by linearly interpolating - # frame values - nsamples = nidx - pidx - freq = (nval - pval) / nsamples + # compute the remainder from the previous call + nfront = tags[0].offset - self.counter + print(f"Processing {nfront} samples at the front of the buffer") + start = self.block_phase(self.last, tags[0])[-nfront:] \ + if self.last else np.zeros(nfront) - # total correction is: phase + freq * time - phase[pidx:nidx] = pval * np.ones(nsamples) + freq * np.arange(0, nsamples) - print_phase_correction(pidx, nidx, pval, freq) + # compute values at the end + nback = len(inp) - (tags[-1].offset - self.counter) + print(f"Processing {nback} samples at the back of the buffer") + end = np.ones(nback) * pmt.to_python(tags[-1].value) - # for the last block because the next tag is unknown (in the future) we - # cannot predict the frequency offset. Thus we save the last tag for - # the next call. - self.last_tag = phase_tags[-1] - val = pmt.to_python(self.last_tag.value) - idx = self.last_tag.offset - self.sampnr + # compute correction + correction = np.exp(-1j * np.concatenate([start, middle, end])) + length = len(correction) - phase[idx:] = val + # write outputs + out[:length] = inp[:length] * correction + self.counter += len(inp) - # compute correction - out[:] = inp * np.exp(-1j * phase) + # save last tag for next call + self.last = tags[-1] - # increment processed samples counter - self.sampnr += len(inp) - return len(phase) + # FIXME: should return `length' but then the last sample is not + # included and self.last does something weird + return len(out) |