Edit on GitHub

bin.Databases.ClickHouseDB

  1import collections
  2import threading
  3from Databases.database import Database
  4from clickhouse_driver import Client
  5from queue import Queue
  6
  7
  8class ClickHouseDB(Database):
  9    name = "clickhouse"
 10
 11    def __init__(self, config):
 12        """
 13        Args:
 14            config (dict): Format :
 15                            {"host":"127.0.0.1", "port":9000, "batch_size":15000}
 16        """
 17        super().__init__()
 18        self.started = False
 19        self.BATCH_SIZE = int(config["batch_size"]) if "batch_size" in config else 15000
 20        self.queue = Queue()
 21        self.client = Client(
 22            host=config["host"], port=int(config["port"]), compression="lz4"
 23        )
 24        self.client.execute(
 25            "CREATE TABLE IF NOT EXISTS bgp ("
 26            "time DateTime,"
 27            "type FixedString(1),"
 28            "peer Int32,"
 29            "collector String,"
 30            "project String,"
 31            "country String,"
 32            "source String,"
 33            "prefix String,"
 34            "path String"
 35            ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
 36            "SETTINGS old_parts_lifetime=10"
 37        )
 38        print("Clickhouse : Generated BGP table :D")
 39
 40    def start(self):
 41        """
 42        Create bgp table
 43        (time DateTime, type, peer, collector, country, source, prefix, path)
 44        Start thread for batch inserts
 45        """
 46        # self.client.execute("DROP TABLE IF EXISTS bgp")
 47        self.started = True
 48        threading.Thread(
 49            target=self.insert_batches, daemon=True, name="BGP monitor - clickhouse"
 50        ).start()
 51
 52    def stop(self):
 53        """Stop inserts"""
 54        if self.started:
 55            self.started = False
 56
 57    ###############
 58    #   INSERTS   #
 59    ###############
 60
 61    def save(self, data):
 62        """Input data in queue for processing
 63
 64        Args:
 65            data (BGPElem): bgp element to save
 66        """
 67        self.queue.put(data)
 68
 69    def get_data(self):
 70        """Retrieve data for inserts
 71        Yields:
 72            dict: Data to insert
 73        """
 74        for idx in range(self.BATCH_SIZE):
 75            rec = self.queue.get()
 76            yield {
 77                "time": int(rec["time"]),
 78                "type": rec["type"],
 79                "peer": rec["peer_asn"],
 80                "collector": rec["collector"],
 81                "country": rec["country_code"] or "",
 82                "source": rec["source"] or "",
 83                "prefix": rec["prefix"],
 84                "path": rec.get("as-path", ""),
 85                "project": rec["project"],
 86            }
 87            if self.queue.qsize() == 0 and not self.started:
 88                print("Clickhouse : Inserting last batch")
 89                return
 90
 91    def insert_batches(self):
 92        """
 93        Insert BATCH_SIZE(15000 lines) batches
 94        https://clickhouse.com/docs/en/about-us/performance/"""
 95        while self.started:
 96            self.client.execute("INSERT INTO bgp VALUES ", self.get_data())
 97
 98    ##############
 99    #   GETTER   #
