Edit on GitHub

bin.bgpfilter

Contains a class and methods that retrieve BGP records filtered by prefixes, AS numbers, countries, etc.

  1"""
  2Contains a whole class and methods that retrieves bgp records filtered by prefixes,
  3 as numbers, countries, etc ...
  4"""
  5
  6__all__ = ["BGPFilter"]
  7
  8import os
  9import re
 10import sys
 11import time
 12import json
 13import bgpout
 14import datetime
 15import ipaddress
 16import maxminddb
 17import pycountry
 18import pybgpstream
 19import urllib.request
 20from typing import List, Tuple
 21
 22
 23def get_collectors():
 24    """Query the BGPStream broker and identify the collectors that are available"""
 25    data = json.load(urllib.request.urlopen(COLLECTORS_URL))
 26    result = dict((x, []) for x in PROJECTS)
 27    for coll in data["data"]["collectors"]:
 28        p = data["data"]["collectors"][coll]["project"]
 29        if p in PROJECTS:
 30            result[p].append(coll)
 31    return result
 32
 33
 34PROJECT_TYPES = {"ris": "ris-live", "routeviews": "routeviews-stream"}
 35PROJECTS = [i for i in PROJECT_TYPES.keys()]
 36COLLECTORS_URL = "https://broker.bgpstream.caida.org/v2/meta/collectors"
 37COLLECTORS = get_collectors()
 38
 39
 40class BGPFilter:
 41    """
 42    BGP stream filter
 43    """
 44
 45    def __init__(self):
 46        self.__isRecord = False
 47        self.__start_time = ""
 48        self.__end_time = ""
 49        self.__asn_filter = None
 50        self.__ipversion = ""
 51        self.__prefix_filter = None
 52        self.__asn_list = None
 53        self.__prefix_match_type_filter = None
 54        self.__project = PROJECTS[0]
 55        self.__collectors = None
 56        self.__countries_filter = None
 57        self.__data_source = {"source_type": "broker"}
 58        self.out: bgpout.BGPOut = None
 59        """`bgpout.BGPOut()` instance that will receive all records"""
 60        # multiprocessing.set_start_method("fork")
 61        # self.queue = faster_fifo.Queue()
 62        # self.process = multiprocessing.Process(
 63        # target=self.process, args=(self.queue,
 64        # ), daemon=True)
 65
 66    ###############
 67    #   GETTERS   #
 68    ###############
 69
 70    @property
 71    def isRecord(self) -> bool:
 72        """Retrieve in past or in live mode.
 73        see `BGPFilter.record_mode()`"""
 74        return self.__isRecord
 75
 76    @property
 77    def start_time(self) -> str:
 78        """Start of the interval for record mode.
 79        see `BGPFilter.record_mode()`"""
 80        return self.__start_time
 81
 82    @property
 83    def end_time(self) -> str:
 84        """End of the interval for record mode.
 85        see `BGPFilter.record_mode()`"""
 86        return self.__end_time
 87
 88    @property
 89    def prefix_filter(self) -> List[str]:
 90        """List of prefixes (CIDR format) to filter"""
 91        return self.__prefix_filter
 92
 93    @property
 94    def asn_filter(self) -> List[str]:
 95        """List of AS numbers"""
 96        return self.__asn_list
 97
 98    @property
 99    def collectors(self) -> List[str]:
