Skip to content
Snippets Groups Projects

Remove truncate and add on conflict clause instead

Merged Jonah Husson requested to merge hotfix/remove-truncate into master
Compare and
1 file
+ 64
20
Compare changes
  • Side-by-side
  • Inline
+ 64
20
@@ -5,6 +5,7 @@ Monitors substrate for Node and Network parameter information,
inserts Node information into authorizer DB,
and inserts Node information and Network parameters into Permissioning DB
"""
import subprocess
from collections import namedtuple
import copy
import json
@@ -66,9 +67,13 @@ def main():
table_exists = check_table(conn, "active_nodes")
# Main loop
active_dict = {}
active_dict = dict()
auth_nids = set()
current_bins = []
current_chain_conf = {}
init_auth = get_authorizer_nodes(auth_conn)
for i in init_auth:
auth_nids.add(i[0])
while True:
try:
log.info("Polling substrate...")
@@ -94,8 +99,13 @@ def main():
log.info(f"Updating active nodes: {len(new_dict)} nodes")
# Extract node IDs for authorizer (cmix_id with node type byte added)
authorizer_nids = [i.cmix_id + b'\x02' for i in new_dict.values()]
set_authorizer_nodes(auth_conn, authorizer_nids)
new_auth_nids = [i.cmix_id + b'\x02' for i in new_dict.values()]
new_auth_set = set(new_auth_nids)
to_add = new_auth_set.difference(auth_nids)
to_delete = auth_nids.difference(new_auth_set)
to_revoke = set_authorizer_nodes(auth_conn, to_add, to_delete)
auth_nids = new_auth_set
revoke_auth(to_revoke)
# Pass a copy because the dict will be mutated
set_active_nodes(conn, copy.deepcopy(new_dict))
@@ -213,6 +223,23 @@ def get_substrate_provider():
# Auxiliary Functions #
#######################
def revoke_auth(to_revoke):
"""
revoke_auth accepts a list of node IP addresses to revoke auth from
:param to_revoke: list of node IP addresses
"""
log.info(f"Revoking access to {len(to_revoke)} nodes...")
for nid in to_revoke:
cmd = f"sudo nft -a list chain inet filter input | grep '{nid}' | awk -F'handle ' '{{print $2}}' | xargs -Ixxx sudo nft delete rule inet filter input handle xxx"
log.debug(cmd)
p = subprocess.Popen(cmd.split())
output, error = p.communicate()
if output:
log.debug(output)
if error:
raise IOError(error)
def id_to_reg_code(cmix_id):
"""
Helper to convert cmix ID to reg code
@@ -405,7 +432,7 @@ def poll_active_nodes(substrate):
def update_config_options(conn, chain_conf):
"""
update config based on chain data
:param conn:
:param ChainConf chain_conf:
:return:
@@ -648,12 +675,11 @@ def get_max_app_id(conn):
return int(row[0]) if row and row[0] else 0
def set_authorizer_nodes(conn, nids):
def get_authorizer_nodes(conn):
"""
Set nodes in the authorizer db
:param conn: database connection object
:param nids: list of node IDs for authorizer
:return:
get list of nodes currently in the authorizer nodes table
:param conn: authorizer database connection
:return: list of rows containing id, ip_address, last_updated
"""
cur = conn.cursor()
@@ -667,23 +693,40 @@ def set_authorizer_nodes(conn, nids):
cur.close()
raise e
return cur.fetchall()
def set_authorizer_nodes(conn, to_add, to_delete):
"""
Set nodes in the authorizer db
:param conn: database connection object
:param to_add: list of node IDs to add
:param to_delete: list of node IDs to delete
:return list[ip_address]: list of IPs to revoke auth
"""
cur = conn.cursor()
# Convert Node information into authorizer insert command
node_list = cur.fetchall()
insert_list = []
for n in node_list:
if bytes(n[0]) in nids:
insert_list = insert_list + [n]
nids.remove(bytes(n[0]))
insert_list = insert_list + [(i, None, None) for i in nids]
node_list = get_authorizer_nodes(conn)
to_revoke = []
delete_command = "DELETE FROM nodes WHERE id = %s;"
for row in node_list:
if row[0] in to_delete:
try:
cur.execute(delete_command, (row[0],))
log.debug(cur.query)
except Exception as e:
log.error(f"Failed to remove node from authorizer DB: {cur.query}")
raise e
to_revoke.append(row[1])
insert_list = [(i, None, None) for i in to_add]
# Insert Node information into authorizer db
insert_command = "INSERT INTO nodes (id, ip_address, last_updated) VALUES" + \
(' (%s, %s, %s),' * len(insert_list))
insert_command = insert_command[:-1] + ";"
truncate_command = "TRUNCATE nodes;"
insert_command = insert_command[:-1] + " ON CONFLICT DO NOTHING;"
try:
cur.execute(truncate_command)
log.debug(cur.query)
cur.execute(insert_command, [e for l in insert_list for e in l])
log.debug(cur.query)
conn.commit()
@@ -692,6 +735,7 @@ def set_authorizer_nodes(conn, nids):
raise e
finally:
cur.close()
return to_revoke
def check_table(conn, table_name):
Loading