Edit on GitHub

bin.bgpout

Ingest all records, queue, reformat, send, publish, save them.

  1"""
  2Ingest all records, queue, reformat, send, publish, save them.
  3"""
  4
  5__all__ = ["BGPOut"]
  6
  7import os
  8import sys
  9import json
 10from typing import TextIO
 11from Databases.database import BGPDatabases
 12from bgpgraph import BGPGraph
 13
 14
 15class SetEncoder(json.JSONEncoder):
 16    def default(self, obj):
 17        if isinstance(obj, set):
 18            return list(obj)
 19        return json.JSONEncoder.default(self, obj)
 20
 21
 22class BGPOut:
 23    """
 24    BGP Output
 25    - Ingest records, convert them to json, print to console and/or write them to files.
 26    - Save all records in database.
 27    - Can store bgp records in queue before processing them if desired.
 28    """
 29
 30    def __init__(self) -> None:
 31        self.__expected_result = None
 32        self.__json_out = None
 33        self.verbose: bool = False
 34        """print to console"""
 35        self.isQueue: bool = False
 36        """Enable queue, can prevent from blocking BGPStream"""
 37        self.isStarted: bool = False
 38        """Is the stream started or not"""
 39        self.databases = BGPDatabases({})
 40        self.graph = BGPGraph()
 41
 42    #######################
 43    #   GETTERS/SETTERS   #
 44    #######################
 45
 46    @property
 47    def json_out(self) -> TextIO:
 48        """
 49        (File): Print JSON in it
 50
 51        Raises:
 52            Exception: If unable write/read in it
 53        """
 54        return self.__json_out
 55
 56    @json_out.setter
 57    def json_out(self, json_out):
 58        """
 59        Setter for JSON output
 60        Parameters:
 61            json_out (File): Where to output json
 62
 63        Raises:
 64            Exception: If unable write/read in it
 65        """
 66        if hasattr(json_out, "write"):
 67            self.__json_out = json_out
 68
 69    @property
 70    def expected_result(self) -> TextIO:
 71        """Expected result when execution end"""
 72        return self.__expected_result
 73
 74    @expected_result.setter
 75    def expected_result(self, exp):
 76        if exp is not None:
 77            if hasattr(exp, "read"):
 78                self.__expected_result = exp
 79            else:
 80                raise FileNotFoundError(f"Is {exp} a file ?")
 81
 82    ########
 83    # MAIN #
 84    ########
 85
 86    def start(self):
 87        """
 88        Start output
 89
 90        - Init json output if specified
 91        - Start queue if enabled
 92        """
 93        if self.__json_out:
 94            self.__json_out.write("[")
 95        self.isStarted = True
 96
 97    def stop(self):
 98        """
 99        - Set state as stopped
100        - Close file output
101        - Check if result is as expected
102        """
103        if self.isStarted:
104            self.isStarted = False
105            self.databases.stop()
106            self.closeFile(self.__json_out)
107            if self.__expected_result:
108                print("Testing result: ")
109                with open(self.__json_out.name, "r+") as js:
110                    print(
111                        "Result is as expected"
112                        if checkFiles(js, self.__expected_result)
113                        else "Result is not as expected"
114                    )
115
116    def iteration(self, e):
117        """Iterate over queue to process each bgp element"""
118        # self.redis.xadd()
119
120        if self.verbose:
121            print(json.dumps(e, sort_keys=True, cls=SetEncoder) + "\n", flush=True)
122        if self.json_out:
123            self.json_out.write(
124                json.dumps(e, sort_keys=True, indent=4, cls=SetEncoder) + ",\n"
125            )
126        self.graph.update(e)
127        self.databases.save(e)
128
129    def closeFile(self, file):
130        """Close a JSON file
131
132        Args:
133            file (File): The file to close
134        """
135        if file == sys.stdout or file is None:
136            return
137        file.seek(file.tell() - 1, os.SEEK_SET)
138        file.truncate()
139        file.write("]")
140        file.close()
141
142
def checkFiles(f1, f2) -> bool:
    """
    Tell whether two JSON files hold equal documents

    Both files are rewound to their start, parsed, and the parsed
    values are compared.

    Args:
        f1, f2 (File): json files

    Returns:
        bool: True when both parsed documents compare equal
    """
    handles = (f1, f2)
    # Rewind both before parsing either one.
    for handle in handles:
        handle.seek(0, os.SEEK_SET)
    first, second = (json.load(handle) for handle in handles)
    return first == second