100        """List of collectors to list"""
101        return self.__collectors
102
103    @property
104    def project(self) -> str:
105        """Accepted project : `ris` or `routeviews`"""
106        return self.__project
107
108    @property
109    def ipversion(self) -> str:
110        """
111        Formatted string for ip version filtering.
112
113        Accepted versions : `4` or `6`
114        """
115        return self.__ipversion
116
117    ###############
118    #   SETTERS   #
119    ###############
120
121    def record_mode(self, isRecord, start, end):
122        """
123        Define if retrieve from an interval or live stream
124            start and end won't be modified if isRecord is False
125
126        Args:
127            - isRecord (bool) : record or live mode
128            - start (string): Beginning of the interval.
129
130                Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
131
132            - end (string): End of the interval.
133
134                Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00
135
136        Raises:
137            - ValueError: Start > End
138            - ValueError: Start or end not defined
139            - ValueError: Invalid date format
140        """
141        self.__isRecord = isRecord
142        if isRecord:
143            if start is None or end is None:
144                raise ValueError(
145                    "Record mode requires the from_time and until_time arguments"
146                )
147            try:
148                st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
149                en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
150            except ValueError:
151                raise ValueError(
152                    "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
153                )
154            if st > en:
155                raise ValueError(
156                    "Invalid record mode interval. Beginning must precede the end"
157                )
158
159            self.__start_time = start
160            self.__end_time = end
161        else:
162            self.__start_time = int(time.time())
163            self.__end_time = 0
164
165    def data_source(self, record_type: str, file_format: str, file_path: str):
166        """
167        Use single file as data source
168
169        Args:
170            record_type (str): Type of records stored in the file (rib or upd)
171            file_format (str): Format of the file (mrt, bmp or ris-live)
172            file_path (str): path to archive file
173
174        Raises:
175            FileNotFoundError
176            ValueError
177        """
178        if not os.path.isfile(file_path):
179            raise FileNotFoundError
180        if record_type not in ["rib", "upd"]:
181            raise ValueError("Input file type must be rib or upd")
182        if record_type == "rib" and file_format in ["mrt", "bmp"]:
183            raise ValueError("Accepted input format types for rib : mrt, bmp")
184        elif record_type == "upd" and file_format not in ["mrt", "bmp", "ris-live"]:
185            raise ValueError(
186                "Accepted input format types for upd : mrt, bmp or ris-live"
187            )
188        self.__data_source = {
189            "source_type": record_type,
190            "file_format": file_format,
191            "file_path": file_path,
192        }
193
194    @prefix_filter.setter
195    def prefix_filter(self, values: Tuple[List[str], str]):
196        """
197        Prefix filter option
198            Keep records that match to one of specified prefixes (cidr format).
199
200        Args:
201            prefix_list (List[string]):  Format: ip/subnet | Example: 130.0.192.0/21
202            match_type (string): Type of match
203                exact: Exact match
204                less: Exact match or less specific (contained by)
205                more: Exact match or more specific (contains)
206                any: less and more
207        """
208        try:
209            cidr_list, match_type = values
210        except ValueError:
211            raise ValueError(
212                "match type and prefixes list are required for filtering by prefixes"
213            )
214        if cidr_list is not None:
215            if len(cidr_list) == 0:
216                # "Please specify one or more prefixes when filtering by prefix"
217                return
218            if match_type not in ["exact", "less", "more", "any"]:
219                raise ValueError(
220                    "Match type must be specified and one of ['exact', 'less', 'more',"
221                    " 'any']"
222                )
223            for c in cidr_list:
224                ipaddress.ip_network(c)
225            self.__prefix_match_type_filter = match_type
226            self.__prefix_filter = cidr_list
227
228    @asn_filter.setter
229    def asn_filter(self, asn_list):
230        """Filter using specified AS number list
231            Skip a record if its source-AS is not one of specified AS numbers
232            Use _ symbol for negation
233
234        Args:
235            asn_list (list): List of AS numbers
236        """
237        self.__asn_filter = ""
238        f_list = []
239        not_f_list = []
240        if asn_list is not None and len(asn_list) >= 1:
241            self.__asn_list = []
242            for i in asn_list:
243                if re.match("_[0-9]+", i):
244                    not_f_list.append(i.replace("_", ""))
245                    self.__asn_list.append(i)
246                elif re.match("[0-9]+", i):
247                    f_list.append(i)
248                    self.__asn_list.append(i)
249
250            if len(f_list) >= 1:
251                self.__asn_filter += " and path (_" + "|_".join(f_list) + ")$"
252            if len(not_f_list) >= 1:
253                self.__asn_filter = " and path !(_" + "|_".join(not_f_list) + ")$"
254
255    @collectors.setter
256    def collectors(self, collectors):
257        if collectors is not None:
258            for c in collectors:
259                if c not in COLLECTORS[self.__project]:
260                    raise ValueError("Collector isn't available or isn't valid.")
261            self.__collectors = collectors
262
263    @project.setter
264    def project(self, project: str):
265        """
266        Args:
267            project (string): ris or routesviews
268        """
269        if self.__collectors != project and project in PROJECT_TYPES.keys():
270            self.__project = project
271            self.__collectors = None
272        else:
273            raise ValueError(
274                f"Invalid project name. Valid projects list : {PROJECT_TYPES.keys()}"
275            )
276
277    @ipversion.setter
278    def ipversion(self, version):
279        """Set string for filter field
280
281        Args:
282            version (Integer): Possible values ["4" or "6"]
283        """
284        self.__ipversion = " and ipversion " + version if version in ["4", "6"] else ""
285
286    ###############
287    #   COUNTRY   #
288    ###############
289
290    @property
291    def country_file(self) -> str:
292        """Path to the Geo Open MaxMindDB File"""
293        return self.__f_country_path
294
295    @property
296    def countries_filter(self) -> List[str]:
297        """
298        List of country codes to filter
299
300        Filter using specified country.
301            Keep records that the origin of their prefix is contained in country_list.
302
303        Args:
304            country_list (List[str]): List of country codes
305
306        Raises:
307            LookupError: If an element in country_list is not valid.
308        """
309        return self.__countries_filter
310
311    @countries_filter.setter
312    def countries_filter(self, country_list: List[str]):
313        """
314        Filter using specified country.
315            Keep records that the origin of their prefix is contained in country_list.
316
317        Args:
318            country_list (List[str]): List of country codes
319
320        Raises:
321            LookupError: If an element in country_list is not valid.
322        """
323        if country_list is not None:
324            for c in country_list:
325                pycountry.countries.lookup(c)
326        self.__countries_filter = country_list
327
328    @country_file.setter
329    def country_file(self, country_file_path: str):
330        """
331        Setter for the GeoOpen country mmdb file
332
333        Args:
334            country_file_path (String): Path to Geo Open MaxMindDB File
335        """
336        if not os.path.isfile(country_file_path):
337            raise FileNotFoundError
338        self.__f_country_path = country_file_path
339
340        self.__f_country = maxminddb.open_database(
341            self.__f_country_path, maxminddb.MODE_MMAP_EXT
342        )
343        print(f"Loaded Geo Open database : {self.__f_country_path}")
344
345    def __check_country(self, e):
346        """
347        Args:
348            e (bgp dict)
349
350
351        Returns:
352            boolean: if e.["country code"] is in self.__countries_filter list
353        """
354        return not (
355            self.__countries_filter is not None
356            and e["country_code"] not in self.__countries_filter
357        )
358
359    def __country_by_prefix(self, p):
360        """
361        Parameters:
362            p (prefix): CIDR format. Example: 130.0.192.0/21
363
364        Returns:
365            string: country code of the given prefix.
366                None if not found in GeoOpen database
367        """
368        if p is None:
369            return None
370        else:
371            r = self.__f_country.get(p.split("/", 1)[0])
372            return None if r is None else r["country"]["iso_code"]
373
374    ####################
375    # PUBLIC FUNCTIONS #
376    ####################
377
378    def _build_stream(self):
379        """Build the stream with the used filters
380
381        Returns:
382            (BGPStream)
383        """
384        self._stream = pybgpstream.BGPStream(
385            from_time=self.start_time,
386            until_time=self.end_time,
387            data_interface=(
388                "broker"
389                if self.__data_source["source_type"] == "broker"
390                else "singlefile"
391            ),
392            record_type="updates",
393            filter="elemtype announcements withdrawals"
394            + self.__asn_filter
395            + self.__ipversion,
396        )
397
398        if self.__prefix_match_type_filter is not None:
399            self._stream._maybe_add_filter(
400                "prefix-" + self.__prefix_match_type_filter, None, self.__prefix_filter
401            )
402
403        if self.__data_source["source_type"] != "broker":
404            self._stream.set_data_interface_option(
405                "singlefile",
406                self.__data_source["source_type"] + "-file",
407                self.__data_source["file_path"],
408            )
409            self._stream.set_data_interface_option(
410                "singlefile",
411                self.__data_source["source_type"] + "-type",
412                self.__data_source["file_format"],
413            )
414        else:
415            project = (
416                self.__project if self.__isRecord else PROJECT_TYPES[self.__project]
417            )
418            self._stream._maybe_add_filter("project", project, None)
419            self._stream._maybe_add_filter("collector", None, self.__collectors)
420
421        if self.__isRecord:
422            self._stream.stream.set_live_mode()
423        return self._stream
424
425    def cpt_update(self):
426        """For debugging. Print time each 100000 elem"""
427        if not hasattr(self, "timer"):
428            self.timer = {"cpt": 0, "tmp": None, "ft": time.time(), "ot": time.time()}
429        self.timer["cpt"] += 1
430        if self.timer["cpt"] % 100000 == 0:
431            self.timer["nt"] = time.time()
432            print(
433                f'Counter : {self.timer["cpt"]} '
434                f'- Time {self.timer["nt"] - self.timer["ft"]} '
435                f'- {self.timer["nt"] - self.timer["ot"]}s',
436                file=sys.stderr,
437            )
438            # sys.stderr.write(f"Queue size : {self.queue.qsize()}")
439            self.timer["ot"] = self.timer["nt"]
440
441    def start(self):
442        """
443        Start retrieving stream/records and filtering them
444        - Download and load Geo-Open database
445        - Build Stream
446        - Send messages to bgpout.py
447        """
448
449        self.out.start()
450        print("Loading stream...")
451        self._build_stream()
452        print("Starting")
453        self.isStarted = True
454
455        for e in self._stream:
456            self.cpt_update()
457            msg = {
458                "type": e.type,
459                "time": e.time,
460                "peer_address": e.peer_address,
461                "peer_asn": e.peer_asn,
462                "collector": e.collector,
463                "project": e.project,
464                "router": e.router,
465                "router_ip": e.router_ip,
466            }
467            msg |= e.fields
468            msg["country_code"] = self.__country_by_prefix(msg["prefix"])
469            msg["source"] = msg["as-path"].split()[-1] if "as-path" in msg else None
470
471            if self.__check_country(msg):
472                self.out.iteration(msg)
473
474    def stop(self):
475        """
476        Close output (JSON, Databases, etc) and stop BGPStream
477        """
478        print("Stream ended")
479        self.out.stop()
480        exit(0)
class BGPFilter:
 41class BGPFilter:
 42    """
 43    BGP stream filter
 44    """
 45
 46    def __init__(self):
 47        self.__isRecord = False
 48        self.__start_time = ""
 49        self.__end_time = ""
 50        self.__asn_filter = None
 51        self.__ipversion = ""
 52        self.__prefix_filter = None
 53        self.__asn_list = None
 54        self.__prefix_match_type_filter = None
 55        self.__project = PROJECTS[0]
 56        self.__collectors = None
 57        self.__countries_filter = None
 58        self.__data_source = {"source_type": "broker"}
 59        self.out: bgpout.BGPOut = None
 60        """`bgpout.BGPOut()` instance that will receive all records"""
 61        # multiprocessing.set_start_method("fork")
 62        # self.queue = faster_fifo.Queue()
 63        # self.process = multiprocessing.Process(
 64        # target=self.process, args=(self.queue,
 65        # ), daemon=True)
 66
 67    ###############
 68    #   GETTERS   #
 69    ###############
 70
 71    @property
 72    def isRecord(self) -> bool:
 73        """Retrieve in past or in live mode.
 74        see `BGPFilter.record_mode()`"""
 75        return self.__isRecord
 76
 77    @property
 78    def start_time(self) -> str:
 79        """Start of the interval for record mode.
 80        see `BGPFilter.record_mode()`"""
 81        return self.__start_time
 82
 83    @property
 84    def end_time(self) -> str:
 85        """End of the interval for record mode.
 86        see `BGPFilter.record_mode()`"""
 87        return self.__end_time
 88
 89    @property
 90    def prefix_filter(self) -> List[str]:
 91        """List of prefixes (CIDR format) to filter"""
 92        return self.__prefix_filter
 93
 94    @property
 95    def asn_filter(self) -> List[str]:
 96        """List of AS numbers"""
 97        return self.__asn_list
 98
 99    @property
