I've been playing with GNU Radio and an SDR and decided to try out their "capture the flag" challenges.
I haven't gotten to far yet, but one of the challenges has is a morse code signal so I decided to write a python block to read it.
It took a little while to figure out that I needed to use a general block or the input simply freezes or gnu radio crashes. The flow graph must decimate down so the sample rate matches one symbol so it will likely only work with computer generated signals.
Anyways, heres the flow graph:
It uses a threshold to change the input to a boolean level, the keep 1 in N reduces the sample rate into the python block to match the morse code letter spacing (eg duration of a single high or low level). The code reader then outputs the words it reads.
******* MESSAGE DEBUG PRINT ********
THIS
************************************
******* MESSAGE DEBUG PRINT ********
IS
************************************
******* MESSAGE DEBUG PRINT ********
VE3IRR
***********************************
Here's the code:
"""
Copyright CodeLV
License GPL v3
"""
import pmt
import numpy as np
from gnuradio import gr
class blk(gr.basic_block):
"""Morse code reader"""
codes = {
'*-': 'A',
'-***': 'B',
'-*-*': 'C',
'-**': 'D',
'*': 'E',
'**-*': 'F',
'--*': 'G',
'****': 'H',
'**': 'I',
'*---': 'J',
'-*-': 'K',
'*-**': 'L',
'--': 'M',
'-*': 'N',
'---': 'O',
'*--*': 'P',
'--*-': 'Q',
'*-*': 'R',
'***': 'S',
'-': 'T',
'**-': 'U',
'***-': 'V',
'*--': 'W',
'-**-': 'X',
'-*--': 'Y',
'--**': 'Z',
'*----': '1',
'**---': '2',
'***--': '3',
'****-': '4',
'*****': '5',
'-****': '6',
'--***': '7',
'---**': '8',
'----*': '9',
'-----': '0',
'*-*-*-': '.',
'--**--': ',',
'**--**': '?',
'*----*': "'",
'-*-*--': "!",
'-**-*': "/",
'-*--*': "(",
'-*--*-': ")",
'*-***': "&",
'---***': ":",
'-*-*-*': ";",
'-***-': "=",
'*-*-*': "+",
'-****-': "-",
'**--*-': "_",
'*-**-*': '"',
'***-**-': '$',
'*--*-*': '@',
'***-*-': '*end*',
'********': '*error*',
'-*-': '*ok*',
'-*-*-': '*start*',
'*-*-*': '*new*',
'***-*': '*verified*',
'*-***': '*wait*',
}
def __init__(
self,
symbol_spacing=3,
word_spacing=5,
message_limit=100,
message_size_limit=1000,
):
"""
Parameters
----------
symbol_spacing: int
Spacing between symbols / characters
word_spacing: int
Spacing between words
message_limit: int
Max number of words to decode
message_size_limit: int
Max message size
"""
gr.basic_block.__init__(
self,
name='Morse code reader', # will show up in GRC
in_sig=[np.int8],
out_sig=[]
)
assert isinstance(symbol_spacing, int), "Symbol spacing must be int"
assert isinstance(word_spacing, int), "Word spacing must be int"
assert symbol_spacing < word_spacing, "Symbol spacing must be less than word spacing"
self.symbol_spacing = symbol_spacing
self.word_spacing = word_spacing
self.port_name = 'out'
self.message_port_register_out(pmt.intern(self.port_name))
self.key = []
self.message = []
self.last_level = False
self.spacing = 0
self.code = '*'
# Anything over this key size is an error
self.key_limit = 8
# Stop reading after this limit as a sanity check in case something goes haywire
self.message_size_limit = message_size_limit
# Stop reading after this limit as a sanity check in case something goes haywire
self.message_limit = message_limit
def flush_message(self):
if not self.message:
return
self.message_port_pub(pmt.intern(self.port_name), pmt.intern(''.join(self.message)))
self.message = []
# Apply limit to avoid UI freezing if sampling/data is incorrect
self.message_limit -= 1
if self.message_limit == 0:
print("Warning: Message limit reached. Decoding stopped")
def lookup_code(self):
code = ''.join(self.key)
self.key = []
return self.codes.get(code, "{}")
def general_work(self, input_items, output_items):
if self.message_limit > 0:
for c in input_items[0]:
level = bool(c)
last_level = self.last_level
self.last_level = level
if level and last_level:
# Flat high
#assert self.code == '*', "Got more than two high samples in a row, is sample rate correct?"
self.code = '-'
elif level:
# Rising edge
self.spacing = 0
self.code = '*'
elif last_level:
# Falling edge
self.spacing = 1
self.key.append(self.code)
assert len(self.key) <= self.key_limit, f"Key limit exceeded: {self.key}"
else:
# Flat low
self.spacing += 1
if self.spacing == self.word_spacing:
# End of word
self.flush_message()
self.key = []
elif self.spacing == self.symbol_spacing and self.key:
# End of key
value = self.lookup_code() # Clears key
self.message.append(value)
assert len(self.message) <= self.message_size_limit, f"Message size limit exceeded: {self.message}"
self.consume(0, len(input_items[0]))
return 0
There's probably better ways to do it but it worked.