bin.Databases.QuestDB
import collections
from Databases.database import Database
import socket
import psycopg2


class QuestDB(Database):
    """Database backend that writes BGP records to QuestDB and reads them back.

    Writes go through a TCP socket using the InfluxDB line protocol; reads go
    through a psycopg2 (PostgreSQL wire protocol) connection.
    """

    # NOTE(review): presumably the key used in the configuration file to
    # select this backend -- confirm against `Database`.
    name = "quest"

    def __init__(self, config):
        """Create the line protocol socket and store the connection settings.

        Args:
            config: Mapping providing ``host``, ``tcp_port`` and ``pg_port``.
        """
        super().__init__()
        # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Required args for line protocol and PostgreSQL connection
        self.conf = {
            "host": config["host"],
            "tcp_port": int(config["tcp_port"]),
            "pg_port": int(config["pg_port"]),
        }
        self.connection = None

    def start(self):
        """Connect the line protocol socket, then open the PostgreSQL connection."""
        self.sock.connect((self.conf["host"], self.conf["tcp_port"]))
        # NOTE(review): user, password and database name are hard-coded here
        # rather than read from the configuration -- confirm this is intended.
        self.connection = psycopg2.connect(
            host=self.conf["host"],
            port=self.conf["pg_port"],
            user="admin",
            password="quest",
            database="qdb",
        )

    def stop(self):
        """Close the socket and, if it was opened, the PostgreSQL connection."""
        self.sock.close()
        if self.connection:
            self.connection.close()
            print("PostgreSQL connection is closed")

    ###############
    #   INSERTS   #
    ###############

    def save(self, record):
        """Save a BGP record using the InfluxDB line protocol.

        The line sent is made of the measurement name ``bgp``, the tags
        ``type``, ``project``, ``collector`` and ``country``, the fields
        ``peer``, ``prefix``, ``path`` and ``source``, and ``record['time']``
        multiplied by 1e9 and truncated to an integer.

        Args:
            record (BGPElem): Record to store. A missing ``as-path`` is sent
                as an empty string, as is a falsy ``country_code``.
        """
        self.send_utf8(
            (
                f"bgp,type={record['type']},project={record['project']},"
                f"collector={record['collector']},"
                f"country={record['country_code'] or ''}"
                f' peer={record["peer_asn"]},prefix="{record["prefix"]}",'
                f'path="{record.get("as-path", "")}",source="{record["source"]}"'
                f" {int(record['time']*1000000000)}\n"
            )
        )

    def send_utf8(self, msg):
        """Encode message and send it to server"""
        self.sock.sendall(msg.encode())

    ##############
    #   GETTER   #
    ##############
    # The following are not used

    # Used in `QuestDB.get` as arg_name: db_column_name
    var_names = {
        "time_start": "time_start",
        "time_end": "time_end",
        "record_type": "type",
        "peer_asn": "peer",
        "collectors": "collector",
        "countries": "country",
        "as_numbers": "source",
        "prefixes": "prefix",
        "as_paths": "path",
    }

    def get(
        self,
        time_start,
        time_end,
        record_type=None,
        peer_asn=None,
        collectors=None,
        countries=None,
        as_numbers=None,
        prefixes=None,
        as_paths=None,
    ):
        """Retrieve rows from the ``bgp`` table over the psycopg2 connection.

        Every argument that is not ``None`` becomes one condition on the
        column named in ``var_names``; conditions are joined with ``AND``.
        A non-string sequence is matched with ``IN``, any other value with
        ``=``. Empty sequences are ignored, since ``IN ()`` is not valid SQL.

        See `Database.get()`

        Returns:
            Whatever ``cursor.fetchall()`` returns for the built query.
        """
        # Local import: `collections.Sequence` was removed in Python 3.10 and
        # `import collections` alone does not guarantee `collections.abc`.
        from collections.abc import Sequence

        # `getattr` because nothing in this class assigns `cursor` beforehand.
        if getattr(self, "cursor", None) is None:
            self.cursor = self.connection.cursor()

        # Spelled out explicitly rather than read from `locals()`, which would
        # also contain `self` and every other local variable.
        # NOTE(review): `time_start` / `time_end` are compared for equality
        # against columns of the same name, which `save()` never writes --
        # confirm the intended time-range semantics.
        filters = {
            "time_start": time_start,
            "time_end": time_end,
            "record_type": record_type,
            "peer_asn": peer_asn,
            "collectors": collectors,
            "countries": countries,
            "as_numbers": as_numbers,
            "prefixes": prefixes,
            "as_paths": as_paths,
        }

        conditions = []
        values = []
        for arg_name, value in filters.items():
            if value is None:
                continue
            column = self.var_names[arg_name]
            if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
                if not value:
                    continue
                conditions.append(f"{column} IN %s")
                # psycopg2 renders a tuple as a parenthesised SQL list.
                values.append(tuple(value))
            else:
                conditions.append(f"{column} = %s")
                values.append(value)

        query = "SELECT * FROM bgp"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)

        self.cursor.execute(query, values)
        return self.cursor.fetchall()
