Skip to content
Snippets Groups Projects

Remove truncate and add on conflict clause instead

Compare and
1 file
+ 83
35
Compare changes
  • Side-by-side
  • Inline
+ 83
35
@@ -5,6 +5,7 @@ Monitors substrate for Node and Network parameter information,
inserts Node information into authorizer DB,
and inserts Node information and Network parameters into Permissioning DB
"""
import subprocess
from collections import namedtuple
import copy
import json
@@ -66,16 +67,20 @@ def main():
table_exists = check_table(conn, "active_nodes")
# Main loop
active_dict = {}
active_dict = dict()
auth_nids = set()
current_bins = []
current_chain_conf = {}
init_auth = get_authorizer_nodes(auth_conn)
for i in init_auth:
auth_nids.add(bytes(i[0]))
while True:
try:
log.info("Polling substrate...")
# Deal with bins
bins, chain_conf = poll_cmix_info(substrate)
log.debug(f"Polled bins: {bins}")
log.debug(f"Polled {len(bins)} bins!")
if bins != current_bins:
log.info(f"Updating GeoBins: {bins}")
set_bins(conn, bins)
@@ -89,13 +94,19 @@ def main():
# Deal with active nodes
new_dict = poll_active_nodes(substrate)
log.debug(f"Polled active nodes: {new_dict}")
log.debug(f"Polled {len(new_dict)} active nodes!")
if active_dict != new_dict:
log.info(f"Updating active nodes: {len(new_dict)} nodes")
# Extract node IDs for authorizer (cmix_id with node type byte added)
authorizer_nids = [i.cmix_id + b'\x02' for i in new_dict.values()]
set_authorizer_nodes(auth_conn, authorizer_nids)
new_auth_nids = [i.cmix_id + b'\x02' for i in new_dict.values()]
new_auth_set = set(new_auth_nids)
to_add = new_auth_set.difference(auth_nids)
to_delete = auth_nids.difference(new_auth_set)
to_revoke = set_authorizer_nodes(auth_conn, to_add, to_delete)
log.info(f"To revoke: {to_revoke}")
auth_nids = new_auth_set
revoke_auth(to_revoke)
# Pass a copy because the dict will be mutated
set_active_nodes(conn, copy.deepcopy(new_dict))
@@ -213,6 +224,21 @@ def get_substrate_provider():
# Auxiliary Functions #
#######################
def revoke_auth(to_revoke):
"""
revoke_auth accepts a list of node IP addresses to revoke auth from
:param to_revoke: list of node IP addresses
"""
log.info(f"Revoking access to {len(to_revoke)} nodes...")
for node_ip in to_revoke:
cmd = f"sudo nft -a list chain inet filter input | grep '{node_ip}' | awk -F'handle ' '{{print $2}}' | xargs -Ixxx sudo nft delete rule inet filter input handle xxx"
log.info(f"Running revoke command: {cmd}")
p = subprocess.Popen(['/bin/bash', '-c', cmd])
p.wait(5)
if p.returncode != 0:
raise OSError(f"Revoke command exited with return code {p.returncode}")
def id_to_reg_code(cmix_id):
"""
Helper to convert cmix ID to reg code
@@ -405,7 +431,7 @@ def poll_active_nodes(substrate):
def update_config_options(conn, chain_conf):
"""
update config based on chain data
:param conn:
:param ChainConf chain_conf:
:return:
@@ -648,12 +674,11 @@ def get_max_app_id(conn):
return int(row[0]) if row and row[0] else 0
def set_authorizer_nodes(conn, nids):
def get_authorizer_nodes(conn):
"""
Set nodes in the authorizer db
:param conn: database connection object
:param nids: list of node IDs for authorizer
:return:
get list of nodes currently in the authorizer nodes table
:param conn: authorizer database connection
:return: list of rows containing id, ip_address, last_updated
"""
cur = conn.cursor()
@@ -667,31 +692,54 @@ def set_authorizer_nodes(conn, nids):
cur.close()
raise e
return cur.fetchall()
def set_authorizer_nodes(conn, to_add, to_delete):
"""
Set nodes in the authorizer db
:param conn: database connection object
:param to_add: list of node IDs to add
:param to_delete: list of node IDs to delete
:return list[ip_address]: list of IPs to revoke auth
"""
cur = conn.cursor()
# Convert Node information into authorizer insert command
node_list = cur.fetchall()
insert_list = []
for n in node_list:
if bytes(n[0]) in nids:
insert_list = insert_list + [n]
nids.remove(bytes(n[0]))
insert_list = insert_list + [(i, None, None) for i in nids]
# Insert Node information into authorizer db
insert_command = "INSERT INTO nodes (id, ip_address, last_updated) VALUES" + \
(' (%s, %s, %s),' * len(insert_list))
insert_command = insert_command[:-1] + ";"
truncate_command = "TRUNCATE nodes;"
try:
cur.execute(truncate_command)
log.debug(cur.query)
cur.execute(insert_command, [e for l in insert_list for e in l])
log.debug(cur.query)
conn.commit()
except Exception as e:
log.error(f"Failed to insert into authorizer db: {cur.query}")
raise e
finally:
cur.close()
node_list = get_authorizer_nodes(conn)
to_revoke = []
delete_command = "DELETE FROM nodes WHERE id = %s;"
for row in node_list:
if bytes(row[0]) in to_delete:
log.info(f"Deleting {bytes(row[0])} [{row[1]}] - last updated in DB at {row[2]}")
try:
cur.execute(delete_command, (row[0],))
log.debug(cur.query)
except Exception as e:
log.error(f"Failed to remove node from authorizer DB: {cur.query}")
cur.close()
raise e
if row[1]:
to_revoke.append(row[1])
if len(to_add) > 0:
insert_list = [(i, None, None) for i in to_add]
# Insert Node information into authorizer db
insert_command = "INSERT INTO nodes (id, ip_address, last_updated) VALUES" + \
(' (%s, %s, %s),' * len(insert_list))
insert_command = insert_command[:-1] + " ON CONFLICT DO NOTHING;"
try:
cur.execute(insert_command, [e for l in insert_list for e in l])
log.debug(cur.query)
except Exception as e:
log.error(f"Failed to insert into authorizer db: {cur.query}")
cur.close()
raise e
conn.commit()
cur.close()
return to_revoke
def check_table(conn, table_name):
Loading