From 7f466177e1423ed4d21f78617e1b16973fdb690c Mon Sep 17 00:00:00 2001 From: root Date: Fri, 18 Apr 2025 19:39:29 +0000 Subject: [PATCH] v2.3.0: Add CIDR whitelist support and fix emoji handling --- CHANGELOG.md | 145 ++++++++++++++++++++++ README.md | 34 +---- mikrocata.py | 345 +++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 413 insertions(+), 111 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1ff4830 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,145 @@ +# Changelog - v2.3.0 (April 2025) + +## Key Features + +* **Advanced Whitelist Support:** Added full support for CIDR notations (e.g., `"10.0.0.0/8"`) in whitelist. +* **Emoji and Special Characters Handling:** Fixed issues with rules containing emojis or Unicode characters that caused crashes. +* **Debug Mode:** Added debug mode to facilitate troubleshooting. +* **Optimized Logging:** More concise and informative log messages in normal mode. +* **Critical Fixes:** Fixed bugs that caused some alerts to be skipped during processing. + +## Technical Improvements + +* Implemented `is_ip_in_whitelist()` function to correctly handle all forms of whitelist entries (exact IPs, prefixes, CIDR). +* Added `sanitize_text()` function to remove emojis and special characters from messages. +* Fixed double JSON parsing in `read_json()`. +* Replaced `break` with `continue` to ensure all alerts are processed. +* Improved logging system with categorization (`BLOCKED`, `UPDATED`, `ERROR`). +* Added full IPv6 support in whitelist management. + +## Bug Fixes + +* Fixed the issue that prevented recognition of CIDR format subnets in the whitelist. +* Fixed a bug that caused failure to process multiple events in a single cycle. +* Fixed crashes during processing of PAW rules containing emojis. + +## Upgrade Instructions + +1. **Backup Existing Configurations** + Backup all configuration files: + + ```bash + sudo cp /usr/local/bin/mikrocataTZSP\*.py /usr/local/bin/mikrocataTZSP\*.py.bak + sudo mkdir -p /var/lib/mikrocata/backup-$(date +%Y%m%d) + sudo cp /var/lib/mikrocata/\* /var/lib/mikrocata/backup-$(date +%Y%m%d)/ + ``` + +2. **Update Repository Files** + + ```bash + cd /path/to/mikrocata2selks + git pull + ``` + +3. **Install New Files** + + ```bash + sudo cp mikrocata.py /usr/local/bin/mikrocataTZSP0.py + ``` + + Repeat for each instance if you have more than one Mikrotik router. + + ```bash + sudo chmod +x /usr/local/bin/mikrocataTZSP0.py + ``` + +4. **Transfer Previous Configurations** + Open the backup file and the new one to manually copy your custom settings: + + ```bash + sudo nano /usr/local/bin/mikrocataTZSP0.py.bak + sudo nano /usr/local/bin/mikrocataTZSP0.py + ``` + + Make sure to transfer all of the following settings: + + * Mikrotik credentials (`USERNAME`, `PASSWORD`, `ROUTER_IP`) + * Telegram configuration + * Whitelist IPs + * SSL settings + * LISTEN_INTERFACE + * SELKS_CONTAINER_DATA_SURICATA_LOG + +5. **Restart Services** + + ```bash + sudo systemctl restart mikrocataTZSP0.service + ``` + + Repeat for each instance if you have more than one Mikrotik router. + +6. **Verify Operation** + ```bash + sudo journalctl -u mikrocataTZSP0.service -f + ``` + +## Important Notes for Upgrading + +* **Debug Mode:** To activate debug mode for troubleshooting, set `DEBUG_MODE = True` in the file settings. +* **CIDR Whitelist:** The new version supports CIDR notations (e.g., `"10.0.0.0/8"`) in the whitelist. +* **IPv6 Compatibility:** The whitelist management now works correctly for both IPv4 and IPv6. + +## What to Do in Case of Problems + +If you encounter issues after upgrading: + +* Check logs for any errors: + ```bash + sudo journalctl -u mikrocataTZSP0.service -n 100 + ``` +* Enable debug mode for more detailed logs: + ```bash + sudo nano /usr/local/bin/mikrocataTZSP0.py + ``` + Set `DEBUG_MODE = True` + ```bash + sudo systemctl restart mikrocataTZSP0.service + ``` +* If necessary, restore the previous version: + ```bash + sudo cp /usr/local/bin/mikrocataTZSP0.py.bak /usr/local/bin/mikrocataTZSP0.py + sudo systemctl restart mikrocataTZSP0.service + ``` +* Report the issue on GitHub with relevant logs. + +### v2.2.6 (March 4, 2025) +- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert +- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources +- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds) + +### v2.2.5 (January, 2025) +- **Security:** Fixed SSL certificate management issues +- **Reliability:** Improved handling of certificate validation + +### v2.2.4 (December 2024) +- **Security:** Added support for self-signed certificates +- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments + +### v2.2.3 (July 2024) +- **Feature:** Added IPv6 support (thanks to contributor: floridan95) +- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking + +### v2.2.2 +- **Fix:** Resolved issue with Telegram notifications not being delivered properly + +### v2.2.1 +- **Bugfix:** Fixed script crash during Suricata logrotate operations +- **Stability:** Improved file handling for log rotation events + +### v2.2 +- **Compatibility:** Added support for Debian 12 +- **System:** Updated installation scripts for newer package versions + +### v2.1 +- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev) +- **Performance:** Better handling of malformed JSON data diff --git a/README.md b/README.md index ecb8218..405e756 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@

