aboutsummaryrefslogtreecommitdiffstats
path: root/tests/correlator/epy_block_0.py
blob: 1fa47949d1dd7afec8f35fc49d533f3f3902bf3e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import pmt

import numpy as np
from gnuradio import gr


class blk(gr.sync_block):
    """Sync block that de-rotates samples using ``phase_est`` stream tags.

    Each tag on input 0 whose key is ``"phase_est"`` carries a phase value.
    From the tag's offset onwards (until the next such tag) every sample is
    multiplied by ``exp(-1j * phase)``. Samples that precede the first tag of
    a ``work`` call use the phase remembered from the previous call.
    """

    def __init__(self):
        gr.sync_block.__init__(
            self,
            name='Phase Lock',
            in_sig=[np.complex64],
            out_sig=[np.complex64]
        )

        # keep track of how many samples were processed,
        # because tags have an absolute offset
        self.sampnr: int = 0

        # because of block processing a tagged block could
        # be split in half so we need to keep track of the
        # "previous" values
        self.last_phase = 0

    def work(self, input_items, output_items):
        """Apply the tagged phase correction to one chunk of samples.

        Args:
            input_items: per-port input buffers; only port 0 is read.
            output_items: per-port output buffers; only port 0 is written.

        Returns:
            The number of output samples produced (``len(output_items[0])``).
        """
        # TODO: interpolate phase values for frequency correction

        out = output_items[0]
        inp = input_items[0]

        # create a phase correction vector, pre-filled with the phase carried
        # over from the previous call: this covers the samples before the
        # first tag and also the case where this chunk has no tag at all
        phase = np.full(len(inp), self.last_phase, dtype=np.float64)

        # read tags
        tags = self.get_tags_in_window(0, 0, len(inp))

        # get only phase tags
        phase_tags = [
            tag for tag in tags if pmt.to_python(tag.key) == "phase_est"
        ]
        # the slicing below assumes ascending offsets; sort explicitly rather
        # than relying on the order in which the scheduler returns tags
        phase_tags.sort(key=lambda tag: tag.offset)

        print(f"Processing {len(inp)} samples, with {len(phase_tags)} tags")

        # without this guard an untagged chunk would raise IndexError
        if phase_tags:
            # iterate phase tags "in the middle"
            for prev_tag, next_tag in zip(phase_tags, phase_tags[1:]):
                # unpack pmt values
                pval = pmt.to_python(prev_tag.value)

                # convert absolute tag offsets to indexes in phase vector
                pidx = prev_tag.offset - self.sampnr
                idx = next_tag.offset - self.sampnr

                # compute phase correction for block
                phase[pidx:idx] = pval
                print(f"Correction for block {pidx} to {idx} is {pval}")

            # compute the remaining part of the block
            last_tag = phase_tags[-1]
            last_val = pmt.to_python(last_tag.value)
            last_idx = last_tag.offset - self.sampnr

            phase[last_idx:] = last_val

            # save values for next call
            self.last_phase = last_val

        # mark samples as processed and compute to output
        self.sampnr += len(inp)
        out[:] = inp * np.exp(-1j * phase)

        return len(out)