Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ geoip2.egg-info/
MANIFEST
.mypy_cache/
*.pyc
pylint.txt
.pyre
.pytype
*.swp
.tox
violations.pyflakes.txt
/venv
/venv
6 changes: 4 additions & 2 deletions examples/benchmark.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/python

"""Simple benchmarking script."""

import argparse
import contextlib
Expand All @@ -9,6 +9,7 @@
import timeit

import geoip2.database
import geoip2.errors

parser = argparse.ArgumentParser(description="Benchmark maxminddb.")
parser.add_argument("--count", default=250000, type=int, help="number of lookups")
Expand All @@ -21,6 +22,7 @@


def lookup_ip_address() -> None:
"""Look up IP address."""
ip = socket.inet_ntoa(struct.pack("!L", random.getrandbits(32)))
with contextlib.suppress(geoip2.errors.AddressNotFoundError):
reader.city(str(ip))
Expand All @@ -32,4 +34,4 @@ def lookup_ip_address() -> None:
number=args.count,
)

print(args.count / elapsed, "lookups per second")
print(args.count / elapsed, "lookups per second") # noqa: T201
2 changes: 1 addition & 1 deletion geoip2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# pylint:disable=C0111
"""geoip2 client library."""

__title__ = "geoip2"
__version__ = "5.1.0"
Expand Down
9 changes: 3 additions & 6 deletions geoip2/_internal.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
"""Internal utilities."""

# pylint: disable=too-few-public-methods
from abc import ABCMeta


class Model(metaclass=ABCMeta):
class Model(metaclass=ABCMeta): # noqa: B024
"""Shared methods for MaxMind model classes."""

def __eq__(self, other: object) -> bool:
return isinstance(other, self.__class__) and self.to_dict() == other.to_dict()

def __ne__(self, other) -> bool:
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)

# pylint: disable=too-many-branches
def to_dict(self) -> dict:
def to_dict(self) -> dict: # noqa: C901, PLR0912
"""Return a dict of the object suitable for serialization."""
result = {}
for key, value in self.__dict__.items():
Expand Down Expand Up @@ -42,7 +40,6 @@ def to_dict(self) -> dict:
result[key] = value

