Edit on GitHub

bin.Databases.QuestDB

  1import collections
  2from Databases.database import Database
  3import socket
  4import psycopg2
  5
  6
  7class QuestDB(Database):
  8    name = "quest"
  9
 10    def __init__(self, config):
 11        super().__init__()
 12        # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
 13        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 14
 15        # Required args for line protocol and postgre connection
 16        self.conf = {
 17            "host": config["host"],
 18            "tcp_port": int(config["tcp_port"]),
 19            "pg_port": int(config["pg_port"]),
 20        }
 21        self.connection = None
 22
 23    def start(self):
 24        """Connect to server using etc/config.cfg file"""
 25        self.sock.connect((self.conf["host"], self.conf["tcp_port"]))
 26        self.connection = psycopg2.connect(
 27            host=self.conf["host"],
 28            port=self.conf["pg_port"],
 29            user="admin",
 30            password="quest",
 31            database="qdb",
 32        )
 33
 34    def stop(self):
 35        """Close connection to server"""
 36        self.sock.close()
 37        if self.connection:
 38            self.connection.close()
 39            print("PostgreSQL connection is closed")
 40
 41    ###############
 42    #   INSERTS   #
 43    ###############
 44
 45    def save(self, record):
 46        """Save bgp record using InfluxDB Line protocol
 47
 48        Format :
 49                bgp,type={record['type']},project={record['project']},
 50                collector={record['collector']},country={record['country_code'] or ''}
 51                 peer={record["peer_asn"]},prefix="{record["prefix"]}",
 52                path="{record.get("as-path", "")}",source="{record["source"]}"
 53                 {int(record['time']*1000000000)}\n
 54
 55        Args:
 56            record (BGPElem)
 57        """
 58
 59        self.send_utf8(
 60            (
 61                f"bgp,type={record['type']},project={record['project']},"
 62                f"collector={record['collector']},"
 63                f"country={record['country_code'] or ''}"
 64                f' peer={record["peer_asn"]},prefix="{record["prefix"]}",'
 65                f'path="{record.get("as-path", "")}",source="{record["source"]}"'
 66                f" {int(record['time']*1000000000)}\n"
 67            )
 68        )
 69
 70    def send_utf8(self, msg):
 71        """Encode message and send it to server"""
 72        self.sock.sendall(msg.encode())
 73
 74    ##############
 75    #   GETTER   #
 76    ##############
 77    # The followings aren't used
 78
 79    var_names = {
 80        "time_start": "time_start",
 81        "time_end": "time_end",
 82        "record_type": "type",
 83        "peer_asn": "peer",
 84        "collectors": "collector",
 85        "countries": "country",
 86        "as_numbers": "source",
 87        "prefixes": "prefix",
 88        "as_paths": "path",
 89    }
 90    # Used in `QuestDB.get` function as arg_name:db_column_name
 91
 92    def get(
 93        self,
 94        time_start,
 95        time_end,
 96        record_type=None,
 97        peer_asn=None,
 98        collectors=None,
 99        countries=None,
100        as_numbers=None,
101        prefixes=None,
102        as_paths=None,
103    ):
104        """
105        Retrieve data from QuestDB using psycopg2 (Postgre) connection
106
107        See `Database.get()`
108        """
109        if not self.cursor:
110            self.cursor = self.connection.cursor()
111
112        query = "SELECT * FROM bgp WHERE"
113        params = {key: value for key, value in locals() if value is not None}
114
115        if len(params) > 0:
116            for k, v in params:
117                query += (
118                    f" {QuestDB.var_names[k]} "
119                    + ("IN " if isinstance(v, collections.Sequence) else "== ")
120                    + "%s and"
121                )
122            query += query.rsplit(" ", 1)[0]
123
124        self.cursor.execute(query, params.values())
125        return self.cursor.fetchall()
class QuestDB(Databases.database.Database):
  8class QuestDB(Database):
  9    name = "quest"
 10
 11    def __init__(self, config):
 12        super().__init__()
 13        # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
 14        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 15
 16        # Required args for line protocol and postgre connection
 17        self.conf = {
 18            "host": config["host"],
 19            "tcp_port": int(config["tcp_port"]),
 20            "pg_port": int(config["pg_port"]),
 21        }
 22        self.connection = None
 23
 24    def start(self):
 25        """Connect to server using etc/config.cfg file"""
 26        self.sock.connect((self.conf["host"], self.conf["tcp_port"]))
 27        self.connection = psycopg2.connect(
 28            host=self.conf["host"],
 29            port=self.conf["pg_port"],
 30            user="admin",
 31            password="quest",
 32            database="qdb",
 33        )
 34
 35    def stop(self):
 36        """Close connection to server"""
 37        self.sock.close()
 38        if self.connection:
 39            self.connection.close()
 40            print("PostgreSQL connection is closed")
 41
 42    ###############
 43    #   INSERTS   #
 44    ###############
 45
 46    def save(self, record):
 47        """Save bgp record using InfluxDB Line protocol
 48
 49        Format :
 50                bgp,type={record['type']},project={record['project']},
 51                collector={record['collector']},country={record['country_code'] or ''}
 52                 peer={record["peer_asn"]},prefix="{record["prefix"]}",
 53                path="{record.get("as-path", "")}",source="{record["source"]}"
 54                 {int(record['time']*1000000000)}\n
 55
 56        Args:
 57            record (BGPElem)
 58        """
 59
 60        self.send_utf8(
 61            (
 62                f"bgp,type={record['type']},project={record['project']},"
 63                f"collector={record['collector']},"
 64                f"country={record['country_code'] or ''}"
 65                f' peer={record["peer_asn"]},prefix="{record["prefix"]}",'
 66                f'path="{record.get("as-path", "")}",source="{record["source"]}"'
 67                f" {int(record['time']*1000000000)}\n"
 68            )
 69        )
 70
 71    def send_utf8(self, msg):
 72        """Encode message and send it to server"""
 73        self.sock.sendall(msg.encode())
 74
 75    ##############
 76    #   GETTER   #
 77    ##############
 78    # The followings aren't used
 79
 80    var_names = {
 81        "time_start": "time_start",
 82        "time_end": "time_end",
 83        "record_type": "type",
 84        "peer_asn": "peer",
 85        "collectors": "collector",
 86        "countries": "country",
 87        "as_numbers": "source",
 88        "prefixes": "prefix",
 89        "as_paths": "path",
 90    }
 91    # Used in `QuestDB.get` function as arg_name:db_column_name
 92
 93    def get(
 94        self,
 95        time_start,
 96        time_end,
 97        record_type=None,
 98        peer_asn=None,
 99        collectors=None,
100        countries=None,
101        as_numbers=None,
102        prefixes=None,
103        as_paths=None,
104    ):
105        """
106        Retrieve data from QuestDB using psycopg2 (Postgre) connection
107
108        See `Database.get()`
109        """
110        if not self.cursor:
111            self.cursor = self.connection.cursor()
112
113        query = "SELECT * FROM bgp WHERE"
114        params = {key: value for key, value in locals() if value is not None}
115
116        if len(params) > 0:
117            for k, v in params:
118                query += (
119                    f" {QuestDB.var_names[k]} "
120                    + ("IN " if isinstance(v, collections.Sequence) else "== ")
121                    + "%s and"
122                )
123            query += query.rsplit(" ", 1)[0]
124
125        self.cursor.execute(query, params.values())
126        return self.cursor.fetchall()