class QuestDB(Database):
    """Database backend that writes BGP records to QuestDB and reads them back.

    Writes go through a TCP socket using the InfluxDB line protocol; reads go
    through a psycopg2 (PostgreSQL wire protocol) connection.
    """

    # NOTE(review): presumably the key used in the configuration file to
    # select this backend -- confirm against `Database`.
    name = "quest"

    def __init__(self, config):
        """Create the line protocol socket and store the connection settings.

        Args:
            config: Mapping providing ``host``, ``tcp_port`` and ``pg_port``.
        """
        super().__init__()
        # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Required args for line protocol and PostgreSQL connection
        self.conf = {
            "host": config["host"],
            "tcp_port": int(config["tcp_port"]),
            "pg_port": int(config["pg_port"]),
        }
        self.connection = None

    def start(self):
        """Connect the line protocol socket, then open the PostgreSQL connection."""
        self.sock.connect((self.conf["host"], self.conf["tcp_port"]))
        # NOTE(review): user, password and database name are hard-coded here
        # rather than read from the configuration -- confirm this is intended.
        self.connection = psycopg2.connect(
            host=self.conf["host"],
            port=self.conf["pg_port"],
            user="admin",
            password="quest",
            database="qdb",
        )

    def stop(self):
        """Close the socket and, if it was opened, the PostgreSQL connection."""
        self.sock.close()
        if self.connection:
            self.connection.close()
            print("PostgreSQL connection is closed")

    ###############
    #   INSERTS   #
    ###############

    def save(self, record):
        """Save a BGP record using the InfluxDB line protocol.

        The line sent is made of the measurement name ``bgp``, the tags
        ``type``, ``project``, ``collector`` and ``country``, the fields
        ``peer``, ``prefix``, ``path`` and ``source``, and ``record['time']``
        multiplied by 1e9 and truncated to an integer.

        Args:
            record (BGPElem): Record to store. A missing ``as-path`` is sent
                as an empty string, as is a falsy ``country_code``.
        """
        self.send_utf8(
            (
                f"bgp,type={record['type']},project={record['project']},"
                f"collector={record['collector']},"
                f"country={record['country_code'] or ''}"
                f' peer={record["peer_asn"]},prefix="{record["prefix"]}",'
                f'path="{record.get("as-path", "")}",source="{record["source"]}"'
                f" {int(record['time']*1000000000)}\n"
            )
        )

    def send_utf8(self, msg):
        """Encode message and send it to server"""
        self.sock.sendall(msg.encode())

    ##############
    #   GETTER   #
    ##############
    # The following are not used

    # Used in `QuestDB.get` as arg_name: db_column_name
    var_names = {
        "time_start": "time_start",
        "time_end": "time_end",
        "record_type": "type",
        "peer_asn": "peer",
        "collectors": "collector",
        "countries": "country",
        "as_numbers": "source",
        "prefixes": "prefix",
        "as_paths": "path",
    }

    def get(
        self,
        time_start,
        time_end,
        record_type=None,
        peer_asn=None,
        collectors=None,
        countries=None,
        as_numbers=None,
        prefixes=None,
        as_paths=None,
    ):
        """Retrieve rows from the ``bgp`` table over the psycopg2 connection.

        Every argument that is not ``None`` becomes one condition on the
        column named in ``var_names``; conditions are joined with ``AND``.
        A non-string sequence is matched with ``IN``, any other value with
        ``=``. Empty sequences are ignored, since ``IN ()`` is not valid SQL.

        See `Database.get()`

        Returns:
            Whatever ``cursor.fetchall()`` returns for the built query.
        """
        # Local import: `collections.Sequence` was removed in Python 3.10 and
        # `import collections` alone does not guarantee `collections.abc`.
        from collections.abc import Sequence

        # `getattr` because nothing in this class assigns `cursor` beforehand.
        if getattr(self, "cursor", None) is None:
            self.cursor = self.connection.cursor()

        # Spelled out explicitly rather than read from `locals()`, which would
        # also contain `self` and every other local variable.
        # NOTE(review): `time_start` / `time_end` are compared for equality
        # against columns of the same name, which `save()` never writes --
        # confirm the intended time-range semantics.
        filters = {
            "time_start": time_start,
            "time_end": time_end,
            "record_type": record_type,
            "peer_asn": peer_asn,
            "collectors": collectors,
            "countries": countries,
            "as_numbers": as_numbers,
            "prefixes": prefixes,
            "as_paths": as_paths,
        }

        conditions = []
        values = []
        for arg_name, value in filters.items():
            if value is None:
                continue
            column = self.var_names[arg_name]
            if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
                if not value:
                    continue
                conditions.append(f"{column} IN %s")
                # psycopg2 renders a tuple as a parenthesised SQL list.
                values.append(tuple(value))
            else:
                conditions.append(f"{column} = %s")
                values.append(value)

        query = "SELECT * FROM bgp"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)

        self.cursor.execute(query, values)
        return self.cursor.fetchall()
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(self, config):
    """Create the line protocol socket and record the connection settings.

    Args:
        config: Mapping providing ``host``, ``tcp_port`` and ``pg_port``;
            both ports are converted with ``int``.
    """
    super().__init__()

    # TCP stream socket; switch SOCK_STREAM to SOCK_DGRAM for UDP.
    line_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock = line_socket

    # Settings needed by the line protocol socket and the PostgreSQL client.
    host = config["host"]
    tcp_port = int(config["tcp_port"])
    pg_port = int(config["pg_port"])
    self.conf = {"host": host, "tcp_port": tcp_port, "pg_port": pg_port}

    # No PostgreSQL connection is assigned here.
    self.connection = None
