Add IPv6 support

Extends mikrocata.py with IPv6 support. Saving and adding
previously saved lists should also work. It is stored in a separate
file. Disabling via ENABLE_IPV6 boolean affects only event
processing, which should result in empty v6 files.
This commit is contained in:
Daniel Florian 2024-07-12 21:49:37 +02:00
parent e8bd32e824
commit a518a044f7
2 changed files with 29 additions and 13 deletions

View File

@ -38,6 +38,8 @@ This repository is designed to simplify the installation process for the IDS/IPS
```sh
/ip/firewall/raw/add action=drop chain=prerouting comment="IPS-drop_in_bad_traffic" src-address-list=Suricata
/ip/firewall/raw/add action=drop chain=prerouting comment="IPS-drop_out_bad_traffic" dst-address-list=Suricata
/ipv6/firewall/raw/add action=drop chain=prerouting comment="IPS-drop_in_bad_traffic" src-address-list=Suricata
/ipv6/firewall/raw/add action=drop chain=prerouting comment="IPS-drop_out_bad_traffic" dst-address-list=Suricata
```
3. Enable Mikrotik API:
```sh

View File

@ -33,8 +33,9 @@ TELEGRAM_CHATID = "CHATID"
# You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string)
WAN_IP = "yourpublicip"
LOCAL_IP_PREFIX = "192.168."
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8")
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8", "fe80:")
COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f" # See datetime strftime formats.
ENABLE_IPV6 = False
#Set comma separated value of suricata alerts severity which will be blocked in Mikrotik. All severity values are ("1","2","3")
SEVERITY=("1","2")
@ -53,7 +54,7 @@ SAVE_LISTS = [BLOCK_LIST_NAME]
# (!) Make sure you have privileges (!)
SAVE_LISTS_LOCATION = os.path.abspath("/var/lib/mikrocata/savelists-tzsp0.json")
SAVE_LISTS_LOCATION_V6 = os.path.abspath("/var/lib/mikrocata/savelists-tzsp0_v6.json")
# Location for Mikrotik's uptime. (needed for re-adding lists after reboot)
UPTIME_BOOKMARK = os.path.abspath("/var/lib/mikrocata/uptime-tzsp0.bookmark")
@ -139,6 +140,7 @@ def add_to_tik(alerts):
_list = Key("list")
address_list = api.path("/ip/firewall/address-list")
address_list_v6 = api.path("/ipv6/firewall/address-list")
resources = api.path("system/resource")
# Remove duplicate src_ips.
@ -155,7 +157,10 @@ def add_to_tik(alerts):
timestamp = dt.strptime(event["timestamp"],
"%Y-%m-%dT%H:%M:%S.%f%z").strftime(
COMMENT_TIME_FORMAT)
is_v6 = ':' in event["src_ip"]
curr_list = address_list
if ENABLE_IPV6 and is_v6:
curr_list = address_list_v6
if event["src_ip"].startswith(WHITELIST_IPS):
if event["dest_ip"].startswith(WHITELIST_IPS):
continue
@ -175,7 +180,7 @@ def add_to_tik(alerts):
event['proto']} ::: timestamp: {
timestamp}"""
address_list.add(list=BLOCK_LIST_NAME,
curr_list.add(list=BLOCK_LIST_NAME,
address=wanted_ip,
comment=cmnt,
timeout=TIMEOUT)
@ -187,12 +192,12 @@ def add_to_tik(alerts):
except librouteros.exceptions.TrapError as e:
if "failure: already have such entry" in str(e):
for row in address_list.select(_id, _list, _address).where(
for row in curr_list.select(_id, _list, _address).where(
_address == wanted_ip,
_list == BLOCK_LIST_NAME):
address_list.remove(row[".id"])
curr_list.remove(row[".id"])
address_list.add(list=BLOCK_LIST_NAME,
curr_list.add(list=BLOCK_LIST_NAME,
address=wanted_ip,
comment=f"""[{event['alert']['gid']}:{
event['alert']['signature_id']}] {
@ -211,8 +216,12 @@ def add_to_tik(alerts):
# If router has been rebooted add saved list(s), then save lists to a file.
if check_tik_uptime(resources):
add_saved_lists(address_list)
if ENABLE_IPV6:
add_saved_lists(address_list_v6,True)
save_lists(address_list)
if ENABLE_IPV6:
save_lists(address_list_v6,True)
def check_tik_uptime(resources):
@ -308,22 +317,26 @@ def connect_to_tik():
raise
def save_lists(address_list):
def save_lists(address_list,is_v6=False):
_address = Key("address")
_list = Key("list")
_timeout = Key("timeout")
_comment = Key("comment")
with open(SAVE_LISTS_LOCATION, "w") as f:
curr_file = SAVE_LISTS_LOCATION
if is_v6:
curr_file = SAVE_LISTS_LOCATION_V6
with open(curr_file, "w") as f:
for save_list in SAVE_LISTS:
for row in address_list.select(_list, _address, _timeout,
_comment).where(_list == save_list):
f.write(ujson.dumps(row) + "\n")
def add_saved_lists(address_list):
with open(SAVE_LISTS_LOCATION, "r") as f:
def add_saved_lists(address_list, is_v6=False):
curr_file = SAVE_LISTS_LOCATION
if is_v6:
curr_file = SAVE_LISTS_LOCATION_V6
with open(curr_file, "r") as f:
addresses = [ujson.loads(line) for line in f.readlines()]
for row in addresses:
cmnt = row.get("comment")
if cmnt is None:
@ -384,6 +397,7 @@ def main():
connect_to_tik()
read_ignore_list(IGNORE_LIST_LOCATION)
os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION), exist_ok=True)
os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION_V6), exist_ok=True)
os.makedirs(os.path.dirname(UPTIME_BOOKMARK), exist_ok=True)
directory_to_monitor = os.path.dirname(FILEPATH)