bin.bgpfilter
Contains a class and methods that retrieve BGP records filtered by prefixes, AS numbers, countries, etc.
"""
Contains a class and methods that retrieve BGP records filtered by prefixes,
AS numbers, countries, etc.
"""

__all__ = ["BGPFilter"]

import os
import re
import sys
import time
import json
import bgpout
import datetime
import ipaddress
import maxminddb
import pycountry
import pybgpstream
import urllib.request
from typing import List, Tuple


def get_collectors():
    """Query the BGPStream broker and list the available collectors per project.

    Returns:
        dict: maps every name in ``PROJECTS`` to the list of collector names
        the broker reports for that project.
    """
    # Close the HTTP response once the JSON payload has been read.
    with urllib.request.urlopen(COLLECTORS_URL) as response:
        data = json.load(response)
    result = {project: [] for project in PROJECTS}
    for name, info in data["data"]["collectors"].items():
        project = info["project"]
        if project in result:
            result[project].append(name)
    return result


# Supported projects -> project name used by BGPStream in live mode.
PROJECT_TYPES = {"ris": "ris-live", "routeviews": "routeviews-stream"}
PROJECTS = list(PROJECT_TYPES)
COLLECTORS_URL = "https://broker.bgpstream.caida.org/v2/meta/collectors"
# Queried once at import time (network access).
COLLECTORS = get_collectors()


class BGPFilter:
    """
    BGP stream filter.

    Configure the filters through the properties / methods below, assign a
    `bgpout.BGPOut` instance to `out`, then call `start()`.
    """

    def __init__(self):
        self.__isRecord = False
        self.__start_time = ""
        self.__end_time = ""
        # BGPStream filter-string fragments; "" means "no filter". They must be
        # strings because _build_stream() concatenates them.
        self.__asn_filter = ""
        self.__ipversion = ""
        self.__prefix_filter = None
        self.__asn_list = None
        self.__prefix_match_type_filter = None
        self.__project = PROJECTS[0]
        self.__collectors = None
        self.__countries_filter = None
        self.__data_source = {"source_type": "broker"}
        # Geo Open MaxMind database, loaded by the `country_file` setter.
        self.__f_country_path = None
        self.__f_country = None
        self.isStarted = False
        # `bgpout.BGPOut()` instance that will receive all records.
        self.out: bgpout.BGPOut = None

    ###############
    #   GETTERS   #
    ###############

    @property
    def isRecord(self) -> bool:
        """Retrieve in past (record) or in live mode.
        see `BGPFilter.record_mode()`"""
        return self.__isRecord

    @property
    def start_time(self) -> str:
        """Start of the interval for record mode.
        see `BGPFilter.record_mode()`"""
        return self.__start_time

    @property
    def end_time(self) -> str:
        """End of the interval for record mode.
        see `BGPFilter.record_mode()`"""
        return self.__end_time

    @property
    def prefix_filter(self) -> List[str]:
        """List of prefixes (CIDR format) to filter"""
        return self.__prefix_filter

    @property
    def asn_filter(self) -> List[str]:
        """List of accepted AS numbers (entries prefixed with `_` are excluded)"""
        return self.__asn_list

    @property
    def collectors(self) -> List[str]:
        """List of collectors to retrieve records from"""
        return self.__collectors

    @property
    def project(self) -> str:
        """Accepted project : `ris` or `routeviews`"""
        return self.__project

    @property
    def ipversion(self) -> str:
        """
        Formatted string for ip version filtering.

        Accepted versions : `4` or `6`
        """
        return self.__ipversion

    ###############
    #   SETTERS   #
    ###############

    def record_mode(self, isRecord, start, end):
        """
        Define whether records are retrieved from a past interval (record
        mode) or from the live stream.

        In record mode `start` and `end` are validated and stored. In live
        mode they are ignored: the start time is set to the current epoch time
        and the end time to 0.

        Args:
            - isRecord (bool) : record or live mode
            - start (string): Beginning of the interval.
              Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
            - end (string): End of the interval.
              Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00

        Raises:
            - ValueError: Start > End
            - ValueError: Start or end not defined
            - ValueError: Invalid date format
        """
        if not isRecord:
            self.__isRecord = isRecord
            self.__start_time = int(time.time())
            self.__end_time = 0
            return

        if start is None or end is None:
            raise ValueError(
                "Record mode requires the from_time and until_time arguments"
            )
        try:
            st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
            en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
        except ValueError as err:
            raise ValueError(
                "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
            ) from err
        if st > en:
            raise ValueError(
                "Invalid record mode interval. Beginning must precede the end"
            )

        # Only switch mode once the interval is known to be valid, so a failed
        # call leaves the previous configuration untouched.
        self.__isRecord = isRecord
        self.__start_time = start
        self.__end_time = end

    def data_source(self, record_type: str, file_format: str, file_path: str):
        """
        Use single file as data source

        Args:
            record_type (str): Type of records stored in the file (rib or upd)
            file_format (str): Format of the file (mrt or bmp for rib;
                mrt, bmp or ris-live for upd)
            file_path (str): path to archive file

        Raises:
            FileNotFoundError: file_path is not an existing file
            ValueError: unsupported record type / file format combination
        """
        if not os.path.isfile(file_path):
            raise FileNotFoundError(file_path)
        if record_type not in ["rib", "upd"]:
            raise ValueError("Input file type must be rib or upd")
        if record_type == "rib" and file_format not in ["mrt", "bmp"]:
            raise ValueError("Accepted input format types for rib : mrt, bmp")
        if record_type == "upd" and file_format not in ["mrt", "bmp", "ris-live"]:
            raise ValueError(
                "Accepted input format types for upd : mrt, bmp or ris-live"
            )
        self.__data_source = {
            "source_type": record_type,
            "file_format": file_format,
            "file_path": file_path,
        }

    @prefix_filter.setter
    def prefix_filter(self, values: Tuple[List[str], str]):
        """
        Prefix filter option.
        Keep records that match one of the specified prefixes (CIDR format).

        Args:
            values (Tuple[List[str], str]): (prefix_list, match_type) pair
                prefix_list (List[string]): Format: ip/subnet | Example: 130.0.192.0/21
                    None or an empty list disables prefix filtering.
                match_type (string): Type of match
                    exact: Exact match
                    less: Exact match or less specific (contained by)
                    more: Exact match or more specific (contains)
                    any: less and more

        Raises:
            ValueError: values is not a pair, match_type is unknown or a
                prefix is not a valid network.
        """
        try:
            cidr_list, match_type = values
        except ValueError:
            raise ValueError(
                "match type and prefixes list are required for filtering by prefixes"
            ) from None
        if not cidr_list:
            # No prefixes: disable the prefix filter entirely.
            self.__prefix_match_type_filter = None
            self.__prefix_filter = None
            return
        if match_type not in ["exact", "less", "more", "any"]:
            raise ValueError(
                "Match type must be specified and one of ['exact', 'less', 'more',"
                " 'any']"
            )
        for c in cidr_list:
            # Raises ValueError for anything that is not a valid network.
            ipaddress.ip_network(c)
        self.__prefix_match_type_filter = match_type
        self.__prefix_filter = cidr_list

    @asn_filter.setter
    def asn_filter(self, asn_list):
        """Filter using the specified AS number list.
        Skip a record if the last AS of its AS path (origin) is not one of the
        specified AS numbers. Prefix an AS number with `_` to exclude it.
        Entries that are not (optionally `_`-prefixed) decimal numbers are
        ignored, so they can never leak into the BGPStream filter string.

        Args:
            asn_list (list): List of AS numbers (str or int); None or an
                empty list disables the AS filter.
        """
        self.__asn_filter = ""
        self.__asn_list = None
        if not asn_list:
            return

        included, excluded = [], []
        self.__asn_list = []
        for asn in asn_list:
            asn = str(asn)
            if re.fullmatch(r"_[0-9]+", asn):
                excluded.append(asn[1:])
            elif re.fullmatch(r"[0-9]+", asn):
                included.append(asn)
            else:
                continue
            self.__asn_list.append(asn)

        # Both clauses are appended so that inclusions and exclusions combine.
        if included:
            self.__asn_filter += " and path (_" + "|_".join(included) + ")$"
        if excluded:
            self.__asn_filter += " and path !(_" + "|_".join(excluded) + ")$"

    @collectors.setter
    def collectors(self, collectors):
        """
        Args:
            collectors (List[str] | None): collector names; each must be listed
                in `COLLECTORS` for the current project. Changing `project`
                afterwards resets this selection.

        Raises:
            ValueError: a collector is not available for the current project.
        """
        if collectors is not None:
            available = COLLECTORS[self.__project]
            for c in collectors:
                if c not in available:
                    raise ValueError(
                        f"Collector {c!r} isn't available or isn't valid "
                        f"for project {self.__project}."
                    )
        self.__collectors = collectors

    @project.setter
    def project(self, project: str):
        """
        Args:
            project (string): ris or routeviews

        Raises:
            ValueError: unknown project name.
        """
        if project not in PROJECT_TYPES:
            raise ValueError(
                f"Invalid project name. Valid projects list : {list(PROJECT_TYPES)}"
            )
        self.__project = project
        # Collectors are project-specific, so any previous selection is dropped.
        self.__collectors = None

    @ipversion.setter
    def ipversion(self, version):
        """Set string for filter field

        Args:
            version (int | str): 4 or 6 (either type); anything else disables
                the ip version filter.
        """
        version = str(version)
        self.__ipversion = " and ipversion " + version if version in ["4", "6"] else ""

    ###############
    #   COUNTRY   #
    ###############

    @property
    def country_file(self) -> str:
        """Path to the Geo Open MaxMindDB File (None until loaded)"""
        return self.__f_country_path

    @property
    def countries_filter(self) -> List[str]:
        """
        List of ISO 3166-1 alpha-2 country codes to filter (None = no filter).

        Records are kept only if the country of their prefix is in this list.
        """
        return self.__countries_filter

    @countries_filter.setter
    def countries_filter(self, country_list: List[str]):
        """
        Filter using the specified countries.
        Keep records whose prefix geolocates to a country in country_list.

        Args:
            country_list (List[str]): countries, in any form accepted by
                `pycountry.countries.lookup` (e.g. "FR", "fr", "FRA", "France").

        Raises:
            LookupError: If an element in country_list is not valid.
        """
        if country_list is not None:
            # Normalise to ISO 3166-1 alpha-2, the form MaxMind-format country
            # databases store in "iso_code", so e.g. "fr" matches "FR".
            country_list = [
                pycountry.countries.lookup(c).alpha_2 for c in country_list
            ]
        self.__countries_filter = country_list

    @country_file.setter
    def country_file(self, country_file_path: str):
        """
        Setter for the GeoOpen country mmdb file

        Args:
            country_file_path (String): Path to Geo Open MaxMindDB File

        Raises:
            FileNotFoundError: country_file_path is not an existing file
        """
        if not os.path.isfile(country_file_path):
            raise FileNotFoundError(country_file_path)
        reader = maxminddb.open_database(country_file_path, maxminddb.MODE_MMAP_EXT)
        if self.__f_country is not None:
            # Release the previously loaded database.
            self.__f_country.close()
        self.__f_country_path = country_file_path
        self.__f_country = reader
        print(f"Loaded Geo Open database : {self.__f_country_path}")

    def __check_country(self, e):
        """
        Args:
            e (bgp dict): message built by `start()`

        Returns:
            boolean: True when no country filter is set or
            e["country_code"] is in self.__countries_filter
        """
        return (
            self.__countries_filter is None
            or e["country_code"] in self.__countries_filter
        )

    def __country_by_prefix(self, p):
        """
        Parameters:
            p (prefix): CIDR format. Example: 130.0.192.0/21

        Returns:
            string: country code of the given prefix.
            None if p is None, no database is loaded, or the address is not
            found / has no country in the GeoOpen database
        """
        if p is None or self.__f_country is None:
            return None
        record = self.__f_country.get(p.split("/", 1)[0])
        if not record:
            return None
        return record.get("country", {}).get("iso_code")

    ####################
    # PUBLIC FUNCTIONS #
    ####################

    def _build_stream(self):
        """Build the stream with the configured filters.

        The stream is stored in ``self._stream`` and returned.

        Returns:
            (BGPStream)
        """
        from_file = self.__data_source["source_type"] != "broker"
        self._stream = pybgpstream.BGPStream(
            from_time=self.start_time,
            until_time=self.end_time,
            data_interface="singlefile" if from_file else "broker",
            record_type="updates",
            filter="elemtype announcements withdrawals"
            + self.__asn_filter
            + self.__ipversion,
        )

        if self.__prefix_match_type_filter is not None:
            # One "prefix-<match type>" filter per configured prefix.
            self._stream._maybe_add_filter(
                "prefix-" + self.__prefix_match_type_filter, None, self.__prefix_filter
            )

        if from_file:
            # Option names are "<rib|upd>-file" and "<rib|upd>-type".
            self._stream.set_data_interface_option(
                "singlefile",
                self.__data_source["source_type"] + "-file",
                self.__data_source["file_path"],
            )
            self._stream.set_data_interface_option(
                "singlefile",
                self.__data_source["source_type"] + "-type",
                self.__data_source["file_format"],
            )
        else:
            # Live streams are published under a different project name.
            project = (
                self.__project if self.__isRecord else PROJECT_TYPES[self.__project]
            )
            self._stream._maybe_add_filter("project", project, None)
            self._stream._maybe_add_filter("collector", None, self.__collectors)
            # Live mode belongs to the live (non-record) configuration,
            # matching the live project names selected just above.
            if not self.__isRecord:
                self._stream.stream.set_live_mode()
        return self._stream

    def cpt_update(self):
        """For debugging. Print timing information to stderr every 100000 elements"""
        if not hasattr(self, "timer"):
            self.timer = {"cpt": 0, "tmp": None, "ft": time.time(), "ot": time.time()}
        self.timer["cpt"] += 1
        if self.timer["cpt"] % 100000 == 0:
            self.timer["nt"] = time.time()
            print(
                f'Counter : {self.timer["cpt"]} '
                f'- Time {self.timer["nt"] - self.timer["ft"]} '
                f'- {self.timer["nt"] - self.timer["ot"]}s',
                file=sys.stderr,
            )
            self.timer["ot"] = self.timer["nt"]

    def start(self):
        """
        Start retrieving stream/records and filtering them
            - Start the output handler (`self.out`)
            - Build Stream
            - Send every element that passes the country filter to `self.out`
        """
        self.out.start()
        print("Loading stream...")
        self._build_stream()
        print("Starting")
        self.isStarted = True

        for e in self._stream:
            self.cpt_update()
            msg = {
                "type": e.type,
                "time": e.time,
                "peer_address": e.peer_address,
                "peer_asn": e.peer_asn,
                "collector": e.collector,
                "project": e.project,
                "router": e.router,
                "router_ip": e.router_ip,
            }
            msg |= e.fields
            # Not every element carries a prefix or a (non-empty) AS path.
            msg["country_code"] = self.__country_by_prefix(msg.get("prefix"))
            as_path = msg.get("as-path")
            msg["source"] = as_path.split()[-1] if as_path else None

            if self.__check_country(msg):
                self.out.iteration(msg)

    def stop(self):
        """
        Close output (JSON, Databases, etc) and exit the process
        """
        print("Stream ended")
        self.out.stop()
        sys.exit(0)
class BGPFilter:
    """
    BGP stream filter.

    Configure the filters through the properties / methods below, assign a
    `bgpout.BGPOut` instance to `out`, then call `start()`.
    """

    def __init__(self):
        self.__isRecord = False
        self.__start_time = ""
        self.__end_time = ""
        # BGPStream filter-string fragments; "" means "no filter". They must be
        # strings because _build_stream() concatenates them.
        self.__asn_filter = ""
        self.__ipversion = ""
        self.__prefix_filter = None
        self.__asn_list = None
        self.__prefix_match_type_filter = None
        self.__project = PROJECTS[0]
        self.__collectors = None
        self.__countries_filter = None
        self.__data_source = {"source_type": "broker"}
        # Geo Open MaxMind database, loaded by the `country_file` setter.
        self.__f_country_path = None
        self.__f_country = None
        self.isStarted = False
        # `bgpout.BGPOut()` instance that will receive all records.
        self.out: bgpout.BGPOut = None

    ###############
    #   GETTERS   #
    ###############

    @property
    def isRecord(self) -> bool:
        """Retrieve in past (record) or in live mode.
        see `BGPFilter.record_mode()`"""
        return self.__isRecord

    @property
    def start_time(self) -> str:
        """Start of the interval for record mode.
        see `BGPFilter.record_mode()`"""
        return self.__start_time

    @property
    def end_time(self) -> str:
        """End of the interval for record mode.
        see `BGPFilter.record_mode()`"""
        return self.__end_time

    @property
    def prefix_filter(self) -> List[str]:
        """List of prefixes (CIDR format) to filter"""
        return self.__prefix_filter

    @property
    def asn_filter(self) -> List[str]:
        """List of accepted AS numbers (entries prefixed with `_` are excluded)"""
        return self.__asn_list

    @property
    def collectors(self) -> List[str]:
        """List of collectors to retrieve records from"""
        return self.__collectors

    @property
    def project(self) -> str:
        """Accepted project : `ris` or `routeviews`"""
        return self.__project

    @property
    def ipversion(self) -> str:
        """
        Formatted string for ip version filtering.

        Accepted versions : `4` or `6`
        """
        return self.__ipversion

    ###############
    #   SETTERS   #
    ###############

    def record_mode(self, isRecord, start, end):
        """
        Define whether records are retrieved from a past interval (record
        mode) or from the live stream.

        In record mode `start` and `end` are validated and stored. In live
        mode they are ignored: the start time is set to the current epoch time
        and the end time to 0.

        Args:
            - isRecord (bool) : record or live mode
            - start (string): Beginning of the interval.
              Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
            - end (string): End of the interval.
              Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00

        Raises:
            - ValueError: Start > End
            - ValueError: Start or end not defined
            - ValueError: Invalid date format
        """
        if not isRecord:
            self.__isRecord = isRecord
            self.__start_time = int(time.time())
            self.__end_time = 0
            return

        if start is None or end is None:
            raise ValueError(
                "Record mode requires the from_time and until_time arguments"
            )
        try:
            st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
            en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
        except ValueError as err:
            raise ValueError(
                "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
            ) from err
        if st > en:
            raise ValueError(
                "Invalid record mode interval. Beginning must precede the end"
            )

        # Only switch mode once the interval is known to be valid, so a failed
        # call leaves the previous configuration untouched.
        self.__isRecord = isRecord
        self.__start_time = start
        self.__end_time = end

    def data_source(self, record_type: str, file_format: str, file_path: str):
        """
        Use single file as data source

        Args:
            record_type (str): Type of records stored in the file (rib or upd)
            file_format (str): Format of the file (mrt or bmp for rib;
                mrt, bmp or ris-live for upd)
            file_path (str): path to archive file

        Raises:
            FileNotFoundError: file_path is not an existing file
            ValueError: unsupported record type / file format combination
        """
        if not os.path.isfile(file_path):
            raise FileNotFoundError(file_path)
        if record_type not in ["rib", "upd"]:
            raise ValueError("Input file type must be rib or upd")
        if record_type == "rib" and file_format not in ["mrt", "bmp"]:
            raise ValueError("Accepted input format types for rib : mrt, bmp")
        if record_type == "upd" and file_format not in ["mrt", "bmp", "ris-live"]:
            raise ValueError(
                "Accepted input format types for upd : mrt, bmp or ris-live"
            )
        self.__data_source = {
            "source_type": record_type,
            "file_format": file_format,
            "file_path": file_path,
        }

    @prefix_filter.setter
    def prefix_filter(self, values: Tuple[List[str], str]):
        """
        Prefix filter option.
        Keep records that match one of the specified prefixes (CIDR format).

        Args:
            values (Tuple[List[str], str]): (prefix_list, match_type) pair
                prefix_list (List[string]): Format: ip/subnet | Example: 130.0.192.0/21
                    None or an empty list disables prefix filtering.
                match_type (string): Type of match
                    exact: Exact match
                    less: Exact match or less specific (contained by)
                    more: Exact match or more specific (contains)
                    any: less and more

        Raises:
            ValueError: values is not a pair, match_type is unknown or a
                prefix is not a valid network.
        """
        try:
            cidr_list, match_type = values
        except ValueError:
            raise ValueError(
                "match type and prefixes list are required for filtering by prefixes"
            ) from None
        if not cidr_list:
            # No prefixes: disable the prefix filter entirely.
            self.__prefix_match_type_filter = None
            self.__prefix_filter = None
            return
        if match_type not in ["exact", "less", "more", "any"]:
            raise ValueError(
                "Match type must be specified and one of ['exact', 'less', 'more',"
                " 'any']"
            )
        for c in cidr_list:
            # Raises ValueError for anything that is not a valid network.
            ipaddress.ip_network(c)
        self.__prefix_match_type_filter = match_type
        self.__prefix_filter = cidr_list

    @asn_filter.setter
    def asn_filter(self, asn_list):
        """Filter using the specified AS number list.
        Skip a record if the last AS of its AS path (origin) is not one of the
        specified AS numbers. Prefix an AS number with `_` to exclude it.
        Entries that are not (optionally `_`-prefixed) decimal numbers are
        ignored, so they can never leak into the BGPStream filter string.

        Args:
            asn_list (list): List of AS numbers (str or int); None or an
                empty list disables the AS filter.
        """
        self.__asn_filter = ""
        self.__asn_list = None
        if not asn_list:
            return

        included, excluded = [], []
        self.__asn_list = []
        for asn in asn_list:
            asn = str(asn)
            if re.fullmatch(r"_[0-9]+", asn):
                excluded.append(asn[1:])
            elif re.fullmatch(r"[0-9]+", asn):
                included.append(asn)
            else:
                continue
            self.__asn_list.append(asn)

        # Both clauses are appended so that inclusions and exclusions combine.
        if included:
            self.__asn_filter += " and path (_" + "|_".join(included) + ")$"
        if excluded:
            self.__asn_filter += " and path !(_" + "|_".join(excluded) + ")$"

    @collectors.setter
    def collectors(self, collectors):
        """
        Args:
            collectors (List[str] | None): collector names; each must be listed
                in `COLLECTORS` for the current project. Changing `project`
                afterwards resets this selection.

        Raises:
            ValueError: a collector is not available for the current project.
        """
        if collectors is not None:
            available = COLLECTORS[self.__project]
            for c in collectors:
                if c not in available:
                    raise ValueError(
                        f"Collector {c!r} isn't available or isn't valid "
                        f"for project {self.__project}."
                    )
        self.__collectors = collectors

    @project.setter
    def project(self, project: str):
        """
        Args:
            project (string): ris or routeviews

        Raises:
            ValueError: unknown project name.
        """
        if project not in PROJECT_TYPES:
            raise ValueError(
                f"Invalid project name. Valid projects list : {list(PROJECT_TYPES)}"
            )
        self.__project = project
        # Collectors are project-specific, so any previous selection is dropped.
        self.__collectors = None

    @ipversion.setter
    def ipversion(self, version):
        """Set string for filter field

        Args:
            version (int | str): 4 or 6 (either type); anything else disables
                the ip version filter.
        """
        version = str(version)
        self.__ipversion = " and ipversion " + version if version in ["4", "6"] else ""

    ###############
    #   COUNTRY   #
    ###############

    @property
    def country_file(self) -> str:
        """Path to the Geo Open MaxMindDB File (None until loaded)"""
        return self.__f_country_path

    @property
    def countries_filter(self) -> List[str]:
        """
        List of ISO 3166-1 alpha-2 country codes to filter (None = no filter).

        Records are kept only if the country of their prefix is in this list.
        """
        return self.__countries_filter

    @countries_filter.setter
    def countries_filter(self, country_list: List[str]):
        """
        Filter using the specified countries.
        Keep records whose prefix geolocates to a country in country_list.

        Args:
            country_list (List[str]): countries, in any form accepted by
                `pycountry.countries.lookup` (e.g. "FR", "fr", "FRA", "France").

        Raises:
            LookupError: If an element in country_list is not valid.
        """
        if country_list is not None:
            # Normalise to ISO 3166-1 alpha-2, the form MaxMind-format country
            # databases store in "iso_code", so e.g. "fr" matches "FR".
            country_list = [
                pycountry.countries.lookup(c).alpha_2 for c in country_list
            ]
        self.__countries_filter = country_list

    @country_file.setter
    def country_file(self, country_file_path: str):
        """
        Setter for the GeoOpen country mmdb file

        Args:
            country_file_path (String): Path to Geo Open MaxMindDB File

        Raises:
            FileNotFoundError: country_file_path is not an existing file
        """
        if not os.path.isfile(country_file_path):
            raise FileNotFoundError(country_file_path)
        reader = maxminddb.open_database(country_file_path, maxminddb.MODE_MMAP_EXT)
        if self.__f_country is not None:
            # Release the previously loaded database.
            self.__f_country.close()
        self.__f_country_path = country_file_path
        self.__f_country = reader
        print(f"Loaded Geo Open database : {self.__f_country_path}")

    def __check_country(self, e):
        """
        Args:
            e (bgp dict): message built by `start()`

        Returns:
            boolean: True when no country filter is set or
            e["country_code"] is in self.__countries_filter
        """
        return (
            self.__countries_filter is None
            or e["country_code"] in self.__countries_filter
        )

    def __country_by_prefix(self, p):
        """
        Parameters:
            p (prefix): CIDR format. Example: 130.0.192.0/21

        Returns:
            string: country code of the given prefix.
            None if p is None, no database is loaded, or the address is not
            found / has no country in the GeoOpen database
        """
        if p is None or self.__f_country is None:
            return None
        record = self.__f_country.get(p.split("/", 1)[0])
        if not record:
            return None
        return record.get("country", {}).get("iso_code")

    ####################
    # PUBLIC FUNCTIONS #
    ####################

    def _build_stream(self):
        """Build the stream with the configured filters.

        The stream is stored in ``self._stream`` and returned.

        Returns:
            (BGPStream)
        """
        from_file = self.__data_source["source_type"] != "broker"
        self._stream = pybgpstream.BGPStream(
            from_time=self.start_time,
            until_time=self.end_time,
            data_interface="singlefile" if from_file else "broker",
            record_type="updates",
            filter="elemtype announcements withdrawals"
            + self.__asn_filter
            + self.__ipversion,
        )

        if self.__prefix_match_type_filter is not None:
            # One "prefix-<match type>" filter per configured prefix.
            self._stream._maybe_add_filter(
                "prefix-" + self.__prefix_match_type_filter, None, self.__prefix_filter
            )

        if from_file:
            # Option names are "<rib|upd>-file" and "<rib|upd>-type".
            self._stream.set_data_interface_option(
                "singlefile",
                self.__data_source["source_type"] + "-file",
                self.__data_source["file_path"],
            )
            self._stream.set_data_interface_option(
                "singlefile",
                self.__data_source["source_type"] + "-type",
                self.__data_source["file_format"],
            )
        else:
            # Live streams are published under a different project name.
            project = (
                self.__project if self.__isRecord else PROJECT_TYPES[self.__project]
            )
            self._stream._maybe_add_filter("project", project, None)
            self._stream._maybe_add_filter("collector", None, self.__collectors)
            # Live mode belongs to the live (non-record) configuration,
            # matching the live project names selected just above.
            if not self.__isRecord:
                self._stream.stream.set_live_mode()
        return self._stream

    def cpt_update(self):
        """For debugging. Print timing information to stderr every 100000 elements"""
        if not hasattr(self, "timer"):
            self.timer = {"cpt": 0, "tmp": None, "ft": time.time(), "ot": time.time()}
        self.timer["cpt"] += 1
        if self.timer["cpt"] % 100000 == 0:
            self.timer["nt"] = time.time()
            print(
                f'Counter : {self.timer["cpt"]} '
                f'- Time {self.timer["nt"] - self.timer["ft"]} '
                f'- {self.timer["nt"] - self.timer["ot"]}s',
                file=sys.stderr,
            )
            self.timer["ot"] = self.timer["nt"]

    def start(self):
        """
        Start retrieving stream/records and filtering them
            - Start the output handler (`self.out`)
            - Build Stream
            - Send every element that passes the country filter to `self.out`
        """
        self.out.start()
        print("Loading stream...")
        self._build_stream()
        print("Starting")
        self.isStarted = True

        for e in self._stream:
            self.cpt_update()
            msg = {
                "type": e.type,
                "time": e.time,
                "peer_address": e.peer_address,
                "peer_asn": e.peer_asn,
                "collector": e.collector,
                "project": e.project,
                "router": e.router,
                "router_ip": e.router_ip,
            }
            msg |= e.fields
            # Not every element carries a prefix or a (non-empty) AS path.
            msg["country_code"] = self.__country_by_prefix(msg.get("prefix"))
            as_path = msg.get("as-path")
            msg["source"] = as_path.split()[-1] if as_path else None

            if self.__check_country(msg):
                self.out.iteration(msg)

    def stop(self):
        """
        Close output (JSON, Databases, etc) and exit the process
        """
        print("Stream ended")
        self.out.stop()
        sys.exit(0)
BGP stream filter
Prefix filter option. Keep records that match one of the specified prefixes (CIDR format).
Args: values (Tuple[List[str], str]): a (prefix_list, match_type) pair. prefix_list (List[string]): Format: ip/subnet | Example: 130.0.192.0/21 match_type (string): Type of match exact: Exact match less: Exact match or less specific (contained by) more: Exact match or more specific (contains) any: less and more
Filter using the specified AS number list. Skip a record if the last (origin) AS of its AS path is not one of the specified AS numbers. Prefix an AS number with the _ symbol to exclude it instead.
Args: asn_list (list): List of AS numbers
def record_mode(self, isRecord, start, end):
    """
    Define whether records are retrieved from a past interval (record mode)
    or from the live stream.

    In record mode `start` and `end` are validated and stored. In live mode
    they are ignored: the start time is set to the current epoch time and the
    end time to 0.

    Args:
        - isRecord (bool) : record or live mode
        - start (string): Beginning of the interval.
          Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
        - end (string): End of the interval.
          Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00

    Raises:
        - ValueError: Start > End
        - ValueError: Start or end not defined
        - ValueError: Invalid date format (chained to the strptime error)
    """
    if not isRecord:
        self.__isRecord = isRecord
        self.__start_time = int(time.time())
        self.__end_time = 0
        return

    if start is None or end is None:
        raise ValueError(
            "Record mode requires the from_time and until_time arguments"
        )
    try:
        st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
        en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    except ValueError as err:
        raise ValueError(
            "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
        ) from err
    if st > en:
        raise ValueError(
            "Invalid record mode interval. Beginning must precede the end"
        )

    # Only switch mode once the interval is known to be valid, so a failed
    # call leaves the previous configuration untouched.
    self.__isRecord = isRecord
    self.__start_time = start
    self.__end_time = end
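Define whether records are retrieved from a past interval (record mode) or from the live stream. When isRecord is False, start and end are ignored: the start time is set to the current time and the end time to 0.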
Args: - isRecord (bool) : record or live mode - start (string): Beginning of the interval.
Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
- end (string): End of the interval.
Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00
Raises: - ValueError: Start > End - ValueError: Start or end not defined - ValueError: Invalid date format
def data_source(self, record_type: str, file_format: str, file_path: str):
    """
    Use single file as data source

    Args:
        record_type (str): Type of records stored in the file (rib or upd)
        file_format (str): Format of the file (mrt or bmp for rib;
            mrt, bmp or ris-live for upd)
        file_path (str): path to archive file

    Raises:
        FileNotFoundError: file_path is not an existing file
        ValueError: unsupported record type / file format combination
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(file_path)
    if record_type not in ["rib", "upd"]:
        raise ValueError("Input file type must be rib or upd")
    # The rib check must reject formats *outside* the accepted list, as its
    # error message states (the original test was inverted).
    if record_type == "rib" and file_format not in ["mrt", "bmp"]:
        raise ValueError("Accepted input format types for rib : mrt, bmp")
    if record_type == "upd" and file_format not in ["mrt", "bmp", "ris-live"]:
        raise ValueError(
            "Accepted input format types for upd : mrt, bmp or ris-live"
        )
    self.__data_source = {
        "source_type": record_type,
        "file_format": file_format,
        "file_path": file_path,
    }
Use single file as data source
Args: record_type (str): Type of records stored in the file (rib or upd) file_format (str): Format of the file (mrt, bmp or ris-live) file_path (str): path to archive file
Raises: FileNotFoundError ValueError
Setter for the GeoOpen country mmdb file
Args: country_file_path (String): Path to Geo Open MaxMindDB File
Filter using the specified countries. Keep records whose prefix geolocates to one of the countries in country_list.
Args: country_list (List[str]): List of country codes
Raises: LookupError: If an element in country_list is not valid.
def cpt_update(self):
    """Debug helper: count processed elements and report timing.

    The first call attaches a ``timer`` dict to the instance. Every 100000th
    call prints the running count, the seconds elapsed since the first call
    and the seconds elapsed since the previous report to stderr.
    """
    try:
        counters = self.timer
    except AttributeError:
        counters = self.timer = {
            "cpt": 0,
            "tmp": None,
            "ft": time.time(),
            "ot": time.time(),
        }

    counters["cpt"] += 1
    if counters["cpt"] % 100000:
        return

    counters["nt"] = time.time()
    total = counters["nt"] - counters["ft"]
    since_last = counters["nt"] - counters["ot"]
    print(
        f"Counter : {counters['cpt']} - Time {total} - {since_last}s",
        file=sys.stderr,
    )
    counters["ot"] = counters["nt"]
For debugging. Print timing information to stderr every 100,000 elements.
def start(self):
    """
    Start retrieving stream/records and filtering them
        - Start the output handler (`self.out`)
        - Build Stream
        - Send every element that passes the country filter to `self.out`
    """
    self.out.start()
    print("Loading stream...")
    self._build_stream()
    print("Starting")
    self.isStarted = True

    for e in self._stream:
        self.cpt_update()
        msg = {
            "type": e.type,
            "time": e.time,
            "peer_address": e.peer_address,
            "peer_asn": e.peer_asn,
            "collector": e.collector,
            "project": e.project,
            "router": e.router,
            "router_ip": e.router_ip,
        }
        msg |= e.fields
        # Not every element carries a prefix or a (non-empty) AS path:
        # .get() avoids KeyError, and the truthiness test avoids IndexError
        # on an empty path string.
        msg["country_code"] = self.__country_by_prefix(msg.get("prefix"))
        as_path = msg.get("as-path")
        msg["source"] = as_path.split()[-1] if as_path else None

        if self.__check_country(msg):
            self.out.iteration(msg)
Start retrieving stream/records and filtering them
- Start the output handler (the Geo-Open database is loaded separately, through the `country_file` setter)
- Build Stream
- Send messages to bgpout.py