# network and ip_address are properties for performance reasons
# pylint: disable=no-member
if hasattr(self, "ip_address") and self.ip_address is not None:
result["ip_address"] = str(self.ip_address)
if hasattr(self, "network") and self.network is not None:
Expand Down
95 changes: 52 additions & 43 deletions geoip2/database.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""The database reader for MaxMind MMDB files."""

from __future__ import annotations

import inspect
import os
from collections.abc import Sequence
from typing import IO, Any, AnyStr, Optional, Union, cast
from typing import IO, TYPE_CHECKING, AnyStr, cast

import maxminddb
from maxminddb import (
Expand All @@ -13,23 +13,31 @@
MODE_MEMORY,
MODE_MMAP,
MODE_MMAP_EXT,
InvalidDatabaseError,
)

import geoip2
import geoip2.errors
import geoip2.models
from geoip2.models import (
ASN,
ISP,
AnonymousIP,
AnonymousPlus,
City,
ConnectionType,
Country,
Domain,
Enterprise,
)
from geoip2.types import IPAddress

if TYPE_CHECKING:
import os
from collections.abc import Sequence

from typing_extensions import Self

from geoip2.models import (
ASN,
ISP,
AnonymousIP,
AnonymousPlus,
City,
ConnectionType,
Country,
Domain,
Enterprise,
)
from geoip2.types import IPAddress

__all__ = [
"MODE_AUTO",
Expand Down Expand Up @@ -67,8 +75,8 @@ class Reader:

def __init__(
self,
fileish: Union[AnyStr, int, os.PathLike, IO],
locales: Optional[Sequence[str]] = None,
fileish: AnyStr | int | os.PathLike | IO,
locales: Sequence[str] | None = None,
mode: int = MODE_AUTO,
) -> None:
"""Create GeoIP2 Reader.
Expand Down Expand Up @@ -117,10 +125,10 @@ def __init__(
self._db_type = self._db_reader.metadata().database_type
self._locales = locales

def __enter__(self) -> "Reader":
def __enter__(self) -> Self:
return self

def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
self.close()

def country(self, ip_address: IPAddress) -> Country:
Expand All @@ -132,7 +140,7 @@ def country(self, ip_address: IPAddress) -> Country:

"""
return cast(
Country,
"Country",
self._model_for(geoip2.models.Country, "Country", ip_address),
)

Expand All @@ -144,7 +152,7 @@ def city(self, ip_address: IPAddress) -> City:
:returns: :py:class:`geoip2.models.City` object

"""
return cast(City, self._model_for(geoip2.models.City, "City", ip_address))
return cast("City", self._model_for(geoip2.models.City, "City", ip_address))

def anonymous_ip(self, ip_address: IPAddress) -> AnonymousIP:
"""Get the AnonymousIP object for the IP address.
Expand All @@ -155,7 +163,7 @@ def anonymous_ip(self, ip_address: IPAddress) -> AnonymousIP:

"""
return cast(
AnonymousIP,
"AnonymousIP",
self._flat_model_for(
geoip2.models.AnonymousIP,
"GeoIP2-Anonymous-IP",
Expand All @@ -172,7 +180,7 @@ def anonymous_plus(self, ip_address: IPAddress) -> AnonymousPlus:

"""
return cast(
AnonymousPlus,
"AnonymousPlus",
self._flat_model_for(
geoip2.models.AnonymousPlus,
"GeoIP-Anonymous-Plus",
Expand All @@ -189,7 +197,7 @@ def asn(self, ip_address: IPAddress) -> ASN:

"""
return cast(
ASN,
"ASN",
self._flat_model_for(geoip2.models.ASN, "GeoLite2-ASN", ip_address),
)

Expand All @@ -202,7 +210,7 @@ def connection_type(self, ip_address: IPAddress) -> ConnectionType:

"""
return cast(
ConnectionType,
"ConnectionType",
self._flat_model_for(
geoip2.models.ConnectionType,
"GeoIP2-Connection-Type",
Expand All @@ -219,7 +227,7 @@ def domain(self, ip_address: IPAddress) -> Domain:

"""
return cast(
Domain,
"Domain",
self._flat_model_for(geoip2.models.Domain, "GeoIP2-Domain", ip_address),
)

Expand All @@ -232,7 +240,7 @@ def enterprise(self, ip_address: IPAddress) -> Enterprise:

"""
return cast(
Enterprise,
"Enterprise",
self._model_for(geoip2.models.Enterprise, "Enterprise", ip_address),
)

Expand All @@ -245,31 +253,38 @@ def isp(self, ip_address: IPAddress) -> ISP:

"""
return cast(
ISP,
"ISP",
self._flat_model_for(geoip2.models.ISP, "GeoIP2-ISP", ip_address),
)

def _get(self, database_type: str, ip_address: IPAddress) -> Any:
def _get(self, database_type: str, ip_address: IPAddress) -> tuple[dict, int]:
if database_type not in self._db_type:
caller = inspect.stack()[2][3]
msg = (
f"The {caller} method cannot be used with the {self._db_type} database"
)
raise TypeError(
f"The {caller} method cannot be used with the {self._db_type} database",
msg,
)
(record, prefix_len) = self._db_reader.get_with_prefix_len(ip_address)
if record is None:
msg = f"The address {ip_address} is not in the database."
raise geoip2.errors.AddressNotFoundError(
f"The address {ip_address} is not in the database.",
msg,
str(ip_address),
prefix_len,
)
if not isinstance(record, dict):
msg = f"Expected record to be a dict but was f{type(record)}"
raise InvalidDatabaseError(msg)
return record, prefix_len

def _model_for(
self,
model_class: Union[type[Country], type[Enterprise], type[City]],
model_class: type[City | Country | Enterprise],
types: str,
ip_address: IPAddress,
) -> Union[Country, Enterprise, City]:
) -> City | Country | Enterprise:
(record, prefix_len) = self._get(types, ip_address)
return model_class(
self._locales,
Expand All @@ -280,28 +295,22 @@ def _model_for(

def _flat_model_for(
self,
model_class: Union[
type[Domain],
type[ISP],
type[ConnectionType],
type[ASN],
type[AnonymousIP],
],
model_class: type[Domain | ISP | ConnectionType | ASN | AnonymousIP],
types: str,
ip_address: IPAddress,
) -> Union[ConnectionType, ISP, AnonymousIP, Domain, ASN]:
) -> ConnectionType | ISP | AnonymousIP | Domain | ASN:
(record, prefix_len) = self._get(types, ip_address)
return model_class(ip_address, prefix_len=prefix_len, **record)

def metadata(
self,
) -> maxminddb.reader.Metadata:
"""The metadata for the open database.
"""Get the metadata for the open database.

:returns: :py:class:`maxminddb.reader.Metadata` object
"""
return self._db_reader.metadata()

def close(self) -> None:
"""Closes the GeoIP2 database."""
"""Close the GeoIP2 database."""
self._db_reader.close()
Loading
Loading