aboutsummaryrefslogtreecommitdiffstats
path: root/src/gr-fadingui/python/xor_frame_sync.py
blob: 312ad2abf9bf85ee4f9fdb46f191b20269f8434e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Naoki Pross.


import numpy as np
from numpy_ringbuffer import RingBuffer

from gnuradio import gr

from fadingui.logger import get_logger
log = get_logger("xor_frame_sync")


class xor_frame_sync(gr.sync_block):
    """
    Performs a frame synchronization by XOR matching a preamble bit sequence
    """
    def __init__(self, sync_pattern, buffer_size):
        # TODO: buffer size should be in packets
        gr.sync_block.__init__(self,
            name="xor_frame_sync",
            in_sig=[np.byte],
            out_sig=[np.byte])

        # binary pattern to match
        self.pattern = sync_pattern
        self.nbytes = len(self.pattern)

        self.pattern_bits = np.unpackbits(np.array(self.pattern, dtype=np.uint8))[::-1]
        self.nbits = len(self.pattern_bits)

        log.debug(f"Loaded pattern {self.pattern_bits} length={self.nbits}")
        assert(self.nbits % 8 == 0)

        # packed buffer to delay the data
        self.delaybuf = RingBuffer(buffer_size, dtype=np.uint8)
        self.delay = 0

        log.debug(f"Created delay ring buffer of size {self.delaybuf.maxlen}")

        # unpacked buffer to compute correlation values, initially filled with zeros
        self.corrbuf = RingBuffer(self.nbits, dtype=np.uint8)
        self.corrbuf.extend(np.zeros(self.corrbuf.maxlen))

        # synchronization state
        self.synchronized = False

    def xcorrelation(self, v):
        """
        Compute the binary correlations between the stored pattern and
        correlation buffer, while shifting v into the buffer.

        Binary correlation between two bit vectors is just size of the
        vector(s) minus the number of bits that differ.

        @return: Number of bits of v that were shifted into the buffer
                 when the correlation matched. If no match is found
                 the return value is None.
        """
        # this could be much faster with shifts, bitwise or and xor
        # but this should do alright for the moment
        v_bits = np.unpackbits(np.array(v, dtype=np.uint8))
        for bitnr, b in enumerate(v_bits):
            self.corrbuf.appendleft(b)
            if (np.bitwise_xor(self.corrbuf, self.pattern_bits) == 0).all():
                return bitnr

        # no cross correlation found
        return None

    def work(self, input_items, output_items):
        """
        Process the inputs, that means:

            - Check that the buffer is synchronized, i.e. there is the sync
              pattern appears every k bits, where k is the size of the packet.

            - If the buffer is not synchronized, compute a binary cross
              correlation to find how much the stream should be delayed.
        """
        # array of samples, growing index = forward in time
        inp = input_items[0]
        inp_len = len(inp)

        if not self.synchronized:
            if inp_len > self.delaybuf.maxlen:
                log.error("Input is bigger than delay buffer")

                # FIXME: Makes the QA hang for some reason
                raise NotImplemented

            # create space for new samples in the delay buffer
            self.delaybuf.extendleft(np.zeros(inp_len))

            # Add values and while processing
            for bytenr, value in enumerate(inp):
                # save value in the buffer
                # FIXME: this is wrong, it should be in reverse order
                self.delaybuf.appendleft(value)

                # compute the cross correlation
                bitnr = self.xcorrelation(value)
                if bitnr is not None:
                    # correlation was found
                    delay_bits = (bitnr - 7)
                    delay_bytes = 8 * (bytenr -1)

                    log.debug(f"Synchronized with delay_bytes={delay_bytes} delay_bits={delay_bits}")

                    # FIXME: add bit delay
                    self.delay = delay_bytes
                    self.synchronized = True

                    # Not aligned to bytes
                    if delay_bits != 0:
                        log.error("Not implemented: byte unaligned delay")
                        self.synchronized = False
                        self.delay = 0

                        # FIXME: Makes the QA hang for some reason
                        # raise NotImplemented

                    # stop processing inputs
                    break

            if not self.synchronized:
                log.warning(f"Processed {inp_len} samples but could not synchronize")
        else:
            self.delaybuf.extendleft(inp)

        # return data with delay
        out = output_items[0]
        # FIXME: this is also wrong
        out[:] = self.delaybuf[:len(out)]


        inptmp = np.array(inp[:12], dtype=np.uint8)
        inphex = np.array(list(map(hex, inptmp)))
        log.debug(f"inp={inptmp}")
        log.debug(f"inp={inphex}")

        # outtmp = np.array(out[:12], dtype=np.uint8)
        # log.debug(f"out={outtmp}")

        return len(out)