bin.bgpout
Ingest all records, queue, reformat, send, publish, save them.
"""
Ingest all records, queue, reformat, send, publish, save them.
"""

__all__ = ["BGPOut"]

import os
import sys
import json
from typing import TextIO
from Databases.database import BGPDatabases
from bgpgraph import BGPGraph


class SetEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``set`` objects as lists."""

    def default(self, obj):
        """Return a list for sets; defer to the base encoder otherwise."""
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


class BGPOut:
    """
    BGP Output
    - Ingest records, convert them to json, print to console and/or write them to files.
    - Save all records in database.
    - Can store bgp records in queue before processing them if desired.
    """

    # Text appended after every record by iteration() and trimmed by closeFile().
    _RECORD_END = ",\n"

    def __init__(self) -> None:
        self.__expected_result = None
        self.__json_out = None
        self.verbose: bool = False
        """print to console"""
        self.isQueue: bool = False
        """Enable queue, can prevent from blocking BGPStream"""
        self.isStarted: bool = False
        """Is the stream started or not"""
        self.databases = BGPDatabases({})
        self.graph = BGPGraph()

    #######################
    #   GETTERS/SETTERS   #
    #######################

    @property
    def json_out(self) -> TextIO:
        """(File): Stream the JSON records are written to, or None."""
        return self.__json_out

    @json_out.setter
    def json_out(self, json_out):
        """
        Setter for JSON output

        Objects without a ``write`` attribute are ignored and the previous
        value is kept.

        Parameters:
            json_out (File): Where to output json
        """
        if hasattr(json_out, "write"):
            self.__json_out = json_out

    @property
    def expected_result(self) -> TextIO:
        """Expected result when execution end"""
        return self.__expected_result

    @expected_result.setter
    def expected_result(self, exp):
        """
        Setter for the expected result; ``None`` is ignored.

        Raises:
            FileNotFoundError: If ``exp`` has no ``read`` attribute
        """
        if exp is not None:
            if hasattr(exp, "read"):
                self.__expected_result = exp
            else:
                raise FileNotFoundError(f"Is {exp} a file ?")

    ########
    # MAIN #
    ########

    def start(self):
        """
        Start output

        - Open the JSON array in the output file if one is set
        - Mark the output as started
        """
        if self.__json_out:
            self.__json_out.write("[")
        self.isStarted = True

    def stop(self):
        """
        - Set state as stopped
        - Stop the databases
        - Close file output
        - Check if result is as expected

        Does nothing when the output was never started.
        """
        if not self.isStarted:
            return
        self.isStarted = False
        self.databases.stop()
        self.closeFile(self.__json_out)
        if not self.__expected_result:
            return
        print("Testing result: ")
        out_path = getattr(self.__json_out, "name", None)
        # closeFile() leaves None and stdout alone, so there is no finished
        # file on disk to reopen and compare in those cases.
        if out_path is None or self.__json_out is sys.stdout:
            print("No JSON output file to compare against the expected result")
            return
        with open(out_path, "r") as js:
            print(
                "Result is as expected"
                if checkFiles(js, self.__expected_result)
                else "Result is not as expected"
            )

    def iteration(self, e):
        """
        Process one bgp element.

        Prints it when ``verbose`` is set, appends it to the JSON output
        when one is set, then updates the graph and saves it in the databases.
        """
        if self.verbose:
            print(json.dumps(e, sort_keys=True, cls=SetEncoder) + "\n", flush=True)
        if self.json_out:
            self.json_out.write(
                json.dumps(e, sort_keys=True, indent=4, cls=SetEncoder)
                + self._RECORD_END
            )
        self.graph.update(e)
        self.databases.save(e)

    def closeFile(self, file):
        """Terminate the JSON array in ``file`` and close it.

        Every record is followed by a comma and a newline; that whole
        separator is removed before the closing bracket is written so the
        file holds valid JSON. A file holding only the opening bracket
        becomes ``[]``.

        Args:
            file (File): The file to close; ``None`` and ``sys.stdout`` are
                left untouched.
        """
        if file is None or file == sys.stdout:
            return
        end = file.tell()
        # NOTE(review): position arithmetic assumes the stream does not
        # translate "\n" on write (true on POSIX) - confirm for other platforms.
        if end > len("["):
            file.seek(end - len(self._RECORD_END), os.SEEK_SET)
            file.truncate()
        file.write("]")
        file.close()


def checkFiles(f1, f2) -> bool:
    """
    Check if two json files are equal

    Both files are rewound to their start before being parsed.

    Args:
        f1, f2 (File): json files

    Returns:
        True when both files decode to equal JSON values.
    """
    f1.seek(0, os.SEEK_SET)
    f2.seek(0, os.SEEK_SET)
    return json.load(f1) == json.load(f2)
class
BGPOut:
class BGPOut:
    """
    BGP Output
    - Ingest records, convert them to json, print to console and/or write them to files.
    - Save all records in database.
    - Can store bgp records in queue before processing them if desired.
    """

    # Text appended after every record by iteration() and trimmed by closeFile().
    _RECORD_END = ",\n"

    def __init__(self) -> None:
        self.__expected_result = None
        self.__json_out = None
        self.verbose: bool = False
        """print to console"""
        self.isQueue: bool = False
        """Enable queue, can prevent from blocking BGPStream"""
        self.isStarted: bool = False
        """Is the stream started or not"""
        self.databases = BGPDatabases({})
        self.graph = BGPGraph()

    #######################
    #   GETTERS/SETTERS   #
    #######################

    @property
    def json_out(self) -> TextIO:
        """(File): Stream the JSON records are written to, or None."""
        return self.__json_out

    @json_out.setter
    def json_out(self, json_out):
        """
        Setter for JSON output

        Objects without a ``write`` attribute are ignored and the previous
        value is kept.

        Parameters:
            json_out (File): Where to output json
        """
        if hasattr(json_out, "write"):
            self.__json_out = json_out

    @property
    def expected_result(self) -> TextIO:
        """Expected result when execution end"""
        return self.__expected_result

    @expected_result.setter
    def expected_result(self, exp):
        """
        Setter for the expected result; ``None`` is ignored.

        Raises:
            FileNotFoundError: If ``exp`` has no ``read`` attribute
        """
        if exp is not None:
            if hasattr(exp, "read"):
                self.__expected_result = exp
            else:
                raise FileNotFoundError(f"Is {exp} a file ?")

    ########
    # MAIN #
    ########

    def start(self):
        """
        Start output

        - Open the JSON array in the output file if one is set
        - Mark the output as started
        """
        if self.__json_out:
            self.__json_out.write("[")
        self.isStarted = True

    def stop(self):
        """
        - Set state as stopped
        - Stop the databases
        - Close file output
        - Check if result is as expected

        Does nothing when the output was never started.
        """
        if not self.isStarted:
            return
        self.isStarted = False
        self.databases.stop()
        self.closeFile(self.__json_out)
        if not self.__expected_result:
            return
        print("Testing result: ")
        out_path = getattr(self.__json_out, "name", None)
        # closeFile() leaves None and stdout alone, so there is no finished
        # file on disk to reopen and compare in those cases.
        if out_path is None or self.__json_out is sys.stdout:
            print("No JSON output file to compare against the expected result")
            return
        with open(out_path, "r") as js:
            print(
                "Result is as expected"
                if checkFiles(js, self.__expected_result)
                else "Result is not as expected"
            )

    def iteration(self, e):
        """
        Process one bgp element.

        Prints it when ``verbose`` is set, appends it to the JSON output
        when one is set, then updates the graph and saves it in the databases.
        """
        if self.verbose:
            print(json.dumps(e, sort_keys=True, cls=SetEncoder) + "\n", flush=True)
        if self.json_out:
            self.json_out.write(
                json.dumps(e, sort_keys=True, indent=4, cls=SetEncoder)
                + self._RECORD_END
            )
        self.graph.update(e)
        self.databases.save(e)

    def closeFile(self, file):
        """Terminate the JSON array in ``file`` and close it.

        Every record is followed by a comma and a newline; that whole
        separator is removed before the closing bracket is written so the
        file holds valid JSON. A file holding only the opening bracket
        becomes ``[]``.

        Args:
            file (File): The file to close; ``None`` and ``sys.stdout`` are
                left untouched.
        """
        if file is None or file == sys.stdout:
            return
        end = file.tell()
        # NOTE(review): position arithmetic assumes the stream does not
        # translate "\n" on write (true on POSIX) - confirm for other platforms.
        if end > len("["):
            file.seek(end - len(self._RECORD_END), os.SEEK_SET)
            file.truncate()
        file.write("]")
        file.close()
BGP Output
- Ingest records, convert them to json, print to console and/or write them to files.
- Save all records in database.
- Can store bgp records in queue before processing them if desired.
json_out: <class 'TextIO'>
Setter for JSON output Parameters: json_out (File): Where to output json
Raises: Exception: If unable to write to or read from it
def
start(self):
87 def start(self): 88 """ 89 Start output 90 91 - Init json output if specified 92 - Start queue if enabled 93 """ 94 if self.__json_out: 95 self.__json_out.write("[") 96 self.isStarted = True
Start output
- Init json output if specified
- Start queue if enabled
def
stop(self):
98 def stop(self): 99 """ 100 - Set state as stopped 101 - Close file output 102 - Check if result is as expected 103 """ 104 if self.isStarted: 105 self.isStarted = False 106 self.databases.stop() 107 self.closeFile(self.__json_out) 108 if self.__expected_result: 109 print("Testing result: ") 110 with open(self.__json_out.name, "r+") as js: 111 print( 112 "Result is as expected" 113 if checkFiles(js, self.__expected_result) 114 else "Result is not as expected" 115 )
- Set state as stopped
- Close file output
- Check if result is as expected
def
iteration(self, e):
def iteration(self, e):
    """
    Handle a single bgp element.

    The element is printed as compact JSON when ``verbose`` is set,
    appended as indented JSON followed by a comma and a newline when a
    JSON output is set, then passed to the graph and to the databases.
    """
    if self.verbose:
        compact = json.dumps(e, cls=SetEncoder, sort_keys=True)
        print(compact + "\n", flush=True)

    sink = self.json_out
    if sink:
        pretty = json.dumps(e, cls=SetEncoder, indent=4, sort_keys=True)
        sink.write(pretty + ",\n")

    self.graph.update(e)
    self.databases.save(e)
Process a single BGP element: print it if verbose, write it to the JSON output if one is set, then update the graph and save it to the databases
def
closeFile(self, file):
def closeFile(self, file):
    """Terminate the JSON array in ``file`` and close it.

    Each record written to the file is followed by a comma and a
    newline. Both characters are removed before the closing bracket is
    written, so the file holds valid JSON instead of ending in ``,]``.
    A file holding only the opening bracket becomes ``[]``.

    Args:
        file (File): The file to close; ``None`` and ``sys.stdout`` are
            left untouched.
    """
    if file is None or file == sys.stdout:
        return
    record_end = ",\n"
    end = file.tell()
    # Anything beyond the opening "[" means at least one record, and hence
    # one trailing separator, was written.
    # NOTE(review): position arithmetic assumes the stream does not
    # translate "\n" on write (true on POSIX) - confirm for other platforms.
    if end > len("["):
        file.seek(end - len(record_end), os.SEEK_SET)
        file.truncate()
    file.write("]")
    file.close()
Close a JSON file
Args: file (File): The file to close