meshtastic.tests.test_node API documentation

Meshtastic unit tests for node.py

Expand source code
"""Meshtastic unit tests for node.py"""

import logging
import re
from unittest.mock import MagicMock, patch

import pytest

from ..protobuf import admin_pb2, localonly_pb2, config_pb2
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
from ..node import Node
from ..serial_interface import SerialInterface
from ..mesh_interface import MeshInterface

# from ..config_pb2 import Config
# from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
#                                  CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
#                                  CannedMessagePluginMessagePart5)
# from ..util import Timeout


@pytest.mark.unit
def test_node(capsys):
    """Test that we can instantiate a Node"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        lc = localonly_pb2.LocalConfig()
        anode.localConfig = lc
        lc.lora.CopyFrom(config_pb2.Config.LoRaConfig())
        anode.moduleConfig = localonly_pb2.LocalModuleConfig()
        anode.showInfo()
        out, err = capsys.readouterr()
        assert re.search(r'Preferences', out)
        assert re.search(r'Module preferences', out)
        assert re.search(r'Channels', out)
        assert re.search(r'Primary channel URL', out)
        assert not re.search(r'remote node', out)
        assert err == ''

# TODO
# @pytest.mark.unit
# def test_node_requestConfig(capsys):
#    """Test run requestConfig"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            anode = Node(mo, 'bar')
#            anode.requestConfig()
#    out, err = capsys.readouterr()
#    assert re.search(r'Requesting preferences from remote node', out, re.MULTILINE)
#    assert err == ''


# @pytest.mark.unit
# def test_node_get_canned_message_with_all_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.cannedPluginMessagePart2 = 'b'
#                anode.cannedPluginMessagePart3 = 'c'
#                anode.cannedPluginMessagePart4 = 'd'
#                anode.cannedPluginMessagePart5 = 'e'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
#    assert err == ''
#
#
# @pytest.mark.unit
# def test_node_get_canned_message_with_some_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
#    assert err == ''
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_one_part(caplog):
#    """Test run set_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                anode.set_canned_message('foo')
#    assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_200(caplog):
#    """Test run set_canned_message() 200 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_200_chars_long = 'a' * 200
#                anode.set_canned_message(message_200_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_201(caplog):
#    """Test run set_canned_message() 201 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_201_chars_long = 'a' * 201
#                anode.set_canned_message(message_201_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_1000(caplog):
#    """Test run set_canned_message() 1000 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_1000_chars_long = 'a' * 1000
#                anode.set_canned_message(message_1000_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r" part 2", caplog.text, re.MULTILINE)
#    assert re.search(r" part 3", caplog.text, re.MULTILINE)
#    assert re.search(r" part 4", caplog.text, re.MULTILINE)
#    assert re.search(r" part 5", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_1001(capsys):
#    """Test run set_canned_message() 1001 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    with pytest.raises(SystemExit) as pytest_wrapped_e:
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            anode = Node(mo, 'bar')
#            message_1001_chars_long = 'a' * 1001
#            anode.set_canned_message(message_1001_chars_long)
#    assert pytest_wrapped_e.type == SystemExit
#    assert pytest_wrapped_e.value.code == 1
#    out, err = capsys.readouterr()
#    assert re.search(r'Warning: The canned message', out, re.MULTILINE)
#    assert err == ''


