bin.Databases.ClickHouseDB
"""ClickHouse database backend for the BGP monitor.

Stores BGP elements in a `bgp` table via batched inserts and exposes a
filtered streaming query through `ClickHouseDB.get`.
"""
import collections
import threading
from queue import Queue

from clickhouse_driver import Client

from Databases.database import Database


class ClickHouseDB(Database):
    """Batched-insert / filtered-read ClickHouse backend for BGP records."""

    # Backend identifier referenced by the configuration file.
    name = "clickhouse"

    def __init__(self, config):
        """Connect to ClickHouse and ensure the `bgp` table exists.

        Args:
            config (dict): Format :
                {"host":"127.0.0.1", "port":9000, "batch_size":15000}
        """
        super().__init__()
        self.started = False
        # Rows per INSERT batch (defaults to 15000 when not configured).
        self.BATCH_SIZE = int(config.get("batch_size", 15000))
        self.queue = Queue()
        self.client = Client(
            host=config["host"], port=int(config["port"]), compression="lz4"
        )
        self.client.execute(
            "CREATE TABLE IF NOT EXISTS bgp ("
            "time DateTime,"
            "type FixedString(1),"
            "peer Int32,"
            "collector String,"
            "project String,"
            "country String,"
            "source String,"
            "prefix String,"
            "path String"
            ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
            "SETTINGS old_parts_lifetime=10"
        )
        print("Clickhouse : Generated BGP table :D")

    def start(self):
        """Mark the backend started and spawn the batch-insert worker thread."""
        self.started = True
        threading.Thread(
            target=self.insert_batches, daemon=True, name="BGP monitor - clickhouse"
        ).start()

    def stop(self):
        """Stop inserts (the worker flushes its last batch and exits)."""
        self.started = False

    ###############
    #   INSERTS   #
    ###############

    def save(self, data):
        """Queue a BGP element for asynchronous batched insertion.

        Args:
            data (BGPElem): bgp element to save
        """
        self.queue.put(data)

    def get_data(self):
        """Yield up to BATCH_SIZE queued records formatted as table rows.

        Yields:
            dict: Data to insert
        """
        for _ in range(self.BATCH_SIZE):
            rec = self.queue.get()
            yield {
                "time": int(rec["time"]),
                "type": rec["type"],
                "peer": rec["peer_asn"],
                "collector": rec["collector"],
                "country": rec["country_code"] or "",
                "source": rec["source"] or "",
                "prefix": rec["prefix"],
                "path": rec.get("as-path", ""),
                "project": rec["project"],
            }
            # Once producers have stopped and the queue is drained, end the
            # generator early so the final short batch is flushed.
            if self.queue.qsize() == 0 and not self.started:
                print("Clickhouse : Inserting last batch")
                return

    def insert_batches(self):
        """Worker loop: insert BATCH_SIZE-row batches into the bgp table.

        https://clickhouse.com/docs/en/about-us/performance/
        """
        while self.started:
            self.client.execute("INSERT INTO bgp VALUES ", self.get_data())

    ##############
    #   GETTER   #
    ##############

    # Maps `ClickHouseDB.get` argument names to bgp-table column names.
    var_names = {
        "time_start": "time_start",
        "time_end": "time_end",
        "record_type": "type",
        "peer_asn": "peer",
        "collectors": "collector",
        "countries": "country",
        "as_numbers": "source",
        "prefixes": "prefix",
        "as_paths": "path",
    }

    def get(
        self,
        time_start,
        time_end,
        record_type=None,
        peer_asn=None,
        collectors=None,
        countries=None,
        as_numbers=None,
        prefixes=None,
        as_paths=None,
    ):
        """Stream rows from the bgp table matching the given filters.

        Args:
            time_start: Inclusive lower bound on record time.
            time_end: Inclusive upper bound on record time.
            record_type, peer_asn, collectors, countries, as_numbers,
            prefixes, as_paths: Optional filters; scalars match with
                equality, lists/tuples/sets with IN.

        Returns:
            Iterator over matching rows (clickhouse_driver execute_iter).
        """
        # Snapshot arguments before creating new locals; drop self and
        # unused (None) filters.
        params = {
            k: v for k, v in locals().items() if k != "self" and v is not None
        }

        conditions = []
        for key, value in params.items():
            if key == "time_start":
                conditions.append("time >= %(time_start)s")
            elif key == "time_end":
                conditions.append("time <= %(time_end)s")
            elif isinstance(value, (list, tuple, set)):
                conditions.append(f"{ClickHouseDB.var_names[key]} IN %({key})s")
            else:
                conditions.append(f"{ClickHouseDB.var_names[key]} = %({key})s")

        # time_start/time_end are mandatory, so conditions is never empty.
        query = "SELECT * FROM bgp WHERE " + " AND ".join(conditions)
        return self.client.execute_iter(query, params)