class BGPOut:
 23class BGPOut:
 24    """
 25    BGP Output
 26    - Ingest records, convert them to json, print to console and/or write them to files.
 27    - Save all records in database.
 28    - Can store bgp records in queue before processing them if desired.
 29    """
 30
 31    def __init__(self) -> None:
 32        self.__expected_result = None
 33        self.__json_out = None
 34        self.verbose: bool = False
 35        """print to console"""
 36        self.isQueue: bool = False
 37        """Enable queue, can prevent from blocking BGPStream"""
 38        self.isStarted: bool = False
 39        """Is the stream started or not"""
 40        self.databases = BGPDatabases({})
 41        self.graph = BGPGraph()
 42
 43    #######################
 44    #   GETTERS/SETTERS   #
 45    #######################
 46
 47    @property
 48    def json_out(self) -> TextIO:
 49        """
 50        (File): Print JSON in it
 51
 52        Raises:
 53            Exception: If unable write/read in it
 54        """
 55        return self.__json_out
 56
 57    @json_out.setter
 58    def json_out(self, json_out):
 59        """
 60        Setter for JSON output
 61        Parameters:
 62            json_out (File): Where to output json
 63
 64        Raises:
 65            Exception: If unable write/read in it
 66        """
 67        if hasattr(json_out, "write"):
 68            self.__json_out = json_out
 69
 70    @property
 71    def expected_result(self) -> TextIO:
 72        """Expected result when execution end"""
 73        return self.__expected_result
 74
 75    @expected_result.setter
 76    def expected_result(self, exp):
 77        if exp is not None:
 78            if hasattr(exp, "read"):
 79                self.__expected_result = exp
 80            else:
 81                raise FileNotFoundError(f"Is {exp} a file ?")
 82
 83    ########
 84    # MAIN #
 85    ########
 86
 87    def start(self):
 88        """
 89        Start output
 90
 91        - Init json output if specified
 92        - Start queue if enabled
 93        """
 94        if self.__json_out:
 95            self.__json_out.write("[")
 96        self.isStarted = True
 97
 98    def stop(self):
 99        """
100        - Set state as stopped
101        - Close file output
102        - Check if result is as expected
103        """
104        if self.isStarted:
105            self.isStarted = False
106            self.databases.stop()
107            self.closeFile(self.__json_out)
108            if self.__expected_result:
109                print("Testing result: ")
110                with open(self.__json_out.name, "r+") as js:
111                    print(
112                        "Result is as expected"
113                        if checkFiles(js, self.__expected_result)
114                        else "Result is not as expected"
115                    )
116
117    def iteration(self, e):
118        """Iterate over queue to process each bgp element"""
119        # self.redis.xadd()
120
121        if self.verbose:
122            print(json.dumps(e, sort_keys=True, cls=SetEncoder) + "\n", flush=True)
123        if self.json_out:
124            self.json_out.write(
125                json.dumps(e, sort_keys=True, indent=4, cls=SetEncoder) + ",\n"
126            )
127        self.graph.update(e)
128        self.databases.save(e)
129
130    def closeFile(self, file):
131        """Close a JSON file
132
133        Args:
134            file (File): The file to close
135        """
136        if file == sys.stdout or file is None:
137            return
138        file.seek(file.tell() - 1, os.SEEK_SET)
139        file.truncate()
140        file.write("]")
141        file.close()

BGP Output

  • Ingest records, convert them to json, print to console and/or write them to files.
  • Save all records in database.
  • Can store bgp records in queue before processing them if desired.
verbose: bool

print to console

isQueue: bool

Enable queue, can prevent from blocking BGPStream

isStarted: bool

Is the stream started or not

json_out: <class 'TextIO'>

Setter for JSON output Parameters: json_out (File): Where to output json

Raises: Exception: If unable to write to or read from it

expected_result: <class 'TextIO'>

Expected result when execution end

def start(self):
87    def start(self):
88        """
89        Start output
90
91        - Init json output if specified
92        - Start queue if enabled
93        """
94        if self.__json_out:
95            self.__json_out.write("[")
96        self.isStarted = True

Start output

  • Init json output if specified
  • Start queue if enabled
def stop(self):
 98    def stop(self):
 99        """
100        - Set state as stopped
101        - Close file output
102        - Check if result is as expected
103        """
104        if self.isStarted:
105            self.isStarted = False
106            self.databases.stop()
107            self.closeFile(self.__json_out)
108            if self.__expected_result:
109                print("Testing result: ")
110                with open(self.__json_out.name, "r+") as js:
111                    print(
112                        "Result is as expected"
113                        if checkFiles(js, self.__expected_result)
114                        else "Result is not as expected"
115                    )
  • Set state as stopped
  • Close file output
  • Check if result is as expected
def iteration(self, e):
117    def iteration(self, e):
118        """Iterate over queue to process each bgp element"""
119        # self.redis.xadd()
120
121        if self.verbose:
122            print(json.dumps(e, sort_keys=True, cls=SetEncoder) + "\n", flush=True)
123        if self.json_out:
124            self.json_out.write(
125                json.dumps(e, sort_keys=True, indent=4, cls=SetEncoder) + ",\n"
126            )
127        self.graph.update(e)
128        self.databases.save(e)

Iterate over queue to process each bgp element

def closeFile(self, file):
130    def closeFile(self, file):
131        """Close a JSON file
132
133        Args:
134            file (File): The file to close
135        """
136        if file == sys.stdout or file is None:
137            return
138        file.seek(file.tell() - 1, os.SEEK_SET)
139        file.truncate()
140        file.write("]")
141        file.close()

Close a JSON file

Args: file (File): The file to close