mirror of
https://github.com/angolo40/mikrocata2selks.git
synced 2025-05-09 15:07:22 +00:00
Merge pull request #30 from angolo40/v2.3.0-update
v2.3.0: Add CIDR whitelist support and fix emoji handling
This commit is contained in:
commit
7863881f8f
145
CHANGELOG.md
Normal file
145
CHANGELOG.md
Normal file
@ -0,0 +1,145 @@
|
||||
# Changelog - v2.3.0 (April 2025)
|
||||
|
||||
## Key Features
|
||||
|
||||
* **Advanced Whitelist Support:** Added full support for CIDR notations (e.g., `"10.0.0.0/8"`) in whitelist.
|
||||
* **Emoji and Special Characters Handling:** Fixed issues with rules containing emojis or Unicode characters that caused crashes.
|
||||
* **Debug Mode:** Added debug mode to facilitate troubleshooting.
|
||||
* **Optimized Logging:** More concise and informative log messages in normal mode.
|
||||
* **Critical Fixes:** Fixed bugs that caused some alerts to be skipped during processing.
|
||||
|
||||
## Technical Improvements
|
||||
|
||||
* Implemented `is_ip_in_whitelist()` function to correctly handle all forms of whitelist entries (exact IPs, prefixes, CIDR).
|
||||
* Added `sanitize_text()` function to remove emojis and special characters from messages.
|
||||
* Fixed double JSON parsing in `read_json()`.
|
||||
* Replaced `break` with `continue` to ensure all alerts are processed.
|
||||
* Improved logging system with categorization (`BLOCKED`, `UPDATED`, `ERROR`).
|
||||
* Added full IPv6 support in whitelist management.
|
||||
|
||||
## Bug Fixes
|
||||
|
||||
* Fixed the issue that prevented recognition of CIDR format subnets in the whitelist.
|
||||
* Fixed a bug that caused failure to process multiple events in a single cycle.
|
||||
* Fixed crashes during processing of PAW rules containing emojis.
|
||||
|
||||
## Upgrade Instructions
|
||||
|
||||
1. **Backup Existing Configurations**
|
||||
Backup all configuration files:
|
||||
|
||||
```bash
|
||||
sudo cp /usr/local/bin/mikrocataTZSP\*.py /usr/local/bin/mikrocataTZSP\*.py.bak
|
||||
sudo mkdir -p /var/lib/mikrocata/backup-$(date +%Y%m%d)
|
||||
sudo cp /var/lib/mikrocata/\* /var/lib/mikrocata/backup-$(date +%Y%m%d)/
|
||||
```
|
||||
|
||||
2. **Update Repository Files**
|
||||
|
||||
```bash
|
||||
cd /path/to/mikrocata2selks
|
||||
git pull
|
||||
```
|
||||
|
||||
3. **Install New Files**
|
||||
|
||||
```bash
|
||||
sudo cp mikrocata.py /usr/local/bin/mikrocataTZSP0.py
|
||||
```
|
||||
|
||||
Repeat for each instance if you have more than one Mikrotik router.
|
||||
|
||||
```bash
|
||||
sudo chmod +x /usr/local/bin/mikrocataTZSP0.py
|
||||
```
|
||||
|
||||
4. **Transfer Previous Configurations**
|
||||
Open the backup file and the new one to manually copy your custom settings:
|
||||
|
||||
```bash
|
||||
sudo nano /usr/local/bin/mikrocataTZSP0.py.bak
|
||||
sudo nano /usr/local/bin/mikrocataTZSP0.py
|
||||
```
|
||||
|
||||
Make sure to transfer all of the following settings:
|
||||
|
||||
* Mikrotik credentials (`USERNAME`, `PASSWORD`, `ROUTER_IP`)
|
||||
* Telegram configuration
|
||||
* Whitelist IPs
|
||||
* SSL settings
|
||||
* LISTEN_INTERFACE
|
||||
* SELKS_CONTAINER_DATA_SURICATA_LOG
|
||||
|
||||
5. **Restart Services**
|
||||
|
||||
```bash
|
||||
sudo systemctl restart mikrocataTZSP0.service
|
||||
```
|
||||
|
||||
Repeat for each instance if you have more than one Mikrotik router.
|
||||
|
||||
6. **Verify Operation**
|
||||
```bash
|
||||
sudo journalctl -u mikrocataTZSP0.service -f
|
||||
```
|
||||
|
||||
## Important Notes for Upgrading
|
||||
|
||||
* **Debug Mode:** To activate debug mode for troubleshooting, set `DEBUG_MODE = True` in the file settings.
|
||||
* **CIDR Whitelist:** The new version supports CIDR notations (e.g., `"10.0.0.0/8"`) in the whitelist.
|
||||
* **IPv6 Compatibility:** The whitelist management now works correctly for both IPv4 and IPv6.
|
||||
|
||||
## What to Do in Case of Problems
|
||||
|
||||
If you encounter issues after upgrading:
|
||||
|
||||
* Check logs for any errors:
|
||||
```bash
|
||||
sudo journalctl -u mikrocataTZSP0.service -n 100
|
||||
```
|
||||
* Enable debug mode for more detailed logs:
|
||||
```bash
|
||||
sudo nano /usr/local/bin/mikrocataTZSP0.py
|
||||
```
|
||||
Set `DEBUG_MODE = True`
|
||||
```bash
|
||||
sudo systemctl restart mikrocataTZSP0.service
|
||||
```
|
||||
* If necessary, restore the previous version:
|
||||
```bash
|
||||
sudo cp /usr/local/bin/mikrocataTZSP0.py.bak /usr/local/bin/mikrocataTZSP0.py
|
||||
sudo systemctl restart mikrocataTZSP0.service
|
||||
```
|
||||
* Report the issue on GitHub with relevant logs.
|
||||
|
||||
### v2.2.6 (March 4, 2025)
|
||||
- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert
|
||||
- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources
|
||||
- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds)
|
||||
|
||||
### v2.2.5 (January, 2025)
|
||||
- **Security:** Fixed SSL certificate management issues
|
||||
- **Reliability:** Improved handling of certificate validation
|
||||
|
||||
### v2.2.4 (December 2024)
|
||||
- **Security:** Added support for self-signed certificates
|
||||
- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments
|
||||
|
||||
### v2.2.3 (July 2024)
|
||||
- **Feature:** Added IPv6 support (thanks to contributor: floridan95)
|
||||
- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking
|
||||
|
||||
### v2.2.2
|
||||
- **Fix:** Resolved issue with Telegram notifications not being delivered properly
|
||||
|
||||
### v2.2.1
|
||||
- **Bugfix:** Fixed script crash during Suricata logrotate operations
|
||||
- **Stability:** Improved file handling for log rotation events
|
||||
|
||||
### v2.2
|
||||
- **Compatibility:** Added support for Debian 12
|
||||
- **System:** Updated installation scripts for newer package versions
|
||||
|
||||
### v2.1
|
||||
- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev)
|
||||
- **Performance:** Better handling of malformed JSON data
|
34
README.md
34
README.md
@ -1,6 +1,6 @@
|
||||
<h1 align="center">Welcome to Mikrocata2SELKS 👋</h1>
|
||||
<p>
|
||||
<img alt="Version" src="https://img.shields.io/badge/version-2.2.6-blue.svg?cacheSeconds=2592000" />
|
||||
<img alt="Version" src="https://img.shields.io/badge/version-2.3.0-blue.svg?cacheSeconds=2592000" />
|
||||
<a href="https://github.com/angolo40/mikrocata2selks" target="_blank">
|
||||
<img alt="License: MIT" src="https://img.shields.io/github/license/angolo40/Mikrocata2SELKS" />
|
||||
</a>
|
||||
@ -149,37 +149,7 @@ flowchart TD
|
||||
|
||||
## 🔄 Changelog
|
||||
|
||||
### v2.2.6 (March 4, 2025)
|
||||
- **Performance:** Optimized Mikrotik address list saving to run every 5 minutes instead of on every alert
|
||||
- **Stability:** Reduced router CPU load, particularly beneficial for high-traffic networks or routers with limited resources
|
||||
- **System:** Added interval-based save mechanism with configurable timing (default: 300 seconds)
|
||||
|
||||
### v2.2.5 (January, 2025)
|
||||
- **Security:** Fixed SSL certificate management issues
|
||||
- **Reliability:** Improved handling of certificate validation
|
||||
|
||||
### v2.2.4 (December 2024)
|
||||
- **Security:** Added support for self-signed certificates
|
||||
- **Configuration:** Added new option `ALLOW_SELF_SIGNED_CERTS` for trusted environments
|
||||
|
||||
### v2.2.3 (July 2024)
|
||||
- **Feature:** Added IPv6 support (thanks to contributor: floridan95)
|
||||
- **Configuration:** Added `ENABLE_IPV6` option to enable/disable IPv6 blocking
|
||||
|
||||
### v2.2.2
|
||||
- **Fix:** Resolved issue with Telegram notifications not being delivered properly
|
||||
|
||||
### v2.2.1
|
||||
- **Bugfix:** Fixed script crash during Suricata logrotate operations
|
||||
- **Stability:** Improved file handling for log rotation events
|
||||
|
||||
### v2.2
|
||||
- **Compatibility:** Added support for Debian 12
|
||||
- **System:** Updated installation scripts for newer package versions
|
||||
|
||||
### v2.1
|
||||
- **Reliability:** Improved stability of the `read_json` function (thanks to contributor: bekhzad-khamidullaev)
|
||||
- **Performance:** Better handling of malformed JSON data
|
||||
- View CHANGELOG.md
|
||||
|
||||
## 🔧 Troubleshooting
|
||||
|
||||
|
345
mikrocata.py
345
mikrocata.py
@ -4,6 +4,7 @@ import ssl
|
||||
import os
|
||||
import socket
|
||||
import re
|
||||
import ipaddress
|
||||
from time import sleep
|
||||
from datetime import datetime as dt
|
||||
import pyinotify
|
||||
@ -14,6 +15,8 @@ from librouteros import connect
|
||||
from librouteros.query import Key
|
||||
import requests
|
||||
|
||||
VERSION = "2.3.0" # Updated April 2025
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
################# START EDIT SETTINGS
|
||||
|
||||
@ -33,8 +36,8 @@ TELEGRAM_CHATID = "CHATID"
|
||||
|
||||
# You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string)
|
||||
WAN_IP = "yourpublicip"
|
||||
LOCAL_IP_PREFIX = "192.168."
|
||||
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:")
|
||||
LOCAL_IP_PREFIX = "192.168.0.0/16"
|
||||
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:", "10.0.0.0/8", "172.16.0.0/12")
|
||||
COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f" # See datetime strftime formats.
|
||||
ENABLE_IPV6 = False
|
||||
|
||||
@ -46,6 +49,9 @@ SEVERITY=("1","2")
|
||||
# with self-signed certificates in trusted environments
|
||||
ALLOW_SELF_SIGNED_CERTS = False
|
||||
|
||||
# Enable debug mode for verbose logging
|
||||
DEBUG_MODE = False
|
||||
|
||||
################# END EDIT SETTINGS
|
||||
# ------------------------------------------------------------------------------
|
||||
LISTEN_INTERFACE=("tzsp0")
|
||||
@ -80,6 +86,59 @@ ignore_list = []
|
||||
last_save_time = 0
|
||||
SAVE_INTERVAL = 300 # Save lists every 5 minutes
|
||||
|
||||
def debug_log(message):
|
||||
"""Print message only if DEBUG_MODE is enabled"""
|
||||
if DEBUG_MODE:
|
||||
print(f"[Mikrocata-DEBUG] {message}")
|
||||
|
||||
def sanitize_text(text):
|
||||
"""Remove emojis and other non-ASCII characters from text"""
|
||||
if not text:
|
||||
return ""
|
||||
# Keep only ASCII characters (removes all emojis and special characters)
|
||||
return ''.join(char for char in text if ord(char) < 128)
|
||||
|
||||
def is_ip_in_whitelist(ip_to_check, whitelist):
|
||||
"""
|
||||
Check if an IP is in the whitelist, supporting both direct matches,
|
||||
prefix matches, and CIDR notation.
|
||||
"""
|
||||
try:
|
||||
# Convert the IP to check to an ipaddress object for CIDR matching
|
||||
if ':' in ip_to_check: # IPv6
|
||||
ip_obj = ipaddress.IPv6Address(ip_to_check)
|
||||
else: # IPv4
|
||||
ip_obj = ipaddress.IPv4Address(ip_to_check)
|
||||
|
||||
for item in whitelist:
|
||||
# Direct IP match
|
||||
if ip_to_check == item:
|
||||
debug_log(f"IP {ip_to_check} matches exact whitelist entry {item}")
|
||||
return True
|
||||
|
||||
# String prefix match (like "192.168.")
|
||||
if isinstance(item, str) and not '/' in item and ip_to_check.startswith(item):
|
||||
debug_log(f"IP {ip_to_check} matches prefix whitelist entry {item}")
|
||||
return True
|
||||
|
||||
# CIDR notation check (like "10.0.0.0/8")
|
||||
if '/' in item:
|
||||
try:
|
||||
network = ipaddress.ip_network(item)
|
||||
if ip_obj in network:
|
||||
debug_log(f"IP {ip_to_check} is within CIDR whitelist range {item}")
|
||||
return True
|
||||
except ValueError:
|
||||
print(f"[Mikrocata] Warning: Invalid CIDR notation in whitelist: {item}")
|
||||
|
||||
debug_log(f"IP {ip_to_check} is not in any whitelist entry")
|
||||
return False
|
||||
|
||||
except ValueError as e:
|
||||
debug_log(f"Error checking whitelist for IP {ip_to_check}: {e}")
|
||||
# If we can't parse the IP, we should not whitelist it
|
||||
return False
|
||||
|
||||
class EventHandler(pyinotify.ProcessEvent):
|
||||
def process_IN_MODIFY(self, event):
|
||||
if event.pathname == FILEPATH:
|
||||
@ -113,7 +172,6 @@ def seek_to_end(fpath):
|
||||
sleep(10)
|
||||
continue
|
||||
|
||||
|
||||
def read_json(fpath):
|
||||
global last_pos
|
||||
while True:
|
||||
@ -125,7 +183,7 @@ def read_json(fpath):
|
||||
try:
|
||||
alert = json.loads(line)
|
||||
if alert.get('event_type') == 'alert':
|
||||
alerts.append(json.loads(line))
|
||||
alerts.append(alert) # Fixed: don't json.loads again
|
||||
else:
|
||||
last_pos = f.tell()
|
||||
continue
|
||||
@ -138,7 +196,6 @@ def read_json(fpath):
|
||||
sleep(10)
|
||||
continue
|
||||
|
||||
|
||||
def add_to_tik(alerts):
|
||||
global last_pos
|
||||
global api
|
||||
@ -148,98 +205,207 @@ def add_to_tik(alerts):
|
||||
_id = Key(".id")
|
||||
_list = Key("list")
|
||||
|
||||
address_list = api.path("/ip/firewall/address-list")
|
||||
address_list_v6 = api.path("/ipv6/firewall/address-list")
|
||||
resources = api.path("system/resource")
|
||||
|
||||
# Remove duplicate src_ips.
|
||||
for event in {item['src_ip']: item for item in alerts}.values():
|
||||
if DEBUG_MODE:
|
||||
print(f"[Mikrocata] Processing {len(alerts)} alert events")
|
||||
|
||||
if not alerts:
|
||||
debug_log("No alerts to process")
|
||||
return
|
||||
|
||||
try:
|
||||
address_list = api.path("/ip/firewall/address-list")
|
||||
address_list_v6 = api.path("/ipv6/firewall/address-list")
|
||||
resources = api.path("system/resource")
|
||||
debug_log("Successfully connected to Mikrotik API paths")
|
||||
except Exception as e:
|
||||
print(f"[Mikrocata] Error connecting to Mikrotik API: {str(e)}")
|
||||
raise
|
||||
|
||||
# Remove duplicate src_ips
|
||||
unique_alerts = {item['src_ip']: item for item in alerts}.values()
|
||||
debug_log(f"Processing {len(unique_alerts)} unique source IPs from alerts")
|
||||
|
||||
for event in unique_alerts:
|
||||
debug_log(f"Processing alert: SID={event['alert']['signature_id']}, Severity={event['alert']['severity']}")
|
||||
|
||||
# Check alert severity
|
||||
if str(event["alert"]["severity"]) not in SEVERITY:
|
||||
print("pass severity: " + str(event["alert"]["severity"]))
|
||||
break
|
||||
print(f"[Mikrocata] Skipping alert SID={event['alert']['signature_id']} (severity {event['alert']['severity']})")
|
||||
continue
|
||||
|
||||
# Check interface
|
||||
if str(event["in_iface"]) not in LISTEN_INTERFACE:
|
||||
break
|
||||
debug_log(f"Skipping alert from interface {event['in_iface']}")
|
||||
continue
|
||||
|
||||
if not in_ignore_list(ignore_list, event):
|
||||
# Check if in ignore list
|
||||
if in_ignore_list(ignore_list, event):
|
||||
print(f"[Mikrocata] Skipping alert {event['alert']['signature_id']} - in ignore list")
|
||||
continue
|
||||
|
||||
debug_log(f"Alert passed all filters, preparing to add to MikroTik")
|
||||
|
||||
try:
|
||||
timestamp = dt.strptime(event["timestamp"],
|
||||
"%Y-%m-%dT%H:%M:%S.%f%z").strftime(
|
||||
COMMENT_TIME_FORMAT)
|
||||
is_v6 = ':' in event["src_ip"]
|
||||
curr_list = address_list
|
||||
if ENABLE_IPV6 and is_v6:
|
||||
curr_list = address_list_v6
|
||||
if event["src_ip"].startswith(WHITELIST_IPS):
|
||||
if event["dest_ip"].startswith(WHITELIST_IPS):
|
||||
continue
|
||||
"%Y-%m-%dT%H:%M:%S.%f%z").strftime(
|
||||
COMMENT_TIME_FORMAT)
|
||||
except Exception as e:
|
||||
debug_log(f"Error parsing timestamp {event['timestamp']}: {str(e)}")
|
||||
timestamp = dt.now().strftime(COMMENT_TIME_FORMAT)
|
||||
|
||||
# Determine if IPv6
|
||||
is_v6 = ':' in event["src_ip"]
|
||||
curr_list = address_list
|
||||
if ENABLE_IPV6 and is_v6:
|
||||
debug_log(f"IPv6 address detected: {event['src_ip']}")
|
||||
curr_list = address_list_v6
|
||||
|
||||
# Check whitelist with improved function
|
||||
if is_ip_in_whitelist(event["src_ip"], WHITELIST_IPS):
|
||||
debug_log(f"Source IP {event['src_ip']} in whitelist")
|
||||
if is_ip_in_whitelist(event["dest_ip"], WHITELIST_IPS):
|
||||
debug_log(f"Destination IP {event['dest_ip']} also in whitelist - skipping alert")
|
||||
continue
|
||||
|
||||
wanted_ip, wanted_port = event["dest_ip"], event.get("src_port")
|
||||
src_ip, src_port = event["src_ip"], event.get("dest_port")
|
||||
wanted_ip, wanted_port = event["dest_ip"], event.get("src_port")
|
||||
src_ip, src_port = event["src_ip"], event.get("dest_port")
|
||||
debug_log(f"Source IP in whitelist, targeting destination: {wanted_ip}")
|
||||
else:
|
||||
wanted_ip, wanted_port = event["src_ip"], event.get("dest_port")
|
||||
src_ip, src_port = event["dest_ip"], event.get("src_port")
|
||||
debug_log(f"Targeting source IP: {wanted_ip}")
|
||||
|
||||
else:
|
||||
wanted_ip, wanted_port = event["src_ip"], event.get("dest_port")
|
||||
src_ip, src_port = event["dest_ip"], event.get("src_port")
|
||||
# Check if target IP is in whitelist
|
||||
if is_ip_in_whitelist(wanted_ip, WHITELIST_IPS):
|
||||
print(f"[Mikrocata] Skipping: target IP {wanted_ip} is in whitelist")
|
||||
continue
|
||||
|
||||
try:
|
||||
cmnt=f"""[{event['alert']['gid']}:{
|
||||
try:
|
||||
# Log original signature before sanitizing
|
||||
original_signature = event['alert']['signature']
|
||||
debug_log(f"Original signature: {original_signature}")
|
||||
|
||||
# Sanitize the signature to remove emojis and special characters
|
||||
signature = sanitize_text(original_signature)
|
||||
debug_log(f"Sanitized signature: {signature}")
|
||||
|
||||
# If significant information was lost in sanitization, log a warning
|
||||
if len(signature) < len(original_signature) * 0.7: # If more than 30% of chars were removed
|
||||
debug_log(f"WARNING: Significant information lost during sanitization!")
|
||||
|
||||
cmnt = f"""[{event['alert']['gid']}:{
|
||||
event['alert']['signature_id']}] {
|
||||
signature} ::: Port: {
|
||||
wanted_port}/{
|
||||
event['proto']} ::: timestamp: {
|
||||
timestamp}"""
|
||||
|
||||
debug_log(f"Adding to list '{BLOCK_LIST_NAME}': IP={wanted_ip}, Timeout={TIMEOUT}")
|
||||
debug_log(f"Comment: {cmnt}")
|
||||
|
||||
curr_list.add(list=BLOCK_LIST_NAME,
|
||||
address=wanted_ip,
|
||||
comment=cmnt,
|
||||
timeout=TIMEOUT)
|
||||
|
||||
print(f"[Mikrocata] BLOCKED: {wanted_ip} - SID:{event['alert']['signature_id']} - Severity:{event['alert']['severity']}")
|
||||
|
||||
# Telegram notifications
|
||||
if enable_telegram:
|
||||
debug_log("Telegram notifications enabled, sending message")
|
||||
clean_message = sanitize_text(f"From: {wanted_ip}\nTo: {src_ip}:{str(wanted_port)}\nRule: {cmnt}")
|
||||
response = requests.get(sendTelegram(clean_message))
|
||||
debug_log(f"Telegram API response: {response.status_code}")
|
||||
|
||||
except librouteros.exceptions.TrapError as e:
|
||||
debug_log(f"MikroTik TrapError: {str(e)}")
|
||||
|
||||
if "failure: already have such entry" in str(e):
|
||||
debug_log(f"IP {wanted_ip} already exists in list {BLOCK_LIST_NAME}, updating entry")
|
||||
|
||||
# Find and remove existing entry
|
||||
existing_entries = list(curr_list.select(_id, _list, _address).where(
|
||||
_address == wanted_ip,
|
||||
_list == BLOCK_LIST_NAME))
|
||||
|
||||
debug_log(f"Found {len(existing_entries)} existing entries for {wanted_ip}")
|
||||
|
||||
for row in existing_entries:
|
||||
debug_log(f"Removing existing entry with ID {row['.id']}")
|
||||
curr_list.remove(row[".id"])
|
||||
|
||||
# Sanitize the signature here too
|
||||
signature = sanitize_text(event['alert']['signature'])
|
||||
|
||||
# Add updated entry
|
||||
updated_comment = f"""[{event['alert']['gid']}:{
|
||||
event['alert']['signature_id']}] {
|
||||
event['alert']['signature']} ::: Port: {
|
||||
signature} ::: Port: {
|
||||
wanted_port}/{
|
||||
event['proto']} ::: timestamp: {
|
||||
timestamp}"""
|
||||
|
||||
|
||||
debug_log(f"Re-adding IP {wanted_ip} with updated comment")
|
||||
curr_list.add(list=BLOCK_LIST_NAME,
|
||||
address=wanted_ip,
|
||||
comment=cmnt,
|
||||
timeout=TIMEOUT)
|
||||
address=wanted_ip,
|
||||
comment=updated_comment,
|
||||
timeout=TIMEOUT)
|
||||
print(f"[Mikrocata] UPDATED: {wanted_ip} - SID:{event['alert']['signature_id']}")
|
||||
|
||||
print(f"[Mikrocata] new ip added: {cmnt}")
|
||||
if enable_telegram == True:
|
||||
print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + str(wanted_port) + "\nRule: " + cmnt)).json())
|
||||
else:
|
||||
print(f"[Mikrocata] ERROR: TrapError: {str(e)}")
|
||||
raise
|
||||
|
||||
except socket.timeout as e:
|
||||
print(f"[Mikrocata] Socket timeout: {str(e)}, reconnecting...")
|
||||
connect_to_tik()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[Mikrocata] ERROR: {type(e).__name__} while processing {wanted_ip}: {str(e)}")
|
||||
if DEBUG_MODE:
|
||||
import traceback
|
||||
print(f"[Mikrocata] Traceback: {traceback.format_exc()}")
|
||||
continue
|
||||
|
||||
except librouteros.exceptions.TrapError as e:
|
||||
if "failure: already have such entry" in str(e):
|
||||
for row in curr_list.select(_id, _list, _address).where(
|
||||
_address == wanted_ip,
|
||||
_list == BLOCK_LIST_NAME):
|
||||
curr_list.remove(row[".id"])
|
||||
|
||||
curr_list.add(list=BLOCK_LIST_NAME,
|
||||
address=wanted_ip,
|
||||
comment=f"""[{event['alert']['gid']}:{
|
||||
event['alert']['signature_id']}] {
|
||||
event['alert']['signature']} ::: Port: {
|
||||
wanted_port}/{
|
||||
event['proto']} ::: timestamp: {
|
||||
timestamp}""",
|
||||
timeout=TIMEOUT)
|
||||
|
||||
else:
|
||||
raise
|
||||
|
||||
except socket.timeout:
|
||||
connect_to_tik()
|
||||
|
||||
# If router has been rebooted add saved list(s), then save lists to a file.
|
||||
# Only save lists and check uptime every x minutes
|
||||
# Save lists and check uptime every x minutes
|
||||
current_time = int(dt.now().timestamp())
|
||||
if current_time - last_save_time >= SAVE_INTERVAL:
|
||||
time_since_last_save = current_time - last_save_time
|
||||
debug_log(f"Time since last save: {time_since_last_save} seconds (interval: {SAVE_INTERVAL} seconds)")
|
||||
|
||||
if time_since_last_save >= SAVE_INTERVAL:
|
||||
debug_log(f"Save interval reached, saving lists and checking router uptime")
|
||||
last_save_time = current_time
|
||||
|
||||
# Check router uptime and restore lists if needed
|
||||
if check_tik_uptime(resources):
|
||||
add_saved_lists(address_list)
|
||||
if ENABLE_IPV6:
|
||||
add_saved_lists(address_list_v6, True)
|
||||
debug_log("Checking MikroTik uptime")
|
||||
uptime_check = check_tik_uptime(resources)
|
||||
|
||||
if uptime_check:
|
||||
print("[Mikrocata] Router rebooted - restoring saved lists")
|
||||
try:
|
||||
add_saved_lists(address_list)
|
||||
debug_log("Successfully restored IPv4 address lists")
|
||||
if ENABLE_IPV6:
|
||||
add_saved_lists(address_list_v6, True)
|
||||
debug_log("Successfully restored IPv6 address lists")
|
||||
except Exception as e:
|
||||
print(f"[Mikrocata] ERROR: Failed to restore lists: {str(e)}")
|
||||
else:
|
||||
debug_log("Router has not been rebooted, no need to restore lists")
|
||||
|
||||
# Save current lists to file
|
||||
save_lists(address_list)
|
||||
if ENABLE_IPV6:
|
||||
save_lists(address_list_v6, True)
|
||||
try:
|
||||
save_lists(address_list)
|
||||
debug_log("Successfully saved IPv4 address lists")
|
||||
if ENABLE_IPV6:
|
||||
save_lists(address_list_v6, True)
|
||||
debug_log("Successfully saved IPv6 address lists")
|
||||
except Exception as e:
|
||||
print(f"[Mikrocata] ERROR: Failed to save lists: {str(e)}")
|
||||
|
||||
print("[Mikrocata] Lists saved")
|
||||
debug_log("Lists saved successfully")
|
||||
|
||||
debug_log("Alert processing completed")
|
||||
|
||||
def check_tik_uptime(resources):
|
||||
|
||||
@ -425,13 +591,22 @@ def sendTelegram(message):
|
||||
telegram_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage?chat_id={TELEGRAM_CHATID}&text={message}&disable_web_page_preview=true&parse_mode=html"
|
||||
try:
|
||||
response = requests.get(telegram_url)
|
||||
print(response.json())
|
||||
debug_log(f"Telegram response: {response.json()}")
|
||||
return telegram_url
|
||||
except Exception as e:
|
||||
print(f"Failed to send Telegram message: {e}")
|
||||
return telegram_url
|
||||
print(f"[Mikrocata] Failed to send Telegram message: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
print(f"[Mikrocata] Starting Mikrocata2SELKS v{VERSION}")
|
||||
|
||||
if DEBUG_MODE:
|
||||
print("[Mikrocata] Starting in DEBUG mode - verbose logging enabled")
|
||||
else:
|
||||
print("[Mikrocata] Starting in normal mode")
|
||||
|
||||
seek_to_end(FILEPATH)
|
||||
connect_to_tik()
|
||||
read_ignore_list(IGNORE_LIST_LOCATION)
|
||||
@ -446,21 +621,33 @@ def main():
|
||||
notifier = pyinotify.Notifier(wm, handler)
|
||||
wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False)
|
||||
|
||||
print(f"[Mikrocata] Monitoring {FILEPATH} for alerts")
|
||||
print(f"[Mikrocata] Whitelist configured for: {WHITELIST_IPS}")
|
||||
print(f"[Mikrocata] Configured to process alerts with severity: {SEVERITY}")
|
||||
|
||||
while True:
|
||||
try:
|
||||
notifier.loop()
|
||||
|
||||
except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e:
|
||||
print(f"[Mikrocata] (4) {str(e)}")
|
||||
print(f"[Mikrocata] Connection error: {str(e)}")
|
||||
connect_to_tik()
|
||||
continue
|
||||
|
||||
except librouteros.exceptions.TrapError as e:
|
||||
print(f"[Mikrocata] (8) librouteros.TrapError: {str(e)}")
|
||||
print(f"[Mikrocata] TrapError: {str(e)}")
|
||||
continue
|
||||
|
||||
except KeyError as e:
|
||||
print(f"[Mikrocata] (8) KeyError: {str(e)}")
|
||||
print(f"[Mikrocata] KeyError: {str(e)}")
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
print(f"[Mikrocata] Unexpected error: {str(e)}")
|
||||
if DEBUG_MODE:
|
||||
import traceback
|
||||
print(f"[Mikrocata] Traceback: {traceback.format_exc()}")
|
||||
sleep(5)
|
||||
continue
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
x
Reference in New Issue
Block a user