class
ClickHouseDB(Databases.database.Database):
class ClickHouseDB(Database):
    """Batched-insert / filtered-read ClickHouse backend for BGP records."""

    # Backend identifier referenced by the configuration file.
    name = "clickhouse"

    def __init__(self, config):
        """Connect to ClickHouse and ensure the `bgp` table exists.

        Args:
            config (dict): Format :
                {"host":"127.0.0.1", "port":9000, "batch_size":15000}
        """
        super().__init__()
        self.started = False
        # Rows per INSERT batch (defaults to 15000 when not configured).
        self.BATCH_SIZE = int(config.get("batch_size", 15000))
        self.queue = Queue()
        self.client = Client(
            host=config["host"], port=int(config["port"]), compression="lz4"
        )
        self.client.execute(
            "CREATE TABLE IF NOT EXISTS bgp ("
            "time DateTime,"
            "type FixedString(1),"
            "peer Int32,"
            "collector String,"
            "project String,"
            "country String,"
            "source String,"
            "prefix String,"
            "path String"
            ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
            "SETTINGS old_parts_lifetime=10"
        )
        print("Clickhouse : Generated BGP table :D")

    def start(self):
        """Mark the backend started and spawn the batch-insert worker thread."""
        self.started = True
        threading.Thread(
            target=self.insert_batches, daemon=True, name="BGP monitor - clickhouse"
        ).start()

    def stop(self):
        """Stop inserts (the worker flushes its last batch and exits)."""
        self.started = False

    ###############
    #   INSERTS   #
    ###############

    def save(self, data):
        """Queue a BGP element for asynchronous batched insertion.

        Args:
            data (BGPElem): bgp element to save
        """
        self.queue.put(data)

    def get_data(self):
        """Yield up to BATCH_SIZE queued records formatted as table rows.

        Yields:
            dict: Data to insert
        """
        for _ in range(self.BATCH_SIZE):
            rec = self.queue.get()
            yield {
                "time": int(rec["time"]),
                "type": rec["type"],
                "peer": rec["peer_asn"],
                "collector": rec["collector"],
                "country": rec["country_code"] or "",
                "source": rec["source"] or "",
                "prefix": rec["prefix"],
                "path": rec.get("as-path", ""),
                "project": rec["project"],
            }
            # Once producers have stopped and the queue is drained, end the
            # generator early so the final short batch is flushed.
            if self.queue.qsize() == 0 and not self.started:
                print("Clickhouse : Inserting last batch")
                return

    def insert_batches(self):
        """Worker loop: insert BATCH_SIZE-row batches into the bgp table.

        https://clickhouse.com/docs/en/about-us/performance/
        """
        while self.started:
            self.client.execute("INSERT INTO bgp VALUES ", self.get_data())

    ##############
    #   GETTER   #
    ##############

    # Maps `ClickHouseDB.get` argument names to bgp-table column names.
    var_names = {
        "time_start": "time_start",
        "time_end": "time_end",
        "record_type": "type",
        "peer_asn": "peer",
        "collectors": "collector",
        "countries": "country",
        "as_numbers": "source",
        "prefixes": "prefix",
        "as_paths": "path",
    }

    def get(
        self,
        time_start,
        time_end,
        record_type=None,
        peer_asn=None,
        collectors=None,
        countries=None,
        as_numbers=None,
        prefixes=None,
        as_paths=None,
    ):
        """Stream rows from the bgp table matching the given filters.

        Scalar filters match with equality; list/tuple/set filters use IN.
        Fixes vs. original: locals()/params iterated via .items(), self
        excluded, proper %(name)s placeholders, time treated as a range,
        and the trailing 'and' handling no longer duplicates the query.

        Returns:
            Iterator over matching rows (clickhouse_driver execute_iter).
        """
        params = {
            k: v for k, v in locals().items() if k != "self" and v is not None
        }

        conditions = []
        for key, value in params.items():
            if key == "time_start":
                conditions.append("time >= %(time_start)s")
            elif key == "time_end":
                conditions.append("time <= %(time_end)s")
            elif isinstance(value, (list, tuple, set)):
                conditions.append(f"{ClickHouseDB.var_names[key]} IN %({key})s")
            else:
                conditions.append(f"{ClickHouseDB.var_names[key]} = %({key})s")

        # time_start/time_end are mandatory, so conditions is never empty.
        query = "SELECT * FROM bgp WHERE " + " AND ".join(conditions)
        return self.client.execute_iter(query, params)
