Plugin working in CLI but errors WebUI Service Discovery

Dear Forum,

I try to implement a check to monitor or Site2Site Tunnels on an Sophos XG(S) FIrewall.

Mainly I adpted the plugins for the other services on the firewall from Andreas-Doehler on cmk-Exchange.

I ended up with this code now

#!/usr/bin/env python3

from datetime import datetime
from typing import Any, Dict, Optional

from cmk.agent_based.v2 import (
    CheckPlugin,
    CheckResult,
    DiscoveryResult,
    Result,
    Service,
    SimpleSNMPSection,
    SNMPTree,
    State,
    StringTable,
    all_of,
    check_levels,
    exists,
    startswith,
)

Section = Dict[str, Dict[str, str]]

def parse_sophosxg_s2s(string_table: StringTable) -> Optional[Section]:
    parsed = {}
    for line in string_table:
        if line[1] == "2" or line[1] == "3":
            parsed[line[0]] = {"state": line[2], "active": line[3]}
    return parsed

def discover_sophosxg_s2s(section: Section) -> DiscoveryResult:
    for key in section.keys():
        yield Service(item=key)

def check_sophosxg_s2s(item: str, section: Section) -> CheckResult:
    data = section.get(item)
    if data.get("active") == "0":
        yield Result(state=State.OK, summary="Tunnel is deactivated")
    else:
        if data.get("state") == "1":
            yield Result(state=State.OK, summary="Tunnel is up")
        else:
            if data.get("state") == "0":
                 yield Result(state=State.CRIT, summary="Tunnel is down")
            else:
                yield Result(state=State.CRIT, summary="Error in Check")

snmp_section_sophosxg_s2s = SimpleSNMPSection(
    name = "sophosxg_s2s",
    parse_function = parse_sophosxg_s2s,
    fetch = SNMPTree(
        base = '.1.3.6.1.4.1.2604.5.1.6.1.1.1.1',
        oids = ['2', # Name
            '6', # ConnType 0=ClientVPN 2=S2S-Tunnel 3=Interface
                        '9', # ConnStatus
            '10' # Activated
            ],
    ),
    detect=all_of(
        startswith(".1.3.6.1.2.1.1.2.0", ".1.3.6.1.4.1.2604.5"),
        exists(".1.3.6.1.4.1.2604.5.1.1.*"),
    ),
)

check_plugin_sophosxg_s2s = CheckPlugin(
    name = "sophosxg_s2s",
    sections = [ "sophosxg_s2s" ],
    service_name = "S2S %s",
    discovery_function = discover_sophosxg_s2s,
    check_function = check_sophosxg_s2s,
)

On the ssh-console I can get the services with
cmk -vI --detect-plugins=sophosxg_s2s XG-host
and they also apear in the WebUI and I get the outputs as I expect them. Just when I try to run a service discovery I get them as vanished services (censored by the names of our tunnels)

From Plugin detected with CLI but now with WebUI I already tried with

find ~/local/lib/python3/cmk_addons/plugins/* -type f -name '*pyc'-delete; 
find ~/local/lib/python3/cmk_addons/plugins/* -type d -empty -delete; omd restart apache; omd restart redis; cmk -O; cmk -R

which had no success, anything more I can try to get the check registered correctly, so my services don’t vanish when I perform an service discovery?

Best Regards

Any Updates to the scripts I had send to you ?
Greetz Bernd

Hi,

after I restarted the server completly the plugins are now working, all adaptions I made with your suggestions got me this code

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sophos XG Firewall - S2S & Interface Tunnel Monitoring
Checkmk 2.4 Agent Based API v2
"""

from typing import Dict, Optional
from cmk.agent_based.v2 import (
    CheckPlugin,
    CheckResult,
    DiscoveryResult,
    Result,
    Service,
    SimpleSNMPSection,
    SNMPTree,
    State,
    StringTable,
    all_of,
    exists,
    startswith,
)

# Type Alias
Section = Dict[str, Dict[str, str]]


def parse_sophosxg_s2s(string_table: StringTable) -> Optional[Section]:
    """Parse Sophos XG VPN connection table."""
    parsed = {}
    for line in string_table:
        if len(line) < 4:
            continue

        name, conn_type, state, active = line

        # Filter: Only S2S (2) and Interface (3) tunnels
        # Ignore Client VPN (0)
        if conn_type in ("2", "3"):
            parsed[name] = {
                "state": state,     # "0"=Down, "1"=Up
                "active": active    # "0"=Disabled, "1"=Enabled
            }

    return parsed if parsed else None


def discover_sophosxg_s2s(section: Section) -> DiscoveryResult:
    """Discover all S2S and Interface tunnels."""
    for tunnel_name in sorted(section.keys()):
        yield Service(item=tunnel_name)


def check_sophosxg_s2s(item: str, section: Section) -> CheckResult:
    """Check state of a Sophos XG S2S tunnel."""
    if item not in section:
        return

    data = section[item]
    active = data.get("active")
    state = data.get("state")

    # Case 1: Tunnel administratively disabled
    if active == "0":
        yield Result(
            state=State.OK,
            summary="Tunnel deactivated (administrative)"
        )
        return

    # Case 2: Tunnel is enabled - check status
    if state == "1":
        yield Result(state=State.OK, summary="Tunnel up")
    elif state == "0":
        yield Result(state=State.CRIT, summary="Tunnel down!")
    else:
        yield Result(
            state=State.UNKNOWN,
            summary=f"Unknown tunnel state: {state}"
        )


# ============================================================================
# SNMP SECTION
# ============================================================================

snmp_section_sophosxg_s2s = SimpleSNMPSection(
    name="sophosxg_s2s",
    parse_function=parse_sophosxg_s2s,
    fetch=SNMPTree(
        base='.1.3.6.1.4.1.2604.5.1.6.1.1.1.1',
        oids=[
            '2',   # sfosVPNConnName
            '6',   # sfosVPNConnType (0=Client, 2=S2S, 3=Interface)
            '9',   # sfosVPNConnStatus (0=Down, 1=Up)
            '10'   # sfosVPNConnActivated (0=No, 1=Yes)
        ],
    ),
    detect=all_of(
        startswith(".1.3.6.1.2.1.1.2.0", ".1.3.6.1.4.1.2604.5"),
        exists(".1.3.6.1.4.1.2604.5.1.1.*"),
    ),
)


# ============================================================================
# CHECK PLUGIN REGISTRATION
# ============================================================================

check_plugin_sophosxg_s2s = CheckPlugin(
    name="sophosxg_s2s",
    sections=["sophosxg_s2s"],
    service_name="S2S Tunnel %s",
    discovery_function=discover_sophosxg_s2s,
    check_function=check_sophosxg_s2s,
)

however cmk -R and omd reload apache did still not work, but as I had a kernel-update pending I rebooted the server and it worked.
Your latest skrip was not tested, as the reboot was done before you sent it to me :slight_smile:

1 Like