Helper class that provides a standard way to create an ABC using inheritance.

QuestDB(config)
11    def __init__(self, config):
12        super().__init__()
13        # For UDP, change socket.SOCK_STREAM to socket.SOCK_DGRAM
14        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
15
16        # Required args for line protocol and postgre connection
17        self.conf = {
18            "host": config["host"],
19            "tcp_port": int(config["tcp_port"]),
20            "pg_port": int(config["pg_port"]),
21        }
22        self.connection = None
name = 'quest'

This variable is used in the configuration file.

You must specify it in the config to load an instance.

def start(self):
24    def start(self):
25        """Connect to server using etc/config.cfg file"""
26        self.sock.connect((self.conf["host"], self.conf["tcp_port"]))
27        self.connection = psycopg2.connect(
28            host=self.conf["host"],
29            port=self.conf["pg_port"],
30            user="admin",
31            password="quest",
32            database="qdb",
33        )

Connect to server using etc/config.cfg file

def stop(self):
35    def stop(self):
36        """Close connection to server"""
37        self.sock.close()
38        if self.connection:
39            self.connection.close()
40            print("PostgreSQL connection is closed")

Close connection to server

def save(self, record):
46    def save(self, record):
47        """Save bgp record using InfluxDB Line protocol
48
49        Format :
50                bgp,type={record['type']},project={record['project']},
51                collector={record['collector']},country={record['country_code'] or ''}
52                 peer={record["peer_asn"]},prefix="{record["prefix"]}",
53                path="{record.get("as-path", "")}",source="{record["source"]}"
54                 {int(record['time']*1000000000)}\n
55
56        Args:
57            record (BGPElem)
58        """
59
60        self.send_utf8(
61            (
62                f"bgp,type={record['type']},project={record['project']},"
63                f"collector={record['collector']},"
64                f"country={record['country_code'] or ''}"
65                f' peer={record["peer_asn"]},prefix="{record["prefix"]}",'
66                f'path="{record.get("as-path", "")}",source="{record["source"]}"'
67                f" {int(record['time']*1000000000)}\n"
68            )
69        )

Save bgp record using InfluxDB Line protocol

Format : bgp,type={record['type']},project={record['project']}, collector={record['collector']},country={record['country_code'] or ''} peer={record["peer_asn"]},prefix="{record["prefix"]}", path="{record.get("as-path", "")}",source="{record["source"]}" {int(record['time']*1000000000)}

Args: record (BGPElem)

def send_utf8(self, msg):
71    def send_utf8(self, msg):
72        """Encode message and send it to server"""
73        self.sock.sendall(msg.encode())

Encode message and send it to server

def get( self, time_start, time_end, record_type=None, peer_asn=None, collectors=None, countries=None, as_numbers=None, prefixes=None, as_paths=None):
 93    def get(
 94        self,
 95        time_start,
 96        time_end,
 97        record_type=None,
 98        peer_asn=None,
 99        collectors=None,
100        countries=None,
101        as_numbers=None,
102        prefixes=None,
103        as_paths=None,
104    ):
105        """
106        Retrieve data from QuestDB using psycopg2 (Postgre) connection
107
108        See `Database.get()`
109        """
110        if not self.cursor:
111            self.cursor = self.connection.cursor()
112
113        query = "SELECT * FROM bgp WHERE"
114        params = {key: value for key, value in locals() if value is not None}
115
116        if len(params) > 0:
117            for k, v in params:
118                query += (
119                    f" {QuestDB.var_names[k]} "
120                    + ("IN " if isinstance(v, collections.Sequence) else "== ")
121                    + "%s and"
122                )
123            query += query.rsplit(" ", 1)[0]
124
125        self.cursor.execute(query, params.values())
126        return self.cursor.fetchall()

Retrieve data from QuestDB using psycopg2 (Postgre) connection

See Database.get()