# TODO
# @pytest.mark.unit
# def test_setOwnerShort(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name=None, short_name='123')
#    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Test123')
#    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name_and_long_name_is_short(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Tnt')
#    assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name_and_long_name_has_words(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='A B C', is_licensed=True)
#    assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_long_name_no_short(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Aabo', is_licensed=True)
#    assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_exitSimulator(caplog):
    """Test exitSimulator"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, "!ba400000", noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.exitSimulator()
    assert re.search(r"in exitSimulator", caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_reboot(caplog):
    """Test reboot"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, 1234567890, noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.reboot()
    assert re.search(r"Telling node to reboot", caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_shutdown(caplog):
    """Test shutdown"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, 1234567890, noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.shutdown()
    assert re.search(r"Telling node to shutdown", caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """Test reboot"""
    anode = Node("foo", "bar", noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL("")
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_setURL_valid_URL(caplog):
#    """Test setURL"""
#    iface = MagicMock(autospec=SerialInterface)
#    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
#    with caplog.at_level(logging.DEBUG):
#        anode = Node(iface, 'bar', noProto=True)
#        anode.radioConfig = 'baz'
#        channels = ['zoo']
#        anode.channels = channels
#        anode.setURL(url)
#    assert re.search(r'Channel i:0', caplog.text, re.MULTILINE)
#    assert re.search(r'modem_config: MidSlow', caplog.text, re.MULTILINE)
#    assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE)
#    assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode = Node(iface, "bar", noProto=True)
        anode.radioConfig = "baz"
        anode.setURL(url)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_showChannels(capsys):
#    """Test showChannels"""
#    anode = Node('foo', 'bar')
#
#    # primary channel
#    # role: 0=Disabled, 1=Primary, 2=Secondary
#    # modem_config: 0-5
#    # role: 0=Disabled, 1=Primary, 2=Secondary
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    anode.showChannels()
#    out, err = capsys.readouterr()
#    assert re.search(r'Channels:', out, re.MULTILINE)
#    # primary channel
#    assert re.search(r'Primary channel URL', out, re.MULTILINE)
#    assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE)
#    assert re.search(r'"modemConfig": "MidSlow"', out, re.MULTILINE)
#    assert re.search(r'"psk": "AQ=="', out, re.MULTILINE)
#    # secondary channel
#    assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE)
#    assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE)
#    assert err == ''


@pytest.mark.unit
def test_getChannelByChannelIndex():
    """Test getChannelByChannelIndex()"""
    anode = Node("foo", "bar")

    channel1 = Channel(index=1, role=1)  # primary channel
    channel2 = Channel(index=2, role=2)  # secondary channel
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [
        channel1,
        channel2,
        channel3,
        channel4,
        channel5,
        channel6,
        channel7,
        channel8,
    ]

    anode.channels = channels

    # test primary
    assert anode.getChannelByChannelIndex(0) is not None
    # test secondary
    assert anode.getChannelByChannelIndex(1) is not None
    # test disabled
    assert anode.getChannelByChannelIndex(2) is not None
    # test invalid values
    assert anode.getChannelByChannelIndex(-1) is None
    assert anode.getChannelByChannelIndex(9) is None


# TODO
# @pytest.mark.unit
# def test_deleteChannel_try_to_delete_primary_channel(capsys):
#    """Try to delete primary channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    # no secondary channels
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    with pytest.raises(SystemExit) as pytest_wrapped_e:
#        anode.deleteChannel(0)
#    assert pytest_wrapped_e.type == SystemExit
#    assert pytest_wrapped_e.value.code == 1
#    out, err = capsys.readouterr()
#    assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
#    assert err == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary():
#    """Try to delete a secondary channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'testing'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(1)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == ''
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary_with_admin_channel_after_testing():
#    """Try to delete a secondary channel where there is an admin channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.name = 'admin'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        assert mo.localNode == anode
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'testing'
#        assert channels[2].settings.name == 'admin'
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(1)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary_with_admin_channel_before_testing():
#    """Try to delete a secondary channel where there is an admin channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.name = 'testing'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == 'testing'
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(2)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#
# @pytest.mark.unit
# def test_getChannelByName():
#    """Get a channel by the name."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getChannelByName('admin')
#    assert ch.index == 2


# TODO
# @pytest.mark.unit
# def test_getChannelByName_invalid_name():
#    """Get a channel by the name but one that is not present."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getChannelByName('testing')
#    assert ch is None
#
#
# @pytest.mark.unit
# def test_getDisabledChannel():
#    """Get the first disabled channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testingA'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel3.settings.name = 'testingB'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getDisabledChannel()
#    assert ch.index == 4


# TODO
# @pytest.mark.unit
# def test_getDisabledChannel_where_all_channels_are_used():
#    """Get the first disabled channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel3 = Channel(index=3, role=2)
#    channel4 = Channel(index=4, role=2)
#    channel5 = Channel(index=5, role=2)
#    channel6 = Channel(index=6, role=2)
#    channel7 = Channel(index=7, role=2)
#    channel8 = Channel(index=8, role=2)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getDisabledChannel()
#    assert ch is None


# TODO
# @pytest.mark.unit
# def test_getAdminChannelIndex():
#    """Get the 'admin' channel index."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    i = anode._getAdminChannelIndex()
#    assert i == 2


# TODO
# @pytest.mark.unit
# def test_getAdminChannelIndex_when_no_admin_named_channel():
#    """Get the 'admin' channel when there is not one."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    i = anode._getAdminChannelIndex()
#    assert i == 0


# TODO
# TODO: should we check if we need to turn it off?
# @pytest.mark.unit
# def test_turnOffEncryptionOnPrimaryChannel(capsys):
#    """Turn off encryption when there is a psk."""
#    anode = Node('foo', 'bar', noProto=True)
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
#    channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'
#
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    anode.turnOffEncryptionOnPrimaryChannel()
#    out, err = capsys.readouterr()
#    assert re.search(r'Writing modified channels to device', out)
#    assert err == ''


@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test writeConfig with no radioConfig."""
    anode = Node("foo", "bar", noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig('foo')
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    print(out)
    assert re.search(r"Error: No valid config with name foo", out)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_writeConfig(caplog):
#    """Test writeConfig"""
#    anode = Node('foo', 'bar', noProto=True)
#    radioConfig = RadioConfig()
#    anode.radioConfig = radioConfig
#
#    with caplog.at_level(logging.DEBUG):
#        anode.writeConfig()
#    assert re.search(r'Wrote config', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(
                r"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE
            )
        out, err = capsys.readouterr()
        assert re.search(r"Requesting channel 0 info", out, re.MULTILINE)
        assert err == ""


@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r"Requesting channel 0", caplog.text, re.MULTILINE)
            assert not re.search(r"from remote node", caplog.text, re.MULTILINE)

@pytest.mark.unit
def test_requestChannels_non_localNode(caplog):
    """Test requestChannels() with a starting index of 0"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['0']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(0)
            assert re.search(f"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE)
            assert anode.partialChannels == []

@pytest.mark.unit
def test_requestChannels_non_localNode_starting_index(caplog):
    """Test requestChannels() with a starting index of non-0"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['1']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(3)
            assert re.search(f"Requesting channel 3 info from remote node", caplog.text, re.MULTILINE)
            # make sure it hasn't been initialized
            assert anode.partialChannels == ['1']

# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart1()"""
#
#    part1 = CannedMessagePluginMessagePart1()
#    part1.text = 'foo1'
#
#    msg1 = MagicMock(autospec=AdminMessage)
#    msg1.get_canned_message_plugin_part1_response = part1
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart1Response': {'text': 'foo1'},
#                    'raw': msg1
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart1 == 'foo1'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart2()"""
#
#    part2 = CannedMessagePluginMessagePart2()
#    part2.text = 'foo2'
#
#    msg2 = MagicMock(autospec=AdminMessage)
#    msg2.get_canned_message_plugin_part2_response = part2
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart2Response': {'text': 'foo2'},
#                    'raw': msg2
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart2 == 'foo2'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart3()"""
#
#    part3 = CannedMessagePluginMessagePart3()
#    part3.text = 'foo3'
#
#    msg3 = MagicMock(autospec=AdminMessage)
#    msg3.get_canned_message_plugin_part3_response = part3
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart3Response': {'text': 'foo3'},
#                    'raw': msg3
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart3 == 'foo3'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart4()"""
#
#    part4 = CannedMessagePluginMessagePart4()
#    part4.text = 'foo4'
#
#    msg4 = MagicMock(autospec=AdminMessage)
#    msg4.get_canned_message_plugin_part4_response = part4
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart4Response': {'text': 'foo4'},
#                    'raw': msg4
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart4 == 'foo4'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart5()"""
#
#    part5 = CannedMessagePluginMessagePart5()
#    part5.text = 'foo5'
#
#    msg5 = MagicMock(autospec=AdminMessage)
#    msg5.get_canned_message_plugin_part5_response = part5
#
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart5Response': {'text': 'foo5'},
#                    'raw': msg5
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart5 == 'foo5'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# TODO
# @pytest.mark.unit
# def test_onResponseRequestChannel(caplog):
#    """Test onResponseRequestChannel()"""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    msg1 = MagicMock(autospec=AdminMessage)
#    msg1.get_channel_response = channel1
#
#    msg2 = MagicMock(autospec=AdminMessage)
#    channel2 = Channel(index=2, role=0) # disabled
#    msg2.get_channel_response = channel2
#
#    # default primary channel
#    packet1 = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
#            'requestId': 2615094405,
#            'admin': {
#                'getChannelResponse': {
#                    'settings': {
#                        'modemConfig': 'Bw125Cr48Sf4096',
#                        'psk': 'AQ=='
#                    },
#                    'role': 'PRIMARY'
#                },
#                'raw': msg1,
#            }
#        },
#        'id': 1692918436,
#        'hopLimit': 3,
#        'priority':
#        'RELIABLE',
#        'raw': 'fake',
#        'fromId': '!9388f81c',
#        'toId': '!9388f81c'
#        }
#
#    # no other channels
#    packet2 = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b':\x04\x08\x02\x12\x00',
#            'requestId': 743049663,
#            'admin': {
#                'getChannelResponse': {
#                    'index': 2,
#                    'settings': {}
#                },
#                'raw': msg2,
#            }
#        },
#        'id': 1692918456,
#        'rxTime': 1640202239,
#        'hopLimit': 3,
#        'priority': 'RELIABLE',
#        'raw': 'faked',
#        'fromId': '!9388f81c',
#        'toId': '!9388f81c'
#    }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.requestConfig()
#            anode.onResponseRequestChannel(packet1)
#            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
#            anode.onResponseRequestChannel(packet2)
#            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
#            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
#            assert len(anode.channels) == 8
#            assert anode.channels[0].settings.modem_config == 3
#            assert anode.channels[1].settings.name == ''
#            assert anode.channels[2].settings.name == ''
#            assert anode.channels[3].settings.name == ''
#            assert anode.channels[4].settings.name == ''
#            assert anode.channels[5].settings.name == ''
#            assert anode.channels[6].settings.name == ''
#            assert anode.channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_onResponseRequestSetting(caplog):
#    """Test onResponseRequestSetting()"""
#    # Note: Split out the get_radio_response to a MagicMock
#    # so it could be "returned" (not really sure how to do that
#    # in a python dict.
#    amsg = MagicMock(autospec=AdminMessage)
#    amsg.get_radio_response = """{
#  preferences {
#    phone_timeout_secs: 900
#    ls_secs: 300
#    position_broadcast_smart: true
#    position_flags: 35
#  }
# }"""
#    packet = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
#            'requestId': 3145147848,
#            'admin': {
#                'getRadioResponse': {
#                    'preferences': {
#                        'phoneTimeoutSecs': 900,
#                        'lsSecs': 300,
#                        'positionBroadcastSmart': True,
#                        'positionFlags': 35
#                     }
#                },
#                'raw': amsg
#            },
#            'id': 365963704,
#            'rxTime': 1640195197,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!9388f81c',
#            'toId': '!9388f81c'
#        }
#    }
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestSettings(packet)
#            assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_onResponseRequestSetting_with_error(capsys):
#    """Test onResponseRequestSetting() with an error"""
#    packet = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
#            'requestId': 3145147848,
#            'routing': {
#                'errorReason': 'some made up error',
#            },
#            'admin': {
#                'getRadioResponse': {
#                    'preferences': {
#                        'phoneTimeoutSecs': 900,
#                        'lsSecs': 300,
#                        'positionBroadcastSmart': True,
#                        'positionFlags': 35
#                     }
#                },
#            },
#            'id': 365963704,
#            'rxTime': 1640195197,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'fromId': '!9388f81c',
#            'toId': '!9388f81c'
#        }
#    }
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        anode.onResponseRequestSettings(packet)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_set_favorite(favorite):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setFavorite(favorite)
    assert amesg.set_favorite_node == 502009325
    iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_remove_favorite(favorite):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeFavorite(favorite)

    assert amesg.remove_favorite_node == 502009325
    iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_set_ignored(ignored):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setIgnored(ignored)
    assert amesg.set_ignored_node == 502009325
    iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_remove_ignored(ignored):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeIgnored(ignored)

    assert amesg.remove_ignored_node == 502009325
    iface.sendData.assert_called_once()


# TODO
# @pytest.mark.unitslow
# def test_waitForConfig():
#    """Test waitForConfig()"""
#    anode = Node('foo', 'bar')
#    radioConfig = RadioConfig()
#    anode.radioConfig = radioConfig
#    anode._timeout = Timeout(0.01)
#    result = anode.waitForConfig()
#    assert not result
def test_exitSimulator(caplog)

Test exitSimulator

Expand source code
@pytest.mark.unit
def test_exitSimulator(caplog):
    """Test exitSimulator"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, "!ba400000", noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.exitSimulator()
    assert re.search(r"in exitSimulator", caplog.text, re.MULTILINE)
def test_getChannelByChannelIndex()

Test getChannelByChannelIndex()

Expand source code
@pytest.mark.unit
def test_getChannelByChannelIndex():
    """Test getChannelByChannelIndex()"""
    anode = Node("foo", "bar")

    channel1 = Channel(index=1, role=1)  # primary channel
    channel2 = Channel(index=2, role=2)  # secondary channel
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [
        channel1,
        channel2,
        channel3,
        channel4,
        channel5,
        channel6,
        channel7,
        channel8,
    ]

    anode.channels = channels

    # test primary
    assert anode.getChannelByChannelIndex(0) is not None
    # test secondary
    assert anode.getChannelByChannelIndex(1) is not None
    # test disabled
    assert anode.getChannelByChannelIndex(2) is not None
    # test invalid values
    assert anode.getChannelByChannelIndex(-1) is None
    assert anode.getChannelByChannelIndex(9) is None
def test_node(capsys)

Test that we can instantiate a Node

Expand source code
@pytest.mark.unit
def test_node(capsys):
    """Test that we can instantiate a Node"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        lc = localonly_pb2.LocalConfig()
        anode.localConfig = lc
        lc.lora.CopyFrom(config_pb2.Config.LoRaConfig())
        anode.moduleConfig = localonly_pb2.LocalModuleConfig()
        anode.showInfo()
        out, err = capsys.readouterr()
        assert re.search(r'Preferences', out)
        assert re.search(r'Module preferences', out)
        assert re.search(r'Channels', out)
        assert re.search(r'Primary channel URL', out)
        assert not re.search(r'remote node', out)
        assert err == ''
def test_reboot(caplog)

Test reboot

Expand source code
@pytest.mark.unit
def test_reboot(caplog):
    """Test reboot"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, 1234567890, noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.reboot()
    assert re.search(r"Telling node to reboot", caplog.text, re.MULTILINE)
def test_remove_favorite(favorite)

Test setFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_remove_favorite(favorite):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeFavorite(favorite)

    assert amesg.remove_favorite_node == 502009325
    iface.sendData.assert_called_once()
def test_remove_ignored(ignored)

Test setFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_remove_ignored(ignored):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeIgnored(ignored)

    assert amesg.remove_ignored_node == 502009325
    iface.sendData.assert_called_once()
def test_requestChannel_localNode(caplog)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r"Requesting channel 0", caplog.text, re.MULTILINE)
            assert not re.search(r"from remote node", caplog.text, re.MULTILINE)
def test_requestChannel_not_localNode(caplog, capsys)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(
                r"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE
            )
        out, err = capsys.readouterr()
        assert re.search(r"Requesting channel 0 info", out, re.MULTILINE)
        assert err == ""
def test_requestChannels_non_localNode(caplog)

Test requestChannels() with a starting index of 0

Expand source code
@pytest.mark.unit
def test_requestChannels_non_localNode(caplog):
    """Test requestChannels() with a starting index of 0"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['0']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(0)
            assert re.search(f"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE)
            assert anode.partialChannels == []
def test_requestChannels_non_localNode_starting_index(caplog)

Test requestChannels() with a starting index of non-0

Expand source code
@pytest.mark.unit
def test_requestChannels_non_localNode_starting_index(caplog):
    """Test requestChannels() with a starting index of non-0"""
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['1']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(3)
            assert re.search(f"Requesting channel 3 info from remote node", caplog.text, re.MULTILINE)
            # make sure it hasn't been initialized
            assert anode.partialChannels == ['1']
def test_setURL_empty_url(capsys)

Test reboot

Expand source code
@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """Test reboot"""
    anode = Node("foo", "bar", noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL("")
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""
def test_setURL_valid_URL_but_no_settings(capsys)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode = Node(iface, "bar", noProto=True)
        anode.radioConfig = "baz"
        anode.setURL(url)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""
def test_set_favorite(favorite)

Test setFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_set_favorite(favorite):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setFavorite(favorite)
    assert amesg.set_favorite_node == 502009325
    iface.sendData.assert_called_once()
def test_set_ignored(ignored)

Test setFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_set_ignored(ignored):
    """Test setFavorite"""
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setIgnored(ignored)
    assert amesg.set_ignored_node == 502009325
    iface.sendData.assert_called_once()
def test_shutdown(caplog)

Test shutdown

Expand source code
@pytest.mark.unit
def test_shutdown(caplog):
    """Test shutdown"""
    interface = MeshInterface()
    interface.nodesByNum = {}
    anode = Node(interface, 1234567890, noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.shutdown()
    assert re.search(r"Telling node to shutdown", caplog.text, re.MULTILINE)
def test_writeConfig_with_no_radioConfig(capsys)

Test writeConfig with no radioConfig.

Expand source code
@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test writeConfig with no radioConfig."""
    anode = Node("foo", "bar", noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig('foo')
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    print(out)
    assert re.search(r"Error: No valid config with name foo", out)
    assert err == ""