100    ##############
101
102    var_names = {
103        "time_start": "time_start",
104        "time_end": "time_end",
105        "record_type": "type",
106        "peer_asn": "peer",
107        "collectors": "collector",
108        "countries": "country",
109        "as_numbers": "source",
110        "prefixes": "prefix",
111        "as_paths": "path",
112    }
113    # Used in `ClickHouseDB.get` function as arg_name:db_column_name
114
115    def get(
116        self,
117        time_start,
118        time_end,
119        record_type=None,
120        peer_asn=None,
121        collectors=None,
122        countries=None,
123        as_numbers=None,
124        prefixes=None,
125        as_paths=None,
126    ):
127        query = "SELECT * FROM bgp WHERE"
128        params = {key: value for key, value in locals() if value is not None}
129
130        if len(params) > 0:
131            for k, v in params:
132                query += (
133                    f" %({ClickHouseDB.var_names[k]}) "
134                    + ("IN " if isinstance(v, collections.Sequence) else "== ")
135                    + f"%({k}) and"
136                )
137            query += query.rsplit(" ", 1)[0]
138        return self.client.execute_iter(query, params)
class ClickHouseDB(Database):
    """ClickHouse backend: batches BGP records for insert and serves reads."""

    # Backend identifier looked up from the configuration file.
    name = "clickhouse"

    def __init__(self, config):
        """Connect to ClickHouse and ensure the `bgp` table exists.

        Args:
            config (dict): Format :
                            {"host":"127.0.0.1", "port":9000, "batch_size":15000}
        """
        super().__init__()
        self.started = False
        # Rows are buffered and flushed to ClickHouse in batches of this size.
        self.BATCH_SIZE = int(config.get("batch_size", 15000))
        self.queue = Queue()
        self.client = Client(
            host=config["host"], port=int(config["port"]), compression="lz4"
        )
        self.client.execute(
            "CREATE TABLE IF NOT EXISTS bgp ("
            "time DateTime,"
            "type FixedString(1),"
            "peer Int32,"
            "collector String,"
            "project String,"
            "country String,"
            "source String,"
            "prefix String,"
            "path String"
            ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
            "SETTINGS old_parts_lifetime=10"
        )
        print("Clickhouse : Generated BGP table :D")

    def start(self):
        """Start the background thread that performs batch inserts."""
        # self.client.execute("DROP TABLE IF EXISTS bgp")
        self.started = True
        threading.Thread(
            target=self.insert_batches, daemon=True, name="BGP monitor - clickhouse"
        ).start()

    def stop(self):
        """Stop inserts"""
        if self.started:
            self.started = False

    ###############
    #   INSERTS   #
    ###############

    def save(self, data):
        """Input data in queue for processing

        Args:
            data (BGPElem): bgp element to save
        """
        self.queue.put(data)

    def get_data(self):
        """Retrieve up to BATCH_SIZE queued records for insertion.

        Yields:
            dict: Data to insert, keyed by bgp-table column name.
        """
        for _ in range(self.BATCH_SIZE):
            rec = self.queue.get()
            yield {
                "time": int(rec["time"]),
                "type": rec["type"],
                "peer": rec["peer_asn"],
                "collector": rec["collector"],
                "country": rec["country_code"] or "",
                "source": rec["source"] or "",
                "prefix": rec["prefix"],
                "path": rec.get("as-path", ""),
                "project": rec["project"],
            }
            # After stop(), flush whatever is left as soon as the queue drains.
            if self.queue.qsize() == 0 and not self.started:
                print("Clickhouse : Inserting last batch")
                return

    def insert_batches(self):
        """
        Insert BATCH_SIZE(15000 lines) batches
        https://clickhouse.com/docs/en/about-us/performance/"""
        while self.started:
            self.client.execute("INSERT INTO bgp VALUES ", self.get_data())

    ##############
    #   GETTER   #
    ##############

    # Maps `get` keyword argument names to bgp-table column names.
    # (time_start/time_end are handled specially as bounds on `time`.)
    var_names = {
        "time_start": "time_start",
        "time_end": "time_end",
        "record_type": "type",
        "peer_asn": "peer",
        "collectors": "collector",
        "countries": "country",
        "as_numbers": "source",
        "prefixes": "prefix",
        "as_paths": "path",
    }

    def get(
        self,
        time_start,
        time_end,
        record_type=None,
        peer_asn=None,
        collectors=None,
        countries=None,
        as_numbers=None,
        prefixes=None,
        as_paths=None,
    ):
        """Query bgp rows in [time_start, time_end] matching the filters.

        Scalar filters become equality tests; list/tuple/set filters become
        IN clauses. Argument names map to columns through `var_names`.

        Returns:
            Iterator over matching rows (client.execute_iter).
        """
        # Capture the call arguments before any other local is created,
        # then drop `self` and unset filters.
        params = {
            key: value
            for key, value in locals().items()
            if key != "self" and value is not None
        }
        # The mandatory time bounds filter the `time` column.
        conditions = ["time >= %(time_start)s", "time <= %(time_end)s"]
        for key, value in params.items():
            if key in ("time_start", "time_end"):
                continue
            column = self.var_names[key]
            # Collections become IN clauses, scalars an equality test.
            if isinstance(value, (list, tuple, set)):
                conditions.append(f"{column} IN %({key})s")
            else:
                conditions.append(f"{column} = %({key})s")
        query = "SELECT * FROM bgp WHERE " + " AND ".join(conditions)
        return self.client.execute_iter(query, params)

ClickHouse database backend: buffers incoming BGP records in a queue and inserts them in batches, and exposes a filtered `get` query over the `bgp` table.

