# cgit page residue: about / summary / refs / log / tree / commit / diff / stats
# path: root/src/gr-fadingui/python/xor_frame_sync.py
# blob: 735a0317bb8c51973e11d9ea95947abf5414df40 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Naoki Pross.


import numpy as np
from numpy_ringbuffer import RingBuffer

from gnuradio import gr


class xor_frame_sync(gr.sync_block):
    """
    Byte-stream frame synchronizer based on a binary (XOR) correlation.

    The block keeps the most recent input bytes in a delay FIFO (newest byte
    at index 0), compares the newest byte bit by bit with a stored
    synchronization pattern, and outputs the stream delayed by ``self.delay``
    bytes.

    Args:
        sync_pattern: sequence with one entry per bit of the pattern to look
            for. The correlation unpacks exactly one byte, i.e. 8 bits, so
            the pattern must be broadcastable against 8 bits (in practice: 8
            entries).
        buffer_size: capacity of the delay FIFO and of the correlation
            history.
    """
    def __init__(self, sync_pattern, buffer_size):
        gr.sync_block.__init__(self,
            name="xor_frame_sync",
            in_sig=[np.byte],
            out_sig=[np.byte])

        # binary pattern to match, one array entry per bit
        self.pattern = np.array(sync_pattern, dtype=np.dtype("uint8"))
        self.nbits = len(sync_pattern)

        # buffer to delay the data (newest byte is pushed on the left)
        self.delay_fifo = RingBuffer(buffer_size, dtype=np.byte)

        # buffer to store the history of correlation values
        self.xcorr = RingBuffer(buffer_size, dtype=np.dtype("uint8"))

        # synchronization state
        self.synchronized = False
        self.delay = 0

    def xcorrelation(self):
        """
        Compute the binary correlation between the newest byte of the stream
        and the stored pattern. Binary correlation between two bit vectors is
        just the size of the vector(s) minus the number of bits that differ.

        Returns:
            ``self.nbits`` minus the number of differing bit positions, so a
            perfect match yields ``self.nbits``.
        """
        # np.unpackbits only accepts uint8 input while the FIFO stores signed
        # bytes: reinterpret the raw bits (view) instead of converting the
        # value, so that negative bytes keep their bit pattern.
        newest = np.array([self.delay_fifo[0]], dtype=np.byte).view(np.uint8)
        unpacked = np.unpackbits(newest)

        mismatches = np.count_nonzero(np.logical_xor(unpacked, self.pattern))
        return self.nbits - mismatches

    def work(self, input_items, output_items):
        """
        Process the inputs, that means:

            - Check that the buffer is synchronized, i.e. that the sync
              pattern appears every k bits, where k is the size of the
              packet.

            - If the buffer is not synchronized, compute a binary cross
              correlation to find by how much the stream should be delayed.

        The input chunk is handled one byte at a time: each byte is pushed
        into the delay FIFO and the byte ``self.delay`` positions behind it
        is written to the matching output slot.

        Returns:
            Number of items written to the output buffer.
        """
        inp = input_items[0]
        out = output_items[0]

        for i, sample in enumerate(inp):
            # Add data to delay buffer; appendleft stores one element, so the
            # chunk cannot be pushed as a whole
            self.delay_fifo.appendleft(sample)

            # TODO: check for synchronization, else compute

            # Compute correlation
            if not self.synchronized:
                self.xcorr.append(self.xcorrelation())

                peak = np.argmax(self.xcorr)
                if self.xcorr[peak] != self.nbits:
                    print(f"Warning! XOR correlation is not perfect (peak value = {self.xcorr[peak]})")

                # NOTE(review): the state flips to synchronized right after
                # the first correlation value, so the peak is presumably
                # always searched in a one-element history -- to be revisited
                # together with the TODO above.
                self.delay = int(peak)
                self.synchronized = True

            # return data with delay
            out[i] = self.delay_fifo[self.delay]

        return len(out)