ClickHouse database backend: stores BGP records in a `bgp` table, performing batched inserts and filtered streaming queries.
ClickHouseDB(config)
def __init__(self, config):
    """Open a ClickHouse connection and create the `bgp` table if absent.

    Args:
        config (dict): Format :
            {"host":"127.0.0.1", "port":9000, "batch_size":15000}
    """
    super().__init__()
    self.started = False
    # Batch size for inserts; 15000 when the config does not specify one.
    if "batch_size" in config:
        self.BATCH_SIZE = int(config["batch_size"])
    else:
        self.BATCH_SIZE = 15000
    self.queue = Queue()
    self.client = Client(
        host=config["host"],
        port=int(config["port"]),
        compression="lz4",
    )
    ddl = (
        "CREATE TABLE IF NOT EXISTS bgp ("
        "time DateTime,"
        "type FixedString(1),"
        "peer Int32,"
        "collector String,"
        "project String,"
        "country String,"
        "source String,"
        "prefix String,"
        "path String"
        ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
        "SETTINGS old_parts_lifetime=10"
    )
    self.client.execute(ddl)
    print("Clickhouse : Generated BGP table :D")
Args: config (dict): Format : {"host":"127.0.0.1", "port":9000, "batch_size":15000}
name =
'clickhouse'
This name identifies the backend in the configuration file; specify it there to load an instance of this class.
def
start(self):
def start(self):
    """Begin accepting inserts: flag the backend as started and launch
    the daemon thread that batches rows into the bgp table.
    """
    self.started = True
    worker = threading.Thread(
        target=self.insert_batches,
        daemon=True,
        name="BGP monitor - clickhouse",
    )
    worker.start()
Mark the backend as started and launch the background thread that performs batched inserts into the bgp table (time, type, peer, collector, country, source, prefix, path).
def
save(self, data):
def save(self, data):
    """Enqueue a BGP element for asynchronous batched insertion.

    Args:
        data (BGPElem): bgp element to save
    """
    self.queue.put(data)
Input data in queue for processing
Args: data (BGPElem): bgp element to save
def
get_data(self):
def get_data(self):
    """Yield up to BATCH_SIZE queued records formatted as bgp-table rows.

    Yields:
        dict: Data to insert
    """
    for _ in range(self.BATCH_SIZE):
        rec = self.queue.get()
        row = {
            "time": int(rec["time"]),
            "type": rec["type"],
            "peer": rec["peer_asn"],
            "collector": rec["collector"],
            "country": rec["country_code"] or "",
            "source": rec["source"] or "",
            "prefix": rec["prefix"],
            "path": rec.get("as-path", ""),
            "project": rec["project"],
        }
        yield row
        # After stop(), finish as soon as the queue drains so the final
        # short batch still gets inserted.
        if not self.started and self.queue.qsize() == 0:
            print("Clickhouse : Inserting last batch")
            return
Retrieve data for inserts Yields: dict: Data to insert
def
insert_batches(self):
def insert_batches(self):
    """Worker loop: stream BATCH_SIZE-row batches into the bgp table.

    https://clickhouse.com/docs/en/about-us/performance/
    """
    # NOTE(review): get_data blocks on queue.get(), so after stop() the
    # loop may wait until one more element arrives — confirm acceptable.
    while self.started:
        self.client.execute("INSERT INTO bgp VALUES ", self.get_data())
Insert batches of BATCH_SIZE (default 15000) rows; see https://clickhouse.com/docs/en/about-us/performance/
def
get( self, time_start, time_end, record_type=None, peer_asn=None, collectors=None, countries=None, as_numbers=None, prefixes=None, as_paths=None):
def get(
    self,
    time_start,
    time_end,
    record_type=None,
    peer_asn=None,
    collectors=None,
    countries=None,
    as_numbers=None,
    prefixes=None,
    as_paths=None,
):
    """Stream rows from the bgp table matching the given filters.

    Args:
        time_start: Inclusive lower bound on record time.
        time_end: Inclusive upper bound on record time.
        record_type, peer_asn, collectors, countries, as_numbers,
        prefixes, as_paths: Optional filters; scalar values match with
            equality, lists/tuples/sets with IN.

    Returns:
        Iterator over matching rows (clickhouse_driver execute_iter).
    """
    # Snapshot arguments before defining new locals; exclude self and
    # unused (None) filters. Original iterated locals() without .items().
    params = {
        k: v for k, v in locals().items() if k != "self" and v is not None
    }

    conditions = []
    for key, value in params.items():
        if key == "time_start":
            # time bounds compare against the `time` column, they are not
            # columns themselves.
            conditions.append("time >= %(time_start)s")
        elif key == "time_end":
            conditions.append("time <= %(time_end)s")
        elif isinstance(value, (list, tuple, set)):
            # explicit container check: collections.Sequence is gone in
            # 3.10+ and also matched str, which must use equality.
            conditions.append(f"{ClickHouseDB.var_names[key]} IN %({key})s")
        else:
            conditions.append(f"{ClickHouseDB.var_names[key]} = %({key})s")

    # time_start/time_end are mandatory, so conditions is never empty.
    query = "SELECT * FROM bgp WHERE " + " AND ".join(conditions)
    return self.client.execute_iter(query, params)