This variable is used in the configuration file.
You must specify it in the config to load an instance.
def start(self):
    """Connect the line protocol socket, then open the PostgreSQL connection.

    Host and ports come from ``self.conf``; user, password and database name
    are fixed in this method.
    """
    line_address = (self.conf["host"], self.conf["tcp_port"])
    self.sock.connect(line_address)

    pg_settings = {
        "host": self.conf["host"],
        "port": self.conf["pg_port"],
        "user": "admin",
        "password": "quest",
        "database": "qdb",
    }
    self.connection = psycopg2.connect(**pg_settings)
Connect to server using etc/config.cfg file
def stop(self):
    """Close the socket and, when one exists, the PostgreSQL connection."""
    self.sock.close()

    # Nothing more to do when no PostgreSQL connection was opened.
    if not self.connection:
        return

    self.connection.close()
    print("PostgreSQL connection is closed")
Close connection to server
def save(self, record):
    """Send one BGP record to the server as an InfluxDB line protocol line.

    Line layout::

        bgp,<tags> <fields> <timestamp>\\n

    where the tags are ``type``, ``project``, ``collector`` and ``country``,
    the fields are ``peer``, ``prefix``, ``path`` and ``source``, and the
    timestamp is ``record['time']`` multiplied by 1e9, truncated to an int.

    Args:
        record (BGPElem): Record to store. A missing ``as-path`` is sent as
            an empty string, as is a falsy ``country_code``.
    """
    tag_set = (
        f"type={record['type']},project={record['project']},"
        f"collector={record['collector']},"
        f"country={record['country_code'] or ''}"
    )
    field_set = (
        f'peer={record["peer_asn"]},prefix="{record["prefix"]}",'
        f'path="{record.get("as-path", "")}",source="{record["source"]}"'
    )
    timestamp = int(record['time'] * 1000000000)

    self.send_utf8(f"bgp,{tag_set} {field_set} {timestamp}\n")
Save bgp record using InfluxDB Line protocol
Format : bgp,type={record['type']},project={record['project']}, collector={record['collector']},country={record['country_code'] or ''} peer={record["peer_asn"]},prefix="{record["prefix"]}", path="{record.get("as-path", "")}",source="{record["source"]}" {int(record['time']*1000000000)}
Args: record (BGPElem)
def send_utf8(self, msg):
    """Send ``msg`` over the socket as UTF-8 encoded bytes."""
    payload = msg.encode("utf-8")
    self.sock.sendall(payload)
Encode message and send it to server
def get(
    self,
    time_start,
    time_end,
    record_type=None,
    peer_asn=None,
    collectors=None,
    countries=None,
    as_numbers=None,
    prefixes=None,
    as_paths=None,
):
    """Retrieve rows from the ``bgp`` table over the psycopg2 connection.

    Every argument that is not ``None`` becomes one condition on the column
    named in ``var_names``; conditions are joined with ``AND``. A non-string
    sequence is matched with ``IN``, any other value with ``=``. Empty
    sequences are ignored, since ``IN ()`` is not valid SQL.

    See `Database.get()`

    Returns:
        Whatever ``cursor.fetchall()`` returns for the built query.
    """
    # Local import: `collections.Sequence` was removed in Python 3.10 and
    # `import collections` alone does not guarantee `collections.abc`.
    from collections.abc import Sequence

    # `getattr` because the cursor is only ever assigned in this method.
    if getattr(self, "cursor", None) is None:
        self.cursor = self.connection.cursor()

    # Spelled out explicitly rather than read from `locals()`, which would
    # also contain `self` and every other local variable.
    # NOTE(review): `time_start` / `time_end` are compared for equality
    # against columns of the same name, which `save()` never writes --
    # confirm the intended time-range semantics.
    filters = {
        "time_start": time_start,
        "time_end": time_end,
        "record_type": record_type,
        "peer_asn": peer_asn,
        "collectors": collectors,
        "countries": countries,
        "as_numbers": as_numbers,
        "prefixes": prefixes,
        "as_paths": as_paths,
    }

    conditions = []
    values = []
    for arg_name, value in filters.items():
        if value is None:
            continue
        column = self.var_names[arg_name]
        if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
            if not value:
                continue
            conditions.append(f"{column} IN %s")
            # psycopg2 renders a tuple as a parenthesised SQL list.
            values.append(tuple(value))
        else:
            conditions.append(f"{column} = %s")
            values.append(value)

    query = "SELECT * FROM bgp"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    self.cursor.execute(query, values)
    return self.cursor.fetchall()
Retrieve data from QuestDB using the psycopg2 (PostgreSQL) connection
See Database.get()