Welcome to Mikrocata2SELKS 👋

- Version + Version License: MIT @@ -149,37 +149,7 @@ flowchart TD ## 🔄 Changelog -### v2.2.6 (March 4, 2025) -- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert -- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources -- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds) - -### v2.2.5 (January, 2025) -- **Security:** Fixed SSL certificate management issues -- **Reliability:** Improved handling of certificate validation - -### v2.2.4 (December 2024) -- **Security:** Added support for self-signed certificates -- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments - -### v2.2.3 (July 2024) -- **Feature:** Added IPv6 support (thanks to contributor: floridan95) -- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking - -### v2.2.2 -- **Fix:** Resolved issue with Telegram notifications not being delivered properly - -### v2.2.1 -- **Bugfix:** Fixed script crash during Suricata logrotate operations -- **Stability:** Improved file handling for log rotation events - -### v2.2 -- **Compatibility:** Added support for Debian 12 -- **System:** Updated installation scripts for newer package versions - -### v2.1 -- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev) -- **Performance:** Better handling of malformed JSON data +- View CHANGELOG.md ## 🔧 Troubleshooting diff --git a/mikrocata.py b/mikrocata.py index a0f5270..f569017 100644 --- a/mikrocata.py +++ b/mikrocata.py @@ -4,6 +4,7 @@ import ssl import os import socket import re +import ipaddress from time import sleep from datetime import datetime as dt import pyinotify @@ -14,6 +15,8 @@ from librouteros import connect from librouteros.query import Key import requests +VERSION = "2.3.0" # Updated April 2025 + # ------------------------------------------------------------------------------ ################# START EDIT SETTINGS @@ -33,8 +36,8 @@ TELEGRAM_CHATID = "CHATID" # You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string) WAN_IP = "yourpublicip" -LOCAL_IP_PREFIX = "192.168." -WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:") +LOCAL_IP_PREFIX = "192.168.0.0/16" +WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:", "10.0.0.0/8", "172.16.0.0/12") COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f" # See datetime strftime formats. ENABLE_IPV6 = False @@ -46,6 +49,9 @@ SEVERITY=("1","2") # with self-signed certificates in trusted environments ALLOW_SELF_SIGNED_CERTS = False +# Enable debug mode for verbose logging +DEBUG_MODE = False + ################# END EDIT SETTINGS # ------------------------------------------------------------------------------ LISTEN_INTERFACE=("tzsp0") @@ -80,6 +86,59 @@ ignore_list = [] last_save_time = 0 SAVE_INTERVAL = 300 # Save lists every 5 minutes +def debug_log(message): + """Print message only if DEBUG_MODE is enabled""" + if DEBUG_MODE: + print(f"[Mikrocata-DEBUG] {message}") + +def sanitize_text(text): + """Remove emojis and other non-ASCII characters from text""" + if not text: + return "" + # Keep only ASCII characters (removes all emojis and special characters) + return ''.join(char for char in text if ord(char) < 128) + +def is_ip_in_whitelist(ip_to_check, whitelist): + """ + Check if an IP is in the whitelist, supporting both direct matches, + prefix matches, and CIDR notation. + """ + try: + # Convert the IP to check to an ipaddress object for CIDR matching + if ':' in ip_to_check: # IPv6 + ip_obj = ipaddress.IPv6Address(ip_to_check) + else: # IPv4 + ip_obj = ipaddress.IPv4Address(ip_to_check) + + for item in whitelist: + # Direct IP match + if ip_to_check == item: + debug_log(f"IP {ip_to_check} matches exact whitelist entry {item}") + return True + + # String prefix match (like "192.168.") + if isinstance(item, str) and not '/' in item and ip_to_check.startswith(item): + debug_log(f"IP {ip_to_check} matches prefix whitelist entry {item}") + return True + + # CIDR notation check (like "10.0.0.0/8") + if '/' in item: + try: + network = ipaddress.ip_network(item) + if ip_obj in network: + debug_log(f"IP {ip_to_check} is within CIDR whitelist range {item}") + return True + except ValueError: + print(f"[Mikrocata] Warning: Invalid CIDR notation in whitelist: {item}") + + debug_log(f"IP {ip_to_check} is not in any whitelist entry") + return False + + except ValueError as e: + debug_log(f"Error checking whitelist for IP {ip_to_check}: {e}") + # If we can't parse the IP, we should not whitelist it + return False + class EventHandler(pyinotify.ProcessEvent): def process_IN_MODIFY(self, event): if event.pathname == FILEPATH: @@ -113,7 +172,6 @@ def seek_to_end(fpath): sleep(10) continue - def read_json(fpath): global last_pos while True: @@ -125,7 +183,7 @@ def read_json(fpath): try: alert = json.loads(line) if alert.get('event_type') == 'alert': - alerts.append(json.loads(line)) + alerts.append(alert) # Fixed: don't json.loads again else: last_pos = f.tell() continue @@ -138,7 +196,6 @@ def read_json(fpath): sleep(10) continue - def add_to_tik(alerts): global last_pos global api @@ -148,98 +205,207 @@ def add_to_tik(alerts): _id = Key(".id") _list = Key("list") - address_list = api.path("/ip/firewall/address-list") - address_list_v6 = api.path("/ipv6/firewall/address-list") - resources = api.path("system/resource") - - # Remove duplicate src_ips. - for event in {item['src_ip']: item for item in alerts}.values(): + if DEBUG_MODE: + print(f"[Mikrocata] Processing {len(alerts)} alert events") + + if not alerts: + debug_log("No alerts to process") + return + + try: + address_list = api.path("/ip/firewall/address-list") + address_list_v6 = api.path("/ipv6/firewall/address-list") + resources = api.path("system/resource") + debug_log("Successfully connected to Mikrotik API paths") + except Exception as e: + print(f"[Mikrocata] Error connecting to Mikrotik API: {str(e)}") + raise + # Remove duplicate src_ips + unique_alerts = {item['src_ip']: item for item in alerts}.values() + debug_log(f"Processing {len(unique_alerts)} unique source IPs from alerts") + + for event in unique_alerts: + debug_log(f"Processing alert: SID={event['alert']['signature_id']}, Severity={event['alert']['severity']}") + + # Check alert severity if str(event["alert"]["severity"]) not in SEVERITY: - print("pass severity: " + str(event["alert"]["severity"])) - break + print(f"[Mikrocata] Skipping alert SID={event['alert']['signature_id']} (severity {event['alert']['severity']})") + continue + # Check interface if str(event["in_iface"]) not in LISTEN_INTERFACE: - break + debug_log(f"Skipping alert from interface {event['in_iface']}") + continue - if not in_ignore_list(ignore_list, event): + # Check if in ignore list + if in_ignore_list(ignore_list, event): + print(f"[Mikrocata] Skipping alert {event['alert']['signature_id']} - in ignore list") + continue + + debug_log(f"Alert passed all filters, preparing to add to MikroTik") + + try: timestamp = dt.strptime(event["timestamp"], - "%Y-%m-%dT%H:%M:%S.%f%z").strftime( - COMMENT_TIME_FORMAT) - is_v6 = ':' in event["src_ip"] - curr_list = address_list - if ENABLE_IPV6 and is_v6: - curr_list = address_list_v6 - if event["src_ip"].startswith(WHITELIST_IPS): - if event["dest_ip"].startswith(WHITELIST_IPS): - continue + "%Y-%m-%dT%H:%M:%S.%f%z").strftime( + COMMENT_TIME_FORMAT) + except Exception as e: + debug_log(f"Error parsing timestamp {event['timestamp']}: {str(e)}") + timestamp = dt.now().strftime(COMMENT_TIME_FORMAT) + + # Determine if IPv6 + is_v6 = ':' in event["src_ip"] + curr_list = address_list + if ENABLE_IPV6 and is_v6: + debug_log(f"IPv6 address detected: {event['src_ip']}") + curr_list = address_list_v6 + + # Check whitelist with improved function + if is_ip_in_whitelist(event["src_ip"], WHITELIST_IPS): + debug_log(f"Source IP {event['src_ip']} in whitelist") + if is_ip_in_whitelist(event["dest_ip"], WHITELIST_IPS): + debug_log(f"Destination IP {event['dest_ip']} also in whitelist - skipping alert") + continue - wanted_ip, wanted_port = event["dest_ip"], event.get("src_port") - src_ip, src_port = event["src_ip"], event.get("dest_port") + wanted_ip, wanted_port = event["dest_ip"], event.get("src_port") + src_ip, src_port = event["src_ip"], event.get("dest_port") + debug_log(f"Source IP in whitelist, targeting destination: {wanted_ip}") + else: + wanted_ip, wanted_port = event["src_ip"], event.get("dest_port") + src_ip, src_port = event["dest_ip"], event.get("src_port") + debug_log(f"Targeting source IP: {wanted_ip}") - else: - wanted_ip, wanted_port = event["src_ip"], event.get("dest_port") - src_ip, src_port = event["dest_ip"], event.get("src_port") + # Check if target IP is in whitelist + if is_ip_in_whitelist(wanted_ip, WHITELIST_IPS): + print(f"[Mikrocata] Skipping: target IP {wanted_ip} is in whitelist") + continue - try: - cmnt=f"""[{event['alert']['gid']}:{ + try: + # Log original signature before sanitizing + original_signature = event['alert']['signature'] + debug_log(f"Original signature: {original_signature}") + + # Sanitize the signature to remove emojis and special characters + signature = sanitize_text(original_signature) + debug_log(f"Sanitized signature: {signature}") + + # If significant information was lost in sanitization, log a warning + if len(signature) < len(original_signature) * 0.7: # If more than 30% of chars were removed + debug_log(f"WARNING: Significant information lost during sanitization!") + + cmnt = f"""[{event['alert']['gid']}:{ + event['alert']['signature_id']}] { + signature} ::: Port: { + wanted_port}/{ + event['proto']} ::: timestamp: { + timestamp}""" + + debug_log(f"Adding to list '{BLOCK_LIST_NAME}': IP={wanted_ip}, Timeout={TIMEOUT}") + debug_log(f"Comment: {cmnt}") + + curr_list.add(list=BLOCK_LIST_NAME, + address=wanted_ip, + comment=cmnt, + timeout=TIMEOUT) + + print(f"[Mikrocata] BLOCKED: {wanted_ip} - SID:{event['alert']['signature_id']} - Severity:{event['alert']['severity']}") + + # Telegram notifications + if enable_telegram: + debug_log("Telegram notifications enabled, sending message") + clean_message = sanitize_text(f"From: {wanted_ip}\nTo: {src_ip}:{str(wanted_port)}\nRule: {cmnt}") + response = requests.get(sendTelegram(clean_message)) + debug_log(f"Telegram API response: {response.status_code}") + + except librouteros.exceptions.TrapError as e: + debug_log(f"MikroTik TrapError: {str(e)}") + + if "failure: already have such entry" in str(e): + debug_log(f"IP {wanted_ip} already exists in list {BLOCK_LIST_NAME}, updating entry") + + # Find and remove existing entry + existing_entries = list(curr_list.select(_id, _list, _address).where( + _address == wanted_ip, + _list == BLOCK_LIST_NAME)) + + debug_log(f"Found {len(existing_entries)} existing entries for {wanted_ip}") + + for row in existing_entries: + debug_log(f"Removing existing entry with ID {row['.id']}") + curr_list.remove(row[".id"]) + + # Sanitize the signature here too + signature = sanitize_text(event['alert']['signature']) + + # Add updated entry + updated_comment = f"""[{event['alert']['gid']}:{ event['alert']['signature_id']}] { - event['alert']['signature']} ::: Port: { + signature} ::: Port: { wanted_port}/{ event['proto']} ::: timestamp: { timestamp}""" - + + debug_log(f"Re-adding IP {wanted_ip} with updated comment") curr_list.add(list=BLOCK_LIST_NAME, - address=wanted_ip, - comment=cmnt, - timeout=TIMEOUT) + address=wanted_ip, + comment=updated_comment, + timeout=TIMEOUT) + print(f"[Mikrocata] UPDATED: {wanted_ip} - SID:{event['alert']['signature_id']}") - print(f"[Mikrocata] new ip added: {cmnt}") - if enable_telegram == True: - print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + str(wanted_port) + "\nRule: " + cmnt)).json()) + else: + print(f"[Mikrocata] ERROR: TrapError: {str(e)}") + raise + except socket.timeout as e: + print(f"[Mikrocata] Socket timeout: {str(e)}, reconnecting...") + connect_to_tik() + + except Exception as e: + print(f"[Mikrocata] ERROR: {type(e).__name__} while processing {wanted_ip}: {str(e)}") + if DEBUG_MODE: + import traceback + print(f"[Mikrocata] Traceback: {traceback.format_exc()}") + continue - except librouteros.exceptions.TrapError as e: - if "failure: already have such entry" in str(e): - for row in curr_list.select(_id, _list, _address).where( - _address == wanted_ip, - _list == BLOCK_LIST_NAME): - curr_list.remove(row[".id"]) - - curr_list.add(list=BLOCK_LIST_NAME, - address=wanted_ip, - comment=f"""[{event['alert']['gid']}:{ - event['alert']['signature_id']}] { - event['alert']['signature']} ::: Port: { - wanted_port}/{ - event['proto']} ::: timestamp: { - timestamp}""", - timeout=TIMEOUT) - - else: - raise - - except socket.timeout: - connect_to_tik() - - # If router has been rebooted add saved list(s), then save lists to a file. - # Only save lists and check uptime every x minutes + # Save lists and check uptime every x minutes current_time = int(dt.now().timestamp()) - if current_time - last_save_time >= SAVE_INTERVAL: + time_since_last_save = current_time - last_save_time + debug_log(f"Time since last save: {time_since_last_save} seconds (interval: {SAVE_INTERVAL} seconds)") + + if time_since_last_save >= SAVE_INTERVAL: + debug_log(f"Save interval reached, saving lists and checking router uptime") last_save_time = current_time # Check router uptime and restore lists if needed - if check_tik_uptime(resources): - add_saved_lists(address_list) - if ENABLE_IPV6: - add_saved_lists(address_list_v6, True) + debug_log("Checking MikroTik uptime") + uptime_check = check_tik_uptime(resources) + + if uptime_check: + print("[Mikrocata] Router rebooted - restoring saved lists") + try: + add_saved_lists(address_list) + debug_log("Successfully restored IPv4 address lists") + if ENABLE_IPV6: + add_saved_lists(address_list_v6, True) + debug_log("Successfully restored IPv6 address lists") + except Exception as e: + print(f"[Mikrocata] ERROR: Failed to restore lists: {str(e)}") + else: + debug_log("Router has not been rebooted, no need to restore lists") # Save current lists to file - save_lists(address_list) - if ENABLE_IPV6: - save_lists(address_list_v6, True) + try: + save_lists(address_list) + debug_log("Successfully saved IPv4 address lists") + if ENABLE_IPV6: + save_lists(address_list_v6, True) + debug_log("Successfully saved IPv6 address lists") + except Exception as e: + print(f"[Mikrocata] ERROR: Failed to save lists: {str(e)}") - print("[Mikrocata] Lists saved") + debug_log("Lists saved successfully") + + debug_log("Alert processing completed") def check_tik_uptime(resources): @@ -425,13 +591,22 @@ def sendTelegram(message): telegram_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage?chat_id={TELEGRAM_CHATID}&text={message}&disable_web_page_preview=true&parse_mode=html" try: response = requests.get(telegram_url) - print(response.json()) + debug_log(f"Telegram response: {response.json()}") + return telegram_url except Exception as e: - print(f"Failed to send Telegram message: {e}") - return telegram_url + print(f"[Mikrocata] Failed to send Telegram message: {e}") + return "" def main(): + + print(f"[Mikrocata] Starting Mikrocata2SELKS v{VERSION}") + + if DEBUG_MODE: + print("[Mikrocata] Starting in DEBUG mode - verbose logging enabled") + else: + print("[Mikrocata] Starting in normal mode") + seek_to_end(FILEPATH) connect_to_tik() read_ignore_list(IGNORE_LIST_LOCATION) @@ -446,21 +621,33 @@ def main(): notifier = pyinotify.Notifier(wm, handler) wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False) + print(f"[Mikrocata] Monitoring {FILEPATH} for alerts") + print(f"[Mikrocata] Whitelist configured for: {WHITELIST_IPS}") + print(f"[Mikrocata] Configured to process alerts with severity: {SEVERITY}") + while True: try: notifier.loop() except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e: - print(f"[Mikrocata] (4) {str(e)}") + print(f"[Mikrocata] Connection error: {str(e)}") connect_to_tik() continue except librouteros.exceptions.TrapError as e: - print(f"[Mikrocata] (8) librouteros.TrapError: {str(e)}") + print(f"[Mikrocata] TrapError: {str(e)}") continue except KeyError as e: - print(f"[Mikrocata] (8) KeyError: {str(e)}") + print(f"[Mikrocata] KeyError: {str(e)}") + continue + + except Exception as e: + print(f"[Mikrocata] Unexpected error: {str(e)}") + if DEBUG_MODE: + import traceback + print(f"[Mikrocata] Traceback: {traceback.format_exc()}") + sleep(5) continue if __name__ == "__main__":