v2.3.0: Add CIDR whitelist support and fix emoji handling

This commit is contained in:
root 2025-04-18 19:39:29 +00:00
parent 9c2d6bf90e
commit 7f466177e1
3 changed files with 413 additions and 111 deletions

145
CHANGELOG.md Normal file
View File

@ -0,0 +1,145 @@
# Changelog - v2.3.0 (April 2025)
## Key Features
* **Advanced Whitelist Support:** Added full support for CIDR notations (e.g., `"10.0.0.0/8"`) in whitelist.
* **Emoji and Special Characters Handling:** Fixed issues with rules containing emojis or Unicode characters that caused crashes.
* **Debug Mode:** Added debug mode to facilitate troubleshooting.
* **Optimized Logging:** More concise and informative log messages in normal mode.
* **Critical Fixes:** Fixed bugs that caused some alerts to be skipped during processing.
## Technical Improvements
* Implemented `is_ip_in_whitelist()` function to correctly handle all forms of whitelist entries (exact IPs, prefixes, CIDR).
* Added `sanitize_text()` function to remove emojis and special characters from messages.
* Fixed double JSON parsing in `read_json()`.
* Replaced `break` with `continue` to ensure all alerts are processed.
* Improved logging system with categorization (`BLOCKED`, `UPDATED`, `ERROR`).
* Added full IPv6 support in whitelist management.
## Bug Fixes
* Fixed the issue that prevented recognition of CIDR format subnets in the whitelist.
* Fixed a bug that caused failure to process multiple events in a single cycle.
* Fixed crashes during processing of PAW rules containing emojis.
## Upgrade Instructions
1. **Backup Existing Configurations**
Backup all configuration files:
```bash
sudo cp /usr/local/bin/mikrocataTZSP\*.py /usr/local/bin/mikrocataTZSP\*.py.bak
sudo mkdir -p /var/lib/mikrocata/backup-$(date +%Y%m%d)
sudo cp /var/lib/mikrocata/\* /var/lib/mikrocata/backup-$(date +%Y%m%d)/
```
2. **Update Repository Files**
```bash
cd /path/to/mikrocata2selks
git pull
```
3. **Install New Files**
```bash
sudo cp mikrocata.py /usr/local/bin/mikrocataTZSP0.py
```
Repeat for each instance if you have more than one Mikrotik router.
```bash
sudo chmod +x /usr/local/bin/mikrocataTZSP0.py
```
4. **Transfer Previous Configurations**
Open the backup file and the new one to manually copy your custom settings:
```bash
sudo nano /usr/local/bin/mikrocataTZSP0.py.bak
sudo nano /usr/local/bin/mikrocataTZSP0.py
```
Make sure to transfer all of the following settings:
* Mikrotik credentials (`USERNAME`, `PASSWORD`, `ROUTER_IP`)
* Telegram configuration
* Whitelist IPs
* SSL settings
* LISTEN_INTERFACE
* SELKS_CONTAINER_DATA_SURICATA_LOG
5. **Restart Services**
```bash
sudo systemctl restart mikrocataTZSP0.service
```
Repeat for each instance if you have more than one Mikrotik router.
6. **Verify Operation**
```bash
sudo journalctl -u mikrocataTZSP0.service -f
```
## Important Notes for Upgrading
* **Debug Mode:** To activate debug mode for troubleshooting, set `DEBUG_MODE = True` in the file settings.
* **CIDR Whitelist:** The new version supports CIDR notations (e.g., `"10.0.0.0/8"`) in the whitelist.
* **IPv6 Compatibility:** The whitelist management now works correctly for both IPv4 and IPv6.
## What to Do in Case of Problems
If you encounter issues after upgrading:
* Check logs for any errors:
```bash
sudo journalctl -u mikrocataTZSP0.service -n 100
```
* Enable debug mode for more detailed logs:
```bash
sudo nano /usr/local/bin/mikrocataTZSP0.py
```
Set `DEBUG_MODE = True`
```bash
sudo systemctl restart mikrocataTZSP0.service
```
* If necessary, restore the previous version:
```bash
sudo cp /usr/local/bin/mikrocataTZSP0.py.bak /usr/local/bin/mikrocataTZSP0.py
sudo systemctl restart mikrocataTZSP0.service
```
* Report the issue on GitHub with relevant logs.
### v2.2.6 (March 4, 2025)
- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert
- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources
- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds)
### v2.2.5 (January, 2025)
- **Security:** Fixed SSL certificate management issues
- **Reliability:** Improved handling of certificate validation
### v2.2.4 (December 2024)
- **Security:** Added support for self-signed certificates
- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments
### v2.2.3 (July 2024)
- **Feature:** Added IPv6 support (thanks to contributor: floridan95)
- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking
### v2.2.2
- **Fix:** Resolved issue with Telegram notifications not being delivered properly
### v2.2.1
- **Bugfix:** Fixed script crash during Suricata logrotate operations
- **Stability:** Improved file handling for log rotation events
### v2.2
- **Compatibility:** Added support for Debian 12
- **System:** Updated installation scripts for newer package versions
### v2.1
- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev)
- **Performance:** Better handling of malformed JSON data