100    def collectors(self) -> List[str]:
101        """List of collectors to list"""
102        return self.__collectors
103
104    @property
105    def project(self) -> str:
106        """Accepted project : `ris` or `routeviews`"""
107        return self.__project
108
109    @property
110    def ipversion(self) -> str:
111        """
112        Formatted string for ip version filtering.
113
114        Accepted versions : `4` or `6`
115        """
116        return self.__ipversion
117
118    ###############
119    #   SETTERS   #
120    ###############
121
122    def record_mode(self, isRecord, start, end):
123        """
124        Define if retrieve from an interval or live stream
125            start and end won't be modified if isRecord is False
126
127        Args:
128            - isRecord (bool) : record or live mode
129            - start (string): Beginning of the interval.
130
131                Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00
132
133            - end (string): End of the interval.
134
135                Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00
136
137        Raises:
138            - ValueError: Start > End
139            - ValueError: Start or end not defined
140            - ValueError: Invalid date format
141        """
142        self.__isRecord = isRecord
143        if isRecord:
144            if start is None or end is None:
145                raise ValueError(
146                    "Record mode requires the from_time and until_time arguments"
147                )
148            try:
149                st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
150                en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
151            except ValueError:
152                raise ValueError(
153                    "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
154                )
155            if st > en:
156                raise ValueError(
157                    "Invalid record mode interval. Beginning must precede the end"
158                )
159
160            self.__start_time = start
161            self.__end_time = end
162        else:
163            self.__start_time = int(time.time())
164            self.__end_time = 0
165
166    def data_source(self, record_type: str, file_format: str, file_path: str):
167        """
168        Use single file as data source
169
170        Args:
171            record_type (str): Type of records stored in the file (rib or upd)
172            file_format (str): Format of the file (mrt, bmp or ris-live)
173            file_path (str): path to archive file
174
175        Raises:
176            FileNotFoundError
177            ValueError
178        """
179        if not os.path.isfile(file_path):
180            raise FileNotFoundError
181        if record_type not in ["rib", "upd"]:
182            raise ValueError("Input file type must be rib or upd")
183        if record_type == "rib" and file_format in ["mrt", "bmp"]:
184            raise ValueError("Accepted input format types for rib : mrt, bmp")
185        elif record_type == "upd" and file_format not in ["mrt", "bmp", "ris-live"]:
186            raise ValueError(
187                "Accepted input format types for upd : mrt, bmp or ris-live"
188            )
189        self.__data_source = {
190            "source_type": record_type,
191            "file_format": file_format,
192            "file_path": file_path,
193        }
194
195    @prefix_filter.setter
196    def prefix_filter(self, values: Tuple[List[str], str]):
197        """
198        Prefix filter option
199            Keep records that match to one of specified prefixes (cidr format).
200
201        Args:
202            prefix_list (List[string]):  Format: ip/subnet | Example: 130.0.192.0/21
203            match_type (string): Type of match
204                exact: Exact match
205                less: Exact match or less specific (contained by)
206                more: Exact match or more specific (contains)
207                any: less and more
208        """
209        try:
210            cidr_list, match_type = values
211        except ValueError:
212            raise ValueError(
213                "match type and prefixes list are required for filtering by prefixes"
214            )
215        if cidr_list is not None:
216            if len(cidr_list) == 0:
217                # "Please specify one or more prefixes when filtering by prefix"
218                return
219            if match_type not in ["exact", "less", "more", "any"]:
220                raise ValueError(
221                    "Match type must be specified and one of ['exact', 'less', 'more',"
222                    " 'any']"
223                )
224            for c in cidr_list:
225                ipaddress.ip_network(c)
226            self.__prefix_match_type_filter = match_type
227            self.__prefix_filter = cidr_list
228
229    @asn_filter.setter
230    def asn_filter(self, asn_list):
231        """Filter using specified AS number list
232            Skip a record if its source-AS is not one of specified AS numbers
233            Use _ symbol for negation
234
235        Args:
236            asn_list (list): List of AS numbers
237        """
238        self.__asn_filter = ""
239        f_list = []
240        not_f_list = []
241        if asn_list is not None and len(asn_list) >= 1:
242            self.__asn_list = []
243            for i in asn_list:
244                if re.match("_[0-9]+", i):
245                    not_f_list.append(i.replace("_", ""))
246                    self.__asn_list.append(i)
247                elif re.match("[0-9]+", i):
248                    f_list.append(i)
249                    self.__asn_list.append(i)
250
251            if len(f_list) >= 1:
252                self.__asn_filter += " and path (_" + "|_".join(f_list) + ")$"
253            if len(not_f_list) >= 1:
254                self.__asn_filter = " and path !(_" + "|_".join(not_f_list) + ")$"
255
256    @collectors.setter
257    def collectors(self, collectors):
258        if collectors is not None:
259            for c in collectors:
260                if c not in COLLECTORS[self.__project]:
261                    raise ValueError("Collector isn't available or isn't valid.")
262            self.__collectors = collectors
263
264    @project.setter
265    def project(self, project: str):
266        """
267        Args:
268            project (string): ris or routesviews
269        """
270        if self.__collectors != project and project in PROJECT_TYPES.keys():
271            self.__project = project
272            self.__collectors = None
273        else:
274            raise ValueError(
275                f"Invalid project name. Valid projects list : {PROJECT_TYPES.keys()}"
276            )
277
278    @ipversion.setter
279    def ipversion(self, version):
280        """Set string for filter field
281
282        Args:
283            version (Integer): Possible values ["4" or "6"]
284        """
285        self.__ipversion = " and ipversion " + version if version in ["4", "6"] else ""
286
287    ###############
288    #   COUNTRY   #
289    ###############
290
291    @property
292    def country_file(self) -> str:
293        """Path to the Geo Open MaxMindDB File"""
294        return self.__f_country_path
295
296    @property
297    def countries_filter(self) -> List[str]:
298        """
299        List of country codes to filter
300
301        Filter using specified country.
302            Keep records that the origin of their prefix is contained in country_list.
303
304        Args:
305            country_list (List[str]): List of country codes
306
307        Raises:
308            LookupError: If an element in country_list is not valid.
309        """
310        return self.__countries_filter
311
312    @countries_filter.setter
313    def countries_filter(self, country_list: List[str]):
314        """
315        Filter using specified country.
316            Keep records that the origin of their prefix is contained in country_list.
317
318        Args:
319            country_list (List[str]): List of country codes
320
321        Raises:
322            LookupError: If an element in country_list is not valid.
323        """
324        if country_list is not None:
325            for c in country_list:
326                pycountry.countries.lookup(c)
327        self.__countries_filter = country_list
328
329    @country_file.setter
330    def country_file(self, country_file_path: str):
331        """
332        Setter for the GeoOpen country mmdb file
333
334        Args:
335            country_file_path (String): Path to Geo Open MaxMindDB File
336        """
337        if not os.path.isfile(country_file_path):
338            raise FileNotFoundError
339        self.__f_country_path = country_file_path
340
341        self.__f_country = maxminddb.open_database(
342            self.__f_country_path, maxminddb.MODE_MMAP_EXT
343        )
344        print(f"Loaded Geo Open database : {self.__f_country_path}")
345
346    def __check_country(self, e):
347        """
348        Args:
349            e (bgp dict)
350
351
352        Returns:
353            boolean: if e.["country code"] is in self.__countries_filter list
354        """
355        return not (
356            self.__countries_filter is not None
357            and e["country_code"] not in self.__countries_filter
358        )
359
360    def __country_by_prefix(self, p):
361        """
362        Parameters:
363            p (prefix): CIDR format. Example: 130.0.192.0/21
364
365        Returns:
366            string: country code of the given prefix.
367                None if not found in GeoOpen database
368        """
369        if p is None:
370            return None
371        else:
372            r = self.__f_country.get(p.split("/", 1)[0])
373            return None if r is None else r["country"]["iso_code"]
374
375    ####################
376    # PUBLIC FUNCTIONS #
377    ####################
378
379    def _build_stream(self):
380        """Build the stream with the used filters
381
382        Returns:
383            (BGPStream)
384        """
385        self._stream = pybgpstream.BGPStream(
386            from_time=self.start_time,
387            until_time=self.end_time,
388            data_interface=(
389                "broker"
390                if self.__data_source["source_type"] == "broker"
391                else "singlefile"
392            ),
393            record_type="updates",
394            filter="elemtype announcements withdrawals"
395            + self.__asn_filter
396            + self.__ipversion,
397        )
398
399        if self.__prefix_match_type_filter is not None:
400            self._stream._maybe_add_filter(
401                "prefix-" + self.__prefix_match_type_filter, None, self.__prefix_filter
402            )
403
404        if self.__data_source["source_type"] != "broker":
405            self._stream.set_data_interface_option(
406                "singlefile",
407                self.__data_source["source_type"] + "-file",
408                self.__data_source["file_path"],
409            )
410            self._stream.set_data_interface_option(
411                "singlefile",
412                self.__data_source["source_type"] + "-type",
413                self.__data_source["file_format"],
414            )
415        else:
416            project = (
417                self.__project if self.__isRecord else PROJECT_TYPES[self.__project]
418            )
419            self._stream._maybe_add_filter("project", project, None)
420            self._stream._maybe_add_filter("collector", None, self.__collectors)
421
422        if self.__isRecord:
423            self._stream.stream.set_live_mode()
424        return self._stream
425
426    def cpt_update(self):
427        """For debugging. Print time each 100000 elem"""
428        if not hasattr(self, "timer"):
429            self.timer = {"cpt": 0, "tmp": None, "ft": time.time(), "ot": time.time()}
430        self.timer["cpt"] += 1
431        if self.timer["cpt"] % 100000 == 0:
432            self.timer["nt"] = time.time()
433            print(
434                f'Counter : {self.timer["cpt"]} '
435                f'- Time {self.timer["nt"] - self.timer["ft"]} '
436                f'- {self.timer["nt"] - self.timer["ot"]}s',
437                file=sys.stderr,
438            )
439            # sys.stderr.write(f"Queue size : {self.queue.qsize()}")
440            self.timer["ot"] = self.timer["nt"]
441
442    def start(self):
443        """
444        Start retrieving stream/records and filtering them
445        - Download and load Geo-Open database
446        - Build Stream
447        - Send messages to bgpout.py
448        """
449
450        self.out.start()
451        print("Loading stream...")
452        self._build_stream()
453        print("Starting")
454        self.isStarted = True
455
456        for e in self._stream:
457            self.cpt_update()
458            msg = {
459                "type": e.type,
460                "time": e.time,
461                "peer_address": e.peer_address,
462                "peer_asn": e.peer_asn,
463                "collector": e.collector,
464                "project": e.project,
465                "router": e.router,
466                "router_ip": e.router_ip,
467            }
468            msg |= e.fields
469            msg["country_code"] = self.__country_by_prefix(msg["prefix"])
470            msg["source"] = msg["as-path"].split()[-1] if "as-path" in msg else None
471
472            if self.__check_country(msg):
473                self.out.iteration(msg)
474
475    def stop(self):
476        """
477        Close output (JSON, Databases, etc) and stop BGPStream
478        """
479        print("Stream ended")
480        self.out.stop()
481        exit(0)

