1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Naoki Pross.
import numpy as np
from numpy_ringbuffer import RingBuffer
from gnuradio import gr
from fadingui.logger import get_logger
log = get_logger("xor_frame_sync")
class xor_frame_sync(gr.sync_block):
"""
Performs a frame synchronization by XOR matching a preamble bit sequence
"""
def __init__(self, sync_pattern, buffer_size):
# TODO: buffer size should be in packets
gr.sync_block.__init__(self,
name="xor_frame_sync",
in_sig=[np.byte],
out_sig=[np.byte])
# binary pattern to match
self.pattern = sync_pattern
self.nbytes = len(self.pattern)
self.pattern_bits = np.unpackbits(np.array(self.pattern, dtype=np.uint8))[::-1]
self.nbits = len(self.pattern_bits)
log.debug(f"Loaded pattern {self.pattern_bits} length={self.nbits}")
assert(self.nbits % 8 == 0)
# packed buffer to delay the data
self.delaybuf = RingBuffer(buffer_size, dtype=np.uint8)
self.delay = 0
log.debug(f"Created delay ring buffer of size {self.delaybuf.maxlen}")
# unpacked buffer to compute correlation values, initially filled with zeros
self.corrbuf = RingBuffer(self.nbits, dtype=np.uint8)
self.corrbuf.extend(np.zeros(self.corrbuf.maxlen))
# synchronization state
self.synchronized = False
def xcorrelation(self, v):
"""
Compute the binary correlations between the stored pattern and
correlation buffer, while shifting v into the buffer.
Binary correlation between two bit vectors is just size of the
vector(s) minus the number of bits that differ.
@return: Number of bits of v that were shifted into the buffer
when the correlation matched. If no match is found
the return value is None.
"""
# this could be much faster with shifts, bitwise or and xor
# but this should do alright for the moment
v_bits = np.unpackbits(np.array(v, dtype=np.uint8))
for bitnr, b in enumerate(v_bits):
self.corrbuf.appendleft(b)
if (np.bitwise_xor(self.corrbuf, self.pattern_bits) == 0).all():
return bitnr
# no cross correlation found
return None
def work(self, input_items, output_items):
"""
Process the inputs, that means:
- Check that the buffer is synchronized, i.e. there is the sync
pattern appears every k bits, where k is the size of the packet.
- If the buffer is not synchronized, compute a binary cross
correlation to find how much the stream should be delayed.
"""
# array of samples, growing index = forward in time
inp = input_items[0]
inp_len = len(inp)
if not self.synchronized:
if inp_len > self.delaybuf.maxlen:
log.error("Input is bigger than delay buffer")
# FIXME: Makes the QA hang for some reason
raise NotImplemented
# create space for new samples in the delay buffer
self.delaybuf.extendleft(np.zeros(inp_len))
# Add values and while processing
for bytenr, value in enumerate(inp):
# save value in the buffer
# FIXME: this is wrong, it should be in reverse order
self.delaybuf.appendleft(value)
# compute the cross correlation
bitnr = self.xcorrelation(value)
if bitnr is not None:
# correlation was found
delay_bits = (bitnr - 7)
delay_bytes = 8 * (bytenr -1)
log.debug(f"Synchronized with delay_bytes={delay_bytes} delay_bits={delay_bits}")
# FIXME: add bit delay
self.delay = delay_bytes
self.synchronized = True
# Not aligned to bytes
if delay_bits != 0:
log.error("Not implemented: byte unaligned delay")
self.synchronized = False
self.delay = 0
# FIXME: Makes the QA hang for some reason
# raise NotImplemented
# stop processing inputs
break
if not self.synchronized:
log.warning(f"Processed {inp_len} samples but could not synchronize")
else:
self.delaybuf.extendleft(inp)
# return data with delay
out = output_items[0]
# FIXME: this is also wrong
# out[:] = self.delaybuf[:len(out)]
out[:] = inp[:]
inptmp = np.array(inp[:12], dtype=np.uint8)
inphex = np.array(list(map(hex, inptmp)))
inpbits = np.array(list(map("{:08b}".format, inptmp)))
log.debug(f"inp={inptmp}")
log.debug(f"inp={inphex}")
log.debug(f"inp={inpbits}")
# outtmp = np.array(out[:12], dtype=np.uint8)
# log.debug(f"out={outtmp}")
return inp_len
|