View File

@ -1,6 +1,6 @@
<h1 align="center">Welcome to Mikrocata2SELKS 👋</h1> <h1 align="center">Welcome to Mikrocata2SELKS 👋</h1>
<p> <p>
<img alt="Version" src="https://img.shields.io/badge/version-2.2.6-blue.svg?cacheSeconds=2592000" /> <img alt="Version" src="https://img.shields.io/badge/version-2.3.0-blue.svg?cacheSeconds=2592000" />
<a href="https://github.com/angolo40/mikrocata2selks" target="_blank"> <a href="https://github.com/angolo40/mikrocata2selks" target="_blank">
<img alt="License: MIT" src="https://img.shields.io/github/license/angolo40/Mikrocata2SELKS" /> <img alt="License: MIT" src="https://img.shields.io/github/license/angolo40/Mikrocata2SELKS" />
</a> </a>
@ -149,37 +149,7 @@ flowchart TD
## 🔄 Changelog ## 🔄 Changelog
### v2.2.6 (March 4, 2025) - View CHANGELOG.md
- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert
- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources
- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds)
### v2.2.5 (January, 2025)
- **Security:** Fixed SSL certificate management issues
- **Reliability:** Improved handling of certificate validation
### v2.2.4 (December 2024)
- **Security:** Added support for self-signed certificates
- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments
### v2.2.3 (July 2024)
- **Feature:** Added IPv6 support (thanks to contributor: floridan95)
- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking
### v2.2.2
- **Fix:** Resolved issue with Telegram notifications not being delivered properly
### v2.2.1
- **Bugfix:** Fixed script crash during Suricata logrotate operations
- **Stability:** Improved file handling for log rotation events
### v2.2
- **Compatibility:** Added support for Debian 12
- **System:** Updated installation scripts for newer package versions
### v2.1
- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev)
- **Performance:** Better handling of malformed JSON data
## 🔧 Troubleshooting ## 🔧 Troubleshooting

View File