BGP stream filter

out: bgpout.BGPOut

bgpout.BGPOut() instance that will receive all records

isRecord: bool

Whether records are retrieved from a past interval (record mode) or from the live stream. See BGPFilter.record_mode().

start_time: str

Start of the interval for record mode. see BGPFilter.record_mode()

end_time: str

End of the interval for record mode. see BGPFilter.record_mode()

prefix_filter: List[str]

Prefix filter option. Keep records that match one of the specified prefixes (CIDR format).

Args (passed together as a `(prefix_list, match_type)` tuple): prefix_list (List[str]): prefixes in ip/subnet format, e.g. 130.0.192.0/21. match_type (str): type of match — exact: exact match; less: exact match or less specific (contained by); more: exact match or more specific (contains); any: less and more.

asn_filter: List[str]

Filter using the specified AS number list. Skip a record if its source AS is not one of the specified AS numbers. Prefix an AS number with the _ symbol to negate it.

Args: asn_list (list): List of AS numbers

collectors: List[str]

List of collectors to use (each must be available for the selected project).

project: str

Args: project (string): ris or routeviews

ipversion: str

Set the IP-version clause of the BGPStream filter string.

Args: version (str): possible values "4" or "6"; any other value disables IP-version filtering.

def record_mode(self, isRecord, start, end):
def record_mode(self, isRecord, start, end):
    """
    Define whether records are retrieved from a past interval or the live stream.

    Args:
        - isRecord (bool) : record mode (True) or live mode (False)
        - start (string): Beginning of the interval. Ignored in live mode.

            Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00

        - end (string): End of the interval. Ignored in live mode.

            Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00

    In live mode the stored interval start is the current UNIX time (int) and
    the stored end is 0, regardless of ``start`` and ``end``.

    Raises:
        - ValueError: Start > End
        - ValueError: Start or end not defined
        - ValueError: Invalid date format (the parsing error is chained as
          ``__cause__``)
    """
    self.__isRecord = isRecord
    if not isRecord:
        # Live mode: the given start/end are ignored and overwritten.
        self.__start_time = int(time.time())
        self.__end_time = 0
        return

    if start is None or end is None:
        raise ValueError(
            "Record mode requires the from_time and until_time arguments"
        )
    try:
        st = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
        en = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    except ValueError as err:
        # Chain the strptime error so the offending value stays visible.
        raise ValueError(
            "Invalid record mode date format. Must be %Y-%m-%d %H:%M:%S"
        ) from err
    if st > en:
        raise ValueError(
            "Invalid record mode interval. Beginning must precede the end"
        )

    # The bounds are only validated here; they are stored as the given strings.
    self.__start_time = start
    self.__end_time = end