ClickHouseDB(config)
def __init__(self, config):
    """Open the ClickHouse connection and create the `bgp` table if needed.

    Args:
        config (dict): Format :
                        {"host":"127.0.0.1", "port":9000, "batch_size":15000}
    """
    super().__init__()
    self.started = False
    # Batch size for inserts; defaults to 15000 rows when not configured.
    self.BATCH_SIZE = int(config.get("batch_size", 15000))
    self.queue = Queue()
    self.client = Client(
        host=config["host"],
        port=int(config["port"]),
        compression="lz4",
    )
    schema = (
        "CREATE TABLE IF NOT EXISTS bgp ("
        "time DateTime,"
        "type FixedString(1),"
        "peer Int32,"
        "collector String,"
        "project String,"
        "country String,"
        "source String,"
        "prefix String,"
        "path String"
        ") ENGINE = MergeTree ORDER BY (time, prefix, path)"
        "SETTINGS old_parts_lifetime=10"
    )
    self.client.execute(schema)
    print("Clickhouse : Generated BGP table :D")

Args: config (dict): Format : {"host":"127.0.0.1", "port":9000, "batch_size":15000}

name = 'clickhouse'

This variable is used for configuration file

You must specify it in config to load an instance

def start(self):
    """Mark the backend as running and launch the batch-insert thread.

    The `bgp` table itself is created in __init__.
    """
    # self.client.execute("DROP TABLE IF EXISTS bgp")
    self.started = True
    worker = threading.Thread(
        target=self.insert_batches,
        daemon=True,
        name="BGP monitor - clickhouse",
    )
    worker.start()

Create bgp table (time DateTime, type, peer, collector, country, source, prefix, path) Start thread for batch inserts

def stop(self):
    """Signal the insert thread to finish after its current batch."""
    # Only flip the flag when a run is actually in progress.
    if self.started:
        self.started = False

Stop inserts

def save(self, data):
    """Enqueue one BGP element for asynchronous batch insertion.

    Args:
        data (BGPElem): bgp element to save
    """
    self.queue.put(data)

Input data in queue for processing

Args: data (BGPElem): bgp element to save

def get_data(self):
    """Pull up to BATCH_SIZE queued records and yield insert-ready rows.

    Yields:
        dict: one row per queued BGP element, keyed by bgp-table column.
    """
    for _ in range(self.BATCH_SIZE):
        element = self.queue.get()
        row = {
            "time": int(element["time"]),
            "type": element["type"],
            "peer": element["peer_asn"],
            "collector": element["collector"],
            "country": element["country_code"] or "",
            "source": element["source"] or "",
            "prefix": element["prefix"],
            "path": element.get("as-path", ""),
            "project": element["project"],
        }
        yield row
        # Once stopped and drained, end the batch early so the insert flushes.
        if self.queue.qsize() == 0 and not self.started:
            print("Clickhouse : Inserting last batch")
            return

Retrieve data for inserts Yields: dict: Data to insert

def insert_batches(self):
    """Feed ClickHouse with generator-backed batch inserts.

    Runs until stop() clears the flag; each execute() streams one batch of
    up to BATCH_SIZE rows from get_data().
    https://clickhouse.com/docs/en/about-us/performance/
    """
    while True:
        if not self.started:
            break
        self.client.execute("INSERT INTO bgp VALUES ", self.get_data())

Insert BATCH_SIZE(15000 lines) batches https://clickhouse.com/docs/en/about-us/performance/

def get(
    self,
    time_start,
    time_end,
    record_type=None,
    peer_asn=None,
    collectors=None,
    countries=None,
    as_numbers=None,
    prefixes=None,
    as_paths=None,
):
    """Query bgp rows in [time_start, time_end] matching the filters.

    Scalar filters become equality tests; list/tuple/set filters become
    IN clauses. Argument names map to columns through `var_names`.

    Returns:
        Iterator over matching rows (client.execute_iter).
    """
    # Capture the call arguments before any other local is created,
    # then drop `self` and unset filters.
    params = {
        key: value
        for key, value in locals().items()
        if key != "self" and value is not None
    }
    # The mandatory time bounds filter the `time` column.
    conditions = ["time >= %(time_start)s", "time <= %(time_end)s"]
    for key, value in params.items():
        if key in ("time_start", "time_end"):
            continue
        column = self.var_names[key]
        # Collections become IN clauses, scalars an equality test.
        if isinstance(value, (list, tuple, set)):
            conditions.append(f"{column} IN %({key})s")
        else:
            conditions.append(f"{column} = %({key})s")
    query = "SELECT * FROM bgp WHERE " + " AND ".join(conditions)
    return self.client.execute_iter(query, params)