GNU Radio Morse code decoder block

November 28, 2022

I've been playing with GNU Radio and an SDR and decided to try out their "capture the flag" challenges.

I haven't gotten to far yet, but one of the challenges has is a morse code signal so I decided to write a python block to read it.

It took a little while to figure out that I needed to use a general block or the input simply freezes or gnu radio crashes. The flow graph must decimate down so the sample rate matches one symbol so it will likely only work with computer generated signals.

Anyways, heres the flow graph:

GNU Radio Morse Code Decoder

It uses a threshold to change the input to a boolean level, the keep 1 in N reduces the sample rate into the python block to match the morse code letter spacing (eg duration of a single high or low level). The code reader then outputs the words it reads.

******* MESSAGE DEBUG PRINT ********
THIS
************************************
******* MESSAGE DEBUG PRINT ********
IS
************************************
******* MESSAGE DEBUG PRINT ********
VE3IRR
***********************************

Here's the code:

"""
Copyright CodeLV
License GPL v3
"""
import pmt
import numpy as np
from gnuradio import gr


class blk(gr.basic_block): 
    """Morse code reader"""

    codes = {
        '*-': 'A',
        '-***': 'B',
        '-*-*': 'C',
        '-**': 'D',
        '*': 'E',
        '**-*': 'F',
        '--*': 'G',
        '****': 'H',
        '**': 'I',
        '*---': 'J',
        '-*-': 'K',
        '*-**': 'L',
        '--': 'M',
        '-*': 'N',
        '---': 'O',
        '*--*': 'P',
        '--*-': 'Q',
        '*-*': 'R',
        '***': 'S',
        '-': 'T',
        '**-': 'U',
        '***-': 'V',
        '*--': 'W',
        '-**-': 'X',
        '-*--': 'Y',
        '--**': 'Z',

        '*----': '1',
        '**---': '2',
        '***--': '3',
        '****-': '4',
        '*****': '5',
        '-****': '6',
        '--***': '7',
        '---**': '8',
        '----*': '9',
        '-----': '0',

        '*-*-*-': '.',
        '--**--': ',',
        '**--**': '?',
        '*----*': "'",
        '-*-*--': "!",

        '-**-*': "/",
        '-*--*': "(",
        '-*--*-': ")",
        '*-***': "&",
        '---***': ":",
        '-*-*-*': ";",
        '-***-': "=",
        '*-*-*': "+",
        '-****-': "-",
        '**--*-': "_",
        '*-**-*': '"',
        '***-**-': '$',
        '*--*-*': '@',

        '***-*-': '*end*',
        '********': '*error*',
        '-*-': '*ok*',
        '-*-*-': '*start*',
        '*-*-*': '*new*',
        '***-*': '*verified*',
        '*-***': '*wait*',
    }

    def __init__(
        self, 
        symbol_spacing=3, 
        word_spacing=5,
        message_limit=100,
        message_size_limit=1000,
    ):  
        """
        Parameters
        ----------
        symbol_spacing: int
            Spacing between symbols / characters
        word_spacing: int
            Spacing between words
        message_limit: int
            Max number of words to decode
        message_size_limit: int
            Max message size

        """
        gr.basic_block.__init__(
            self,
            name='Morse code reader',   # will show up in GRC
            in_sig=[np.int8],
            out_sig=[]
        )
        assert isinstance(symbol_spacing, int), "Symbol spacing must be int"
        assert isinstance(word_spacing, int), "Word spacing must be int"
        assert symbol_spacing < word_spacing, "Symbol spacing must be less than word spacing"
        self.symbol_spacing = symbol_spacing
        self.word_spacing = word_spacing
        self.port_name = 'out'
        self.message_port_register_out(pmt.intern(self.port_name))
        self.key = []
        self.message = []
        self.last_level = False
        self.spacing = 0

        self.code = '*'

        # Anything over this key size is an error
        self.key_limit = 8

        # Stop reading after this limit as a sanity check in case something goes haywire
        self.message_size_limit = message_size_limit

        # Stop reading after this limit as a sanity check in case something goes haywire
        self.message_limit = message_limit

    def flush_message(self):
        if not self.message:
            return
        self.message_port_pub(pmt.intern(self.port_name), pmt.intern(''.join(self.message)))
        self.message = []

        # Apply limit to avoid UI freezing if sampling/data is incorrect
        self.message_limit -= 1
        if self.message_limit == 0:
            print("Warning: Message limit reached. Decoding stopped") 

    def lookup_code(self):
        code = ''.join(self.key)
        self.key = []
        return self.codes.get(code, "{}")

    def general_work(self, input_items, output_items):
        if self.message_limit > 0:
            for c in input_items[0]:
                level = bool(c)
                last_level = self.last_level
                self.last_level = level
                if level and last_level:
                    # Flat high
                    #assert self.code == '*', "Got more than two high samples in a row, is sample rate correct?" 
                    self.code = '-'
                elif level:
                    # Rising edge
                    self.spacing = 0 
                    self.code = '*'
                elif last_level:
                    # Falling edge
                    self.spacing = 1 
                    self.key.append(self.code)
                    assert len(self.key) <= self.key_limit, f"Key limit exceeded: {self.key}"
                else:
                    # Flat low
                    self.spacing += 1
                    if self.spacing == self.word_spacing:
                        # End of word
                        self.flush_message()
                        self.key = [] 
                    elif self.spacing == self.symbol_spacing and self.key:
                        # End of key
                        value = self.lookup_code() # Clears key
                        self.message.append(value)
                        assert len(self.message) <= self.message_size_limit, f"Message size limit exceeded: {self.message}"

        self.consume(0, len(input_items[0]))
        return 0

There's probably better ways to do it but it worked.