Define whether records are retrieved from a past interval or from the live stream. If isRecord is False, start and end are ignored: the interval start is set to the current time and the end to 0.

Args: - isRecord (bool) : record or live mode - start (string): Beginning of the interval.

    Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:00:00

- end (string): End of the interval.

    Timestamp format : YYYY-MM-DD hh:mm:ss -> Example: 2022-01-01 10:10:00

Raises: - ValueError: Start > End - ValueError: Start or end not defined - ValueError: Invalid date format

def data_source(self, record_type: str, file_format: str, file_path: str):
def data_source(self, record_type: str, file_format: str, file_path: str):
    """
    Use a single file as data source.

    Args:
        record_type (str): Type of records stored in the file (rib or upd)
        file_format (str): Format of the file (mrt or bmp for rib;
            mrt, bmp or ris-live for upd)
        file_path (str): path to archive file

    Raises:
        FileNotFoundError: file_path is not an existing regular file
        ValueError: record_type is not rib/upd, or file_format is not
            accepted for record_type
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Input file not found: {file_path}")

    accepted_formats = {"rib": ("mrt", "bmp"), "upd": ("mrt", "bmp", "ris-live")}
    if record_type not in accepted_formats:
        raise ValueError("Input file type must be rib or upd")
    # Reject formats that are NOT accepted (the previous rib check was
    # inverted and rejected exactly the valid mrt/bmp formats).
    if file_format not in accepted_formats[record_type]:
        if record_type == "rib":
            raise ValueError("Accepted input format types for rib : mrt, bmp")
        raise ValueError(
            "Accepted input format types for upd : mrt, bmp or ris-live"
        )

    self.__data_source = {
        "source_type": record_type,
        "file_format": file_format,
        "file_path": file_path,
    }

Use single file as data source

Args: record_type (str): Type of records stored in the file (rib or upd) file_format (str): Format of the file (mrt, bmp or ris-live) file_path (str): path to archive file

Raises: FileNotFoundError ValueError

country_file: str

Setter for the GeoOpen country mmdb file

Args: country_file_path (String): Path to Geo Open MaxMindDB File

countries_filter: List[str]

Filter by the specified countries. Keep records whose prefix's country of origin is in country_list.

Args: country_list (List[str]): List of country codes

Raises: LookupError: If an element in country_list is not valid.

def cpt_update(self):
def cpt_update(self):
    """Debug counter: report elapsed time to stderr every 100000 calls.

    The first call lazily creates ``self.timer``. Each report shows the count,
    the time since the first call and the time since the previous report.
    """
    if not hasattr(self, "timer"):
        self.timer = {"cpt": 0, "tmp": None, "ft": time.time(), "ot": time.time()}
    state = self.timer
    state["cpt"] += 1
    if state["cpt"] % 100000 != 0:
        return

    now = time.time()
    state["nt"] = now
    since_first = now - state["ft"]
    since_last = now - state["ot"]
    print(
        f'Counter : {state["cpt"]} - Time {since_first} - {since_last}s',
        file=sys.stderr,
    )
    state["ot"] = now

For debugging. Print timing information every 100,000 elements.

def start(self):
def start(self):
    """
    Start retrieving stream/records and filtering them.

    - Start the output (``self.out.start()``)
    - Build the stream (``self._build_stream()``)
    - Turn every stream element into a dict, enrich it with the country code
      of its prefix and its source AS (last AS of the AS path), and send it to
      ``self.out.iteration`` when it passes the country filter.

    Elements without a ``prefix`` field get ``country_code = None``; elements
    without a non-empty ``as-path`` get ``source = None``.
    NOTE(review): loading of the Geo-Open database is not visible here;
    presumably it happens in ``_build_stream`` or elsewhere — confirm.
    """
    self.out.start()
    print("Loading stream...")
    self._build_stream()
    print("Starting")
    self.isStarted = True

    for elem in self._stream:
        self.cpt_update()
        msg = {
            "type": elem.type,
            "time": elem.time,
            "peer_address": elem.peer_address,
            "peer_asn": elem.peer_asn,
            "collector": elem.collector,
            "project": elem.project,
            "router": elem.router,
            "router_ip": elem.router_ip,
        }
        msg |= elem.fields

        # Not every element carries a prefix (e.g. peer-state elements), so
        # only look up a country when one is present instead of raising KeyError.
        # NOTE(review): confirm __check_country handles country_code None.
        prefix = msg.get("prefix")
        msg["country_code"] = (
            self.__country_by_prefix(prefix) if prefix is not None else None
        )

        # The source AS is the last hop of the AS path; an empty or missing
        # path has none (indexing [-1] on an empty split would raise).
        hops = (msg.get("as-path") or "").split()
        msg["source"] = hops[-1] if hops else None

        if self.__check_country(msg):
            self.out.iteration(msg)

Start retrieving stream/records and filtering them

  • Download and load Geo-Open database
  • Build Stream
  • Send messages to bgpout.py
def stop(self):
def stop(self):
    """
    Close output (JSON, databases, etc.) and terminate the process.

    Calls ``self.out.stop()`` and then exits with status 0; this method
    never returns normally.

    Raises:
        SystemExit: always, with exit code 0.
    """
    print("Stream ended")
    self.out.stop()
    # sys.exit instead of the site-provided exit() builtin: exit() is meant
    # for the interactive interpreter and is missing under `python -S`.
    sys.exit(0)

Close output (JSON, databases, etc.) and stop BGPStream by exiting the process.