Skip to content
Snippets Groups Projects

Remove truncate and add on conflict clause instead

Compare and Show latest version
1 file
+ 43
35
Compare changes
  • Side-by-side
  • Inline
+ 43
35
@@ -71,16 +71,16 @@ def main():
@@ -71,16 +71,16 @@ def main():
auth_nids = set()
auth_nids = set()
current_bins = []
current_bins = []
current_chain_conf = {}
current_chain_conf = {}
init_auth = get_authorizer_nodes()
init_auth = get_authorizer_nodes(auth_conn)
for i in init_auth:
for i in init_auth:
auth_nids.add(i[0])
auth_nids.add(bytes(i[0]))
while True:
while True:
try:
try:
log.info("Polling substrate...")
log.info("Polling substrate...")
# Deal with bins
# Deal with bins
bins, chain_conf = poll_cmix_info(substrate)
bins, chain_conf = poll_cmix_info(substrate)
log.debug(f"Polled bins: {bins}")
log.debug(f"Polled {len(bins)} bins!")
if bins != current_bins:
if bins != current_bins:
log.info(f"Updating GeoBins: {bins}")
log.info(f"Updating GeoBins: {bins}")
set_bins(conn, bins)
set_bins(conn, bins)
@@ -94,7 +94,7 @@ def main():
@@ -94,7 +94,7 @@ def main():
# Deal with active nodes
# Deal with active nodes
new_dict = poll_active_nodes(substrate)
new_dict = poll_active_nodes(substrate)
log.debug(f"Polled active nodes: {new_dict}")
log.debug(f"Polled {len(new_dict)} active nodes!")
if active_dict != new_dict:
if active_dict != new_dict:
log.info(f"Updating active nodes: {len(new_dict)} nodes")
log.info(f"Updating active nodes: {len(new_dict)} nodes")
@@ -103,9 +103,9 @@ def main():
@@ -103,9 +103,9 @@ def main():
new_auth_set = set(new_auth_nids)
new_auth_set = set(new_auth_nids)
to_add = new_auth_set.difference(auth_nids)
to_add = new_auth_set.difference(auth_nids)
to_delete = auth_nids.difference(new_auth_set)
to_delete = auth_nids.difference(new_auth_set)
set_authorizer_nodes(auth_conn, to_add, to_delete)
to_revoke = set_authorizer_nodes(auth_conn, to_add, to_delete)
auth_nids = new_auth_nids
auth_nids = new_auth_set
revoke_auth(to_delete)
revoke_auth(to_revoke)
# Pass a copy because the dict will be mutated
# Pass a copy because the dict will be mutated
set_active_nodes(conn, copy.deepcopy(new_dict))
set_active_nodes(conn, copy.deepcopy(new_dict))
@@ -225,16 +225,17 @@ def get_substrate_provider():
@@ -225,16 +225,17 @@ def get_substrate_provider():
def revoke_auth(to_revoke):
def revoke_auth(to_revoke):
"""
"""
revoke_auth accepts a list of node IDs to revoke auth from
revoke_auth accepts a list of node IP addresses to revoke auth from
:param to_revoke: list of node IDs
:param to_revoke: list of node IP addresses
"""
"""
for nid in to_revoke:
log.info(f"Revoking access to {len(to_revoke)} nodes...")
cmd = f"sudo nft -a list chain inet filter input | grep '{nid}' | awk -F'handle ' '{{print $2}}' | xargs -Ixxx sudo nft delete rule inet filter input handle xxx"
for node_ip in to_revoke:
p = subprocess.Popen(cmd.split())
cmd = f"sudo nft -a list chain inet filter input | grep '{node_ip}' | awk -F'handle ' '{{print $2}}' | xargs -Ixxx sudo nft delete rule inet filter input handle xxx"
output, error = p.communicate()
log.info(f"Running revoke command: {cmd}")
log.debug(output)
p = subprocess.Popen(['/bin/bash', '-c', cmd])
if error:
p.wait(5)
log.error(error)
if p.returncode != 0:
 
raise OSError(f"Revoke command exited with return code {p.returncode}")
def id_to_reg_code(cmix_id):
def id_to_reg_code(cmix_id):
@@ -699,36 +700,43 @@ def set_authorizer_nodes(conn, to_add, to_delete):
@@ -699,36 +700,43 @@ def set_authorizer_nodes(conn, to_add, to_delete):
:param conn: database connection object
:param conn: database connection object
:param to_add: list of node IDs to add
:param to_add: list of node IDs to add
:param to_delete: list of node IDs to delete
:param to_delete: list of node IDs to delete
:return:
:return list[ip_address]: list of IPs to revoke auth
"""
"""
cur = conn.cursor()
cur = conn.cursor()
# Convert Node information into authorizer insert command
# Convert Node information into authorizer insert command
node_list = get_authorizer_nodes(conn)
node_list = get_authorizer_nodes(conn)
 
to_revoke = []
delete_command = "DELETE FROM nodes WHERE id = ?;"
delete_command = "DELETE FROM nodes WHERE id = %s;"
for n in to_delete:
for row in node_list:
 
if bytes(row[0]) in to_delete:
 
try:
 
cur.execute(delete_command, (row[0],))
 
log.debug(cur.query)
 
except Exception as e:
 
log.error(f"Failed to remove node from authorizer DB: {cur.query}")
 
raise e
 
if row[1]:
 
to_revoke.append(row[1])
 
 
if len(to_add) > 0:
 
insert_list = [(i, None, None) for i in to_add]
 
# Insert Node information into authorizer db
 
insert_command = "INSERT INTO nodes (id, ip_address, last_updated) VALUES" + \
 
(' (%s, %s, %s),' * len(insert_list))
 
insert_command = insert_command[:-1] + " ON CONFLICT DO NOTHING;"
try:
try:
cur.execute(delete_command, (n,))
cur.execute(insert_command, [e for l in insert_list for e in l])
log.debug(cur.query)
log.debug(cur.query)
except Exception as e:
except Exception as e:
log.error(f"Failed to remove node from authorizer DB: {cur.query}")
log.error(f"Failed to insert into authorizer db: {cur.query}")
raise e
raise e
 
finally:
 
cur.close()
insert_list = [(i, None, None) for i in to_add]
conn.commit()
# Insert Node information into authorizer db
return to_revoke
insert_command = "INSERT INTO nodes (id, ip_address, last_updated) VALUES" + \
(' (%s, %s, %s),' * len(insert_list))
insert_command = insert_command[:-1] + " ON CONFLICT DO NOTHING;"
try:
cur.execute(insert_command, [e for l in insert_list for e in l])
log.debug(cur.query)
conn.commit()
except Exception as e:
log.error(f"Failed to insert into authorizer db: {cur.query}")
raise e
finally:
cur.close()
def check_table(conn, table_name):
def check_table(conn, table_name):
Loading