@ -4,6 +4,7 @@ import ssl
import os import os
import socket import socket
import re import re
import ipaddress
from time import sleep from time import sleep
from datetime import datetime as dt from datetime import datetime as dt
import pyinotify import pyinotify
@ -14,6 +15,8 @@ from librouteros import connect
from librouteros.query import Key from librouteros.query import Key
import requests import requests
VERSION = "2.3.0" # Updated April 2025
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
################# START EDIT SETTINGS ################# START EDIT SETTINGS
@ -33,8 +36,8 @@ TELEGRAM_CHATID = "CHATID"
# You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string) # You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string)
WAN_IP = "yourpublicip" WAN_IP = "yourpublicip"
LOCAL_IP_PREFIX = "192.168." LOCAL_IP_PREFIX = "192.168.0.0/16"
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:") WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:", "10.0.0.0/8", "172.16.0.0/12")
COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f" # See datetime strftime formats. COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f" # See datetime strftime formats.
ENABLE_IPV6 = False ENABLE_IPV6 = False
@ -46,6 +49,9 @@ SEVERITY=("1","2")
# with self-signed certificates in trusted environments # with self-signed certificates in trusted environments
ALLOW_SELF_SIGNED_CERTS = False ALLOW_SELF_SIGNED_CERTS = False
# Enable debug mode for verbose logging
DEBUG_MODE = False
################# END EDIT SETTINGS ################# END EDIT SETTINGS
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
LISTEN_INTERFACE=("tzsp0") LISTEN_INTERFACE=("tzsp0")
@ -80,6 +86,59 @@ ignore_list = []
last_save_time = 0 last_save_time = 0
SAVE_INTERVAL = 300 # Save lists every 5 minutes SAVE_INTERVAL = 300 # Save lists every 5 minutes
def debug_log(message):
"""Print message only if DEBUG_MODE is enabled"""
if DEBUG_MODE:
print(f"[Mikrocata-DEBUG] {message}")
def sanitize_text(text):
"""Remove emojis and other non-ASCII characters from text"""
if not text:
return ""
# Keep only ASCII characters (removes all emojis and special characters)
return ''.join(char for char in text if ord(char) < 128)
def is_ip_in_whitelist(ip_to_check, whitelist):
"""
Check if an IP is in the whitelist, supporting both direct matches,
prefix matches, and CIDR notation.
"""
try:
# Convert the IP to check to an ipaddress object for CIDR matching
if ':' in ip_to_check: # IPv6
ip_obj = ipaddress.IPv6Address(ip_to_check)
else: # IPv4
ip_obj = ipaddress.IPv4Address(ip_to_check)
for item in whitelist:
# Direct IP match
if ip_to_check == item:
debug_log(f"IP {ip_to_check} matches exact whitelist entry {item}")
return True
# String prefix match (like "192.168.")
if isinstance(item, str) and not '/' in item and ip_to_check.startswith(item):
debug_log(f"IP {ip_to_check} matches prefix whitelist entry {item}")
return True
# CIDR notation check (like "10.0.0.0/8")
if '/' in item:
try:
network = ipaddress.ip_network(item)
if ip_obj in network:
debug_log(f"IP {ip_to_check} is within CIDR whitelist range {item}")
return True
except ValueError:
print(f"[Mikrocata] Warning: Invalid CIDR notation in whitelist: {item}")
debug_log(f"IP {ip_to_check} is not in any whitelist entry")
return False
except ValueError as e:
debug_log(f"Error checking whitelist for IP {ip_to_check}: {e}")
# If we can't parse the IP, we should not whitelist it
return False
class EventHandler(pyinotify.ProcessEvent): class EventHandler(pyinotify.ProcessEvent):
def process_IN_MODIFY(self, event): def process_IN_MODIFY(self, event):
if event.pathname == FILEPATH: if event.pathname == FILEPATH:
@ -113,7 +172,6 @@ def seek_to_end(fpath):
sleep(10) sleep(10)
continue continue
def read_json(fpath): def read_json(fpath):
global last_pos global last_pos
while True: while True:
@ -125,7 +183,7 @@ def read_json(fpath):
try: try:
alert = json.loads(line) alert = json.loads(line)
if alert.get('event_type') == 'alert': if alert.get('event_type') == 'alert':
alerts.append(json.loads(line)) alerts.append(alert) # Fixed: don't json.loads again
else: else:
last_pos = f.tell() last_pos = f.tell()
continue continue
@ -138,7 +196,6 @@ def read_json(fpath):
sleep(10) sleep(10)
continue continue
def add_to_tik(alerts): def add_to_tik(alerts):
global last_pos global last_pos
global api global api
@ -148,98 +205,207 @@ def add_to_tik(alerts):
_id = Key(".id") _id = Key(".id")
_list = Key("list") _list = Key("list")
address_list = api.path("/ip/firewall/address-list") if DEBUG_MODE:
address_list_v6 = api.path("/ipv6/firewall/address-list") print(f"[Mikrocata] Processing {len(alerts)} alert events")
resources = api.path("system/resource")
if not alerts:
# Remove duplicate src_ips. debug_log("No alerts to process")
for event in {item['src_ip']: item for item in alerts}.values(): return
try:
address_list = api.path("/ip/firewall/address-list")
address_list_v6 = api.path("/ipv6/firewall/address-list")
resources = api.path("system/resource")
debug_log("Successfully connected to Mikrotik API paths")
except Exception as e:
print(f"[Mikrocata] Error connecting to Mikrotik API: {str(e)}")
raise
# Remove duplicate src_ips
unique_alerts = {item['src_ip']: item for item in alerts}.values()
debug_log(f"Processing {len(unique_alerts)} unique source IPs from alerts")
for event in unique_alerts:
debug_log(f"Processing alert: SID={event['alert']['signature_id']}, Severity={event['alert']['severity']}")
# Check alert severity
if str(event["alert"]["severity"]) not in SEVERITY: if str(event["alert"]["severity"]) not in SEVERITY:
print("pass severity: " + str(event["alert"]["severity"])) print(f"[Mikrocata] Skipping alert SID={event['alert']['signature_id']} (severity {event['alert']['severity']})")
break continue
# Check interface
if str(event["in_iface"]) not in LISTEN_INTERFACE: if str(event["in_iface"]) not in LISTEN_INTERFACE:
break debug_log(f"Skipping alert from interface {event['in_iface']}")
continue
if not in_ignore_list(ignore_list, event): # Check if in ignore list
if in_ignore_list(ignore_list, event):
print(f"[Mikrocata] Skipping alert {event['alert']['signature_id']} - in ignore list")
continue
debug_log(f"Alert passed all filters, preparing to add to MikroTik")
try:
timestamp = dt.strptime(event["timestamp"], timestamp = dt.strptime(event["timestamp"],
"%Y-%m-%dT%H:%M:%S.%f%z").strftime( "%Y-%m-%dT%H:%M:%S.%f%z").strftime(
COMMENT_TIME_FORMAT) COMMENT_TIME_FORMAT)
is_v6 = ':' in event["src_ip"] except Exception as e:
curr_list = address_list debug_log(f"Error parsing timestamp {event['timestamp']}: {str(e)}")
if ENABLE_IPV6 and is_v6: timestamp = dt.now().strftime(COMMENT_TIME_FORMAT)
curr_list = address_list_v6
if event["src_ip"].startswith(WHITELIST_IPS): # Determine if IPv6
if event["dest_ip"].startswith(WHITELIST_IPS): is_v6 = ':' in event["src_ip"]
continue curr_list = address_list
if ENABLE_IPV6 and is_v6:
debug_log(f"IPv6 address detected: {event['src_ip']}")
curr_list = address_list_v6
# Check whitelist with improved function
if is_ip_in_whitelist(event["src_ip"], WHITELIST_IPS):
debug_log(f"Source IP {event['src_ip']} in whitelist")
if is_ip_in_whitelist(event["dest_ip"], WHITELIST_IPS):
debug_log(f"Destination IP {event['dest_ip']} also in whitelist - skipping alert")
continue
wanted_ip, wanted_port = event["dest_ip"], event.get("src_port") wanted_ip, wanted_port = event["dest_ip"], event.get("src_port")
src_ip, src_port = event["src_ip"], event.get("dest_port") src_ip, src_port = event["src_ip"], event.get("dest_port")
debug_log(f"Source IP in whitelist, targeting destination: {wanted_ip}")
else:
wanted_ip, wanted_port = event["src_ip"], event.get("dest_port")
src_ip, src_port = event["dest_ip"], event.get("src_port")
debug_log(f"Targeting source IP: {wanted_ip}")
else: # Check if target IP is in whitelist
wanted_ip, wanted_port = event["src_ip"], event.get("dest_port") if is_ip_in_whitelist(wanted_ip, WHITELIST_IPS):
src_ip, src_port = event["dest_ip"], event.get("src_port") print(f"[Mikrocata] Skipping: target IP {wanted_ip} is in whitelist")
continue
try: try:
cmnt=f"""[{event['alert']['gid']}:{ # Log original signature before sanitizing
original_signature = event['alert']['signature']
debug_log(f"Original signature: {original_signature}")
# Sanitize the signature to remove emojis and special characters
signature = sanitize_text(original_signature)
debug_log(f"Sanitized signature: {signature}")
# If significant information was lost in sanitization, log a warning
if len(signature) < len(original_signature) * 0.7: # If more than 30% of chars were removed
debug_log(f"WARNING: Significant information lost during sanitization!")
cmnt = f"""[{event['alert']['gid']}:{
event['alert']['signature_id']}] {
signature} ::: Port: {
wanted_port}/{
event['proto']} ::: timestamp: {
timestamp}"""
debug_log(f"Adding to list '{BLOCK_LIST_NAME}': IP={wanted_ip}, Timeout={TIMEOUT}")
debug_log(f"Comment: {cmnt}")
curr_list.add(list=BLOCK_LIST_NAME,
address=wanted_ip,
comment=cmnt,
timeout=TIMEOUT)
print(f"[Mikrocata] BLOCKED: {wanted_ip} - SID:{event['alert']['signature_id']} - Severity:{event['alert']['severity']}")
# Telegram notifications
if enable_telegram:
debug_log("Telegram notifications enabled, sending message")
clean_message = sanitize_text(f"From: {wanted_ip}\nTo: {src_ip}:{str(wanted_port)}\nRule: {cmnt}")
response = requests.get(sendTelegram(clean_message))
debug_log(f"Telegram API response: {response.status_code}")
except librouteros.exceptions.TrapError as e:
debug_log(f"MikroTik TrapError: {str(e)}")
if "failure: already have such entry" in str(e):
debug_log(f"IP {wanted_ip} already exists in list {BLOCK_LIST_NAME}, updating entry")
# Find and remove existing entry
existing_entries = list(curr_list.select(_id, _list, _address).where(
_address == wanted_ip,
_list == BLOCK_LIST_NAME))
debug_log(f"Found {len(existing_entries)} existing entries for {wanted_ip}")
for row in existing_entries:
debug_log(f"Removing existing entry with ID {row['.id']}")
curr_list.remove(row[".id"])
# Sanitize the signature here too
signature = sanitize_text(event['alert']['signature'])
# Add updated entry
updated_comment = f"""[{event['alert']['gid']}:{
event['alert']['signature_id']}] { event['alert']['signature_id']}] {
event['alert']['signature']} ::: Port: { signature} ::: Port: {
wanted_port}/{ wanted_port}/{
event['proto']} ::: timestamp: { event['proto']} ::: timestamp: {
timestamp}""" timestamp}"""
debug_log(f"Re-adding IP {wanted_ip} with updated comment")
curr_list.add(list=BLOCK_LIST_NAME, curr_list.add(list=BLOCK_LIST_NAME,
address=wanted_ip, address=wanted_ip,
comment=cmnt, comment=updated_comment,
timeout=TIMEOUT) timeout=TIMEOUT)
print(f"[Mikrocata] UPDATED: {wanted_ip} - SID:{event['alert']['signature_id']}")
print(f"[Mikrocata] new ip added: {cmnt}") else:
if enable_telegram == True: print(f"[Mikrocata] ERROR: TrapError: {str(e)}")
print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + str(wanted_port) + "\nRule: " + cmnt)).json()) raise
except socket.timeout as e:
print(f"[Mikrocata] Socket timeout: {str(e)}, reconnecting...")
connect_to_tik()
except Exception as e:
print(f"[Mikrocata] ERROR: {type(e).__name__} while processing {wanted_ip}: {str(e)}")
if DEBUG_MODE:
import traceback
print(f"[Mikrocata] Traceback: {traceback.format_exc()}")
continue
except librouteros.exceptions.TrapError as e: # Save lists and check uptime every x minutes
if "failure: already have such entry" in str(e):
for row in curr_list.select(_id, _list, _address).where(
_address == wanted_ip,
_list == BLOCK_LIST_NAME):
curr_list.remove(row[".id"])
curr_list.add(list=BLOCK_LIST_NAME,
address=wanted_ip,
comment=f"""[{event['alert']['gid']}:{
event['alert']['signature_id']}] {
event['alert']['signature']} ::: Port: {
wanted_port}/{
event['proto']} ::: timestamp: {
timestamp}""",
timeout=TIMEOUT)
else:
raise
except socket.timeout:
connect_to_tik()
# If router has been rebooted add saved list(s), then save lists to a file.
# Only save lists and check uptime every x minutes
current_time = int(dt.now().timestamp()) current_time = int(dt.now().timestamp())
if current_time - last_save_time >= SAVE_INTERVAL: time_since_last_save = current_time - last_save_time
debug_log(f"Time since last save: {time_since_last_save} seconds (interval: {SAVE_INTERVAL} seconds)")
if time_since_last_save >= SAVE_INTERVAL:
debug_log(f"Save interval reached, saving lists and checking router uptime")
last_save_time = current_time last_save_time = current_time
# Check router uptime and restore lists if needed # Check router uptime and restore lists if needed
if check_tik_uptime(resources): debug_log("Checking MikroTik uptime")
add_saved_lists(address_list) uptime_check = check_tik_uptime(resources)
if ENABLE_IPV6:
add_saved_lists(address_list_v6, True) if uptime_check:
print("[Mikrocata] Router rebooted - restoring saved lists")
try:
add_saved_lists(address_list)
debug_log("Successfully restored IPv4 address lists")
if ENABLE_IPV6:
add_saved_lists(address_list_v6, True)
debug_log("Successfully restored IPv6 address lists")
except Exception as e:
print(f"[Mikrocata] ERROR: Failed to restore lists: {str(e)}")
else:
debug_log("Router has not been rebooted, no need to restore lists")
# Save current lists to file # Save current lists to file
save_lists(address_list) try:
if ENABLE_IPV6: save_lists(address_list)
save_lists(address_list_v6, True) debug_log("Successfully saved IPv4 address lists")
if ENABLE_IPV6:
save_lists(address_list_v6, True)
debug_log("Successfully saved IPv6 address lists")
except Exception as e:
print(f"[Mikrocata] ERROR: Failed to save lists: {str(e)}")
print("[Mikrocata] Lists saved") debug_log("Lists saved successfully")
debug_log("Alert processing completed")
def check_tik_uptime(resources): def check_tik_uptime(resources):
@ -425,13 +591,22 @@ def sendTelegram(message):
telegram_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage?chat_id={TELEGRAM_CHATID}&text={message}&disable_web_page_preview=true&parse_mode=html" telegram_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage?chat_id={TELEGRAM_CHATID}&text={message}&disable_web_page_preview=true&parse_mode=html"
try: try:
response = requests.get(telegram_url) response = requests.get(telegram_url)
print(response.json()) debug_log(f"Telegram response: {response.json()}")
return telegram_url
except Exception as e: except Exception as e:
print(f"Failed to send Telegram message: {e}") print(f"[Mikrocata] Failed to send Telegram message: {e}")
return telegram_url return ""
def main(): def main():
print(f"[Mikrocata] Starting Mikrocata2SELKS v{VERSION}")
if DEBUG_MODE:
print("[Mikrocata] Starting in DEBUG mode - verbose logging enabled")
else:
print("[Mikrocata] Starting in normal mode")
seek_to_end(FILEPATH) seek_to_end(FILEPATH)
connect_to_tik() connect_to_tik()
read_ignore_list(IGNORE_LIST_LOCATION) read_ignore_list(IGNORE_LIST_LOCATION)
@ -446,21 +621,33 @@ def main():
notifier = pyinotify.Notifier(wm, handler) notifier = pyinotify.Notifier(wm, handler)
wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False) wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False)
print(f"[Mikrocata] Monitoring {FILEPATH} for alerts")
print(f"[Mikrocata] Whitelist configured for: {WHITELIST_IPS}")
print(f"[Mikrocata] Configured to process alerts with severity: {SEVERITY}")
while True: while True:
try: try:
notifier.loop() notifier.loop()
except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e: except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e:
print(f"[Mikrocata] (4) {str(e)}") print(f"[Mikrocata] Connection error: {str(e)}")
connect_to_tik() connect_to_tik()
continue continue
except librouteros.exceptions.TrapError as e: except librouteros.exceptions.TrapError as e:
print(f"[Mikrocata] (8) librouteros.TrapError: {str(e)}") print(f"[Mikrocata] TrapError: {str(e)}")
continue continue
except KeyError as e: except KeyError as e:
print(f"[Mikrocata] (8) KeyError: {str(e)}") print(f"[Mikrocata] KeyError: {str(e)}")
continue
except Exception as e:
print(f"[Mikrocata] Unexpected error: {str(e)}")
if DEBUG_MODE:
import traceback
print(f"[Mikrocata] Traceback: {traceback.format_exc()}")
sleep(5)
continue continue
if __name__ == "__main__": if __name__ == "__main__":