Update mikrocata.py

Update mikrocata.py 2.2.1
This commit is contained in:
Giuseppe 2024-02-23 10:26:08 +01:00 committed by GitHub
parent 26e5289a54
commit bd3a8503a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -72,23 +72,23 @@ api = None
ignore_list = [] ignore_list = []
class EventHandler(pyinotify.ProcessEvent): class EventHandler(pyinotify.ProcessEvent):
@classmethod def process_IN_MODIFY(self, event):
def process_IN_MODIFY(cls, event): if event.pathname == FILEPATH:
try: try:
add_to_tik(read_json(FILEPATH)) add_to_tik(read_json(FILEPATH))
except ConnectionError: except ConnectionError:
connect_to_tik() connect_to_tik()
check_truncated(FILEPATH) def process_IN_CREATE(self, event):
if event.pathname == FILEPATH:
print(f"[Mikrocata] New eve.json detected. Resetting last_pos.")
# Check if logrotate truncated file. (Use 'copytruncate' option.) global last_pos
def check_truncated(fpath): last_pos = 0
global last_pos self.process_IN_MODIFY(event)
if last_pos > os.path.getsize(fpath):
last_pos = 0
def process_IN_DELETE(self, event):
if event.pathname == FILEPATH:
print(f"[Mikrocata] eve.json deleted. Monitoring for new file.")
def seek_to_end(fpath): def seek_to_end(fpath):
global last_pos global last_pos
@ -180,6 +180,7 @@ def add_to_tik(alerts):
comment=cmnt, comment=cmnt,
timeout=TIMEOUT) timeout=TIMEOUT)
print(f"[Mikrocata] new ip added: {cmnt}")
if enable_telegram == True: if enable_telegram == True:
print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + wanted_port + "\nRule: " + cmnt)).json()) print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + wanted_port + "\nRule: " + cmnt)).json())
@ -273,6 +274,7 @@ def connect_to_tik():
try: try:
api = connect(username=USERNAME, password=PASSWORD, host=ROUTER_IP, api = connect(username=USERNAME, password=PASSWORD, host=ROUTER_IP,
ssl_wrapper=ctx.wrap_socket, port=PORT) ssl_wrapper=ctx.wrap_socket, port=PORT)
print(f"[Mikrocata] Connected to Mikrotik")
break break
except librouteros.exceptions.TrapError as e: except librouteros.exceptions.TrapError as e:
@ -306,14 +308,6 @@ def connect_to_tik():
raise raise
def sendTelegram(message):
sleep(2)
telegram_url = "https://api.telegram.org/bot" + TELEGRAM_TOKEN + "/sendMessage?chat_id=" + TELEGRAM_CHATID + "&text=" + message + "&disable_web_page_preview=true&parse_mode=html"
return telegram_url
def save_lists(address_list): def save_lists(address_list):
_address = Key("address") _address = Key("address")
_list = Key("list") _list = Key("list")
@ -326,7 +320,6 @@ def save_lists(address_list):
_comment).where(_list == save_list): _comment).where(_list == save_list):
f.write(ujson.dumps(row) + "\n") f.write(ujson.dumps(row) + "\n")
def add_saved_lists(address_list): def add_saved_lists(address_list):
with open(SAVE_LISTS_LOCATION, "r") as f: with open(SAVE_LISTS_LOCATION, "r") as f:
addresses = [ujson.loads(line) for line in f.readlines()] addresses = [ujson.loads(line) for line in f.readlines()]
@ -345,7 +338,6 @@ def add_saved_lists(address_list):
raise raise
def read_ignore_list(fpath): def read_ignore_list(fpath):
global ignore_list global ignore_list
@ -361,7 +353,6 @@ def read_ignore_list(fpath):
except FileNotFoundError: except FileNotFoundError:
print(f"[Mikrocata] File: {IGNORE_LIST_LOCATION} not found. Continuing..") print(f"[Mikrocata] File: {IGNORE_LIST_LOCATION} not found. Continuing..")
def in_ignore_list(ignr_list, event): def in_ignore_list(ignr_list, event):
for entry in ignr_list: for entry in ignr_list:
if entry.isdigit() and int(entry) == int(event['alert']['signature_id']): if entry.isdigit() and int(entry) == int(event['alert']['signature_id']):
@ -377,6 +368,16 @@ def in_ignore_list(ignr_list, event):
return False return False
def sendTelegram(message):
if enable_telegram:
telegram_url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage?chat_id={TELEGRAM_CHATID}&text={message}&disable_web_page_preview=true&parse_mode=html"
try:
response = requests.get(telegram_url)
print(response.json())
except Exception as e:
print(f"Failed to send Telegram message: {e}")
return telegram_url
def main(): def main():
seek_to_end(FILEPATH) seek_to_end(FILEPATH)
@ -385,10 +386,12 @@ def main():
os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION), exist_ok=True) os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION), exist_ok=True)
os.makedirs(os.path.dirname(UPTIME_BOOKMARK), exist_ok=True) os.makedirs(os.path.dirname(UPTIME_BOOKMARK), exist_ok=True)
directory_to_monitor = os.path.dirname(FILEPATH)
wm = pyinotify.WatchManager() wm = pyinotify.WatchManager()
handler = EventHandler() handler = EventHandler()
notifier = pyinotify.Notifier(wm, handler) notifier = pyinotify.Notifier(wm, handler)
wm.add_watch(FILEPATH, pyinotify.IN_MODIFY) wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False)
while True: while True:
try: try:
@ -407,6 +410,5 @@ def main():
print(f"[Mikrocata] (8) KeyError: {str(e)}") print(f"[Mikrocata] (8) KeyError: {str(e)}")
continue continue
if __name__ == "__main__": if __name__ == "__main__":
main() main()