1313 lines
40 KiB
Python
1313 lines
40 KiB
Python
"""The networks module contains types for common network-related fields."""
|
||
|
||
from __future__ import annotations as _annotations
|
||
|
||
import dataclasses as _dataclasses
|
||
import re
|
||
from dataclasses import fields
|
||
from functools import lru_cache
|
||
from importlib.metadata import version
|
||
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
|
||
from typing import TYPE_CHECKING, Annotated, Any, ClassVar
|
||
|
||
from pydantic_core import (
|
||
MultiHostHost,
|
||
PydanticCustomError,
|
||
PydanticSerializationUnexpectedValue,
|
||
SchemaSerializer,
|
||
core_schema,
|
||
)
|
||
from pydantic_core import MultiHostUrl as _CoreMultiHostUrl
|
||
from pydantic_core import Url as _CoreUrl
|
||
from typing_extensions import Self, TypeAlias
|
||
|
||
from pydantic.errors import PydanticUserError
|
||
|
||
from ._internal import _repr, _schema_generation_shared
|
||
from ._migration import getattr_migration
|
||
from .annotated_handlers import GetCoreSchemaHandler
|
||
from .json_schema import JsonSchemaValue
|
||
from .type_adapter import TypeAdapter
|
||
|
||
if TYPE_CHECKING:
|
||
import email_validator
|
||
|
||
NetworkType: TypeAlias = 'str | bytes | int | tuple[str | bytes | int, str | int]'
|
||
|
||
else:
|
||
email_validator = None
|
||
|
||
|
||
__all__ = [
|
||
'AnyUrl',
|
||
'AnyHttpUrl',
|
||
'FileUrl',
|
||
'FtpUrl',
|
||
'HttpUrl',
|
||
'WebsocketUrl',
|
||
'AnyWebsocketUrl',
|
||
'UrlConstraints',
|
||
'EmailStr',
|
||
'NameEmail',
|
||
'IPvAnyAddress',
|
||
'IPvAnyInterface',
|
||
'IPvAnyNetwork',
|
||
'PostgresDsn',
|
||
'CockroachDsn',
|
||
'AmqpDsn',
|
||
'RedisDsn',
|
||
'MongoDsn',
|
||
'KafkaDsn',
|
||
'NatsDsn',
|
||
'validate_email',
|
||
'MySQLDsn',
|
||
'MariaDBDsn',
|
||
'ClickHouseDsn',
|
||
'SnowflakeDsn',
|
||
]
|
||
|
||
|
||
@_dataclasses.dataclass
|
||
class UrlConstraints:
|
||
"""Url constraints.
|
||
|
||
Attributes:
|
||
max_length: The maximum length of the url. Defaults to `None`.
|
||
allowed_schemes: The allowed schemes. Defaults to `None`.
|
||
host_required: Whether the host is required. Defaults to `None`.
|
||
default_host: The default host. Defaults to `None`.
|
||
default_port: The default port. Defaults to `None`.
|
||
default_path: The default path. Defaults to `None`.
|
||
"""
|
||
|
||
max_length: int | None = None
|
||
allowed_schemes: list[str] | None = None
|
||
host_required: bool | None = None
|
||
default_host: str | None = None
|
||
default_port: int | None = None
|
||
default_path: str | None = None
|
||
|
||
def __hash__(self) -> int:
|
||
return hash(
|
||
(
|
||
self.max_length,
|
||
tuple(self.allowed_schemes) if self.allowed_schemes is not None else None,
|
||
self.host_required,
|
||
self.default_host,
|
||
self.default_port,
|
||
self.default_path,
|
||
)
|
||
)
|
||
|
||
@property
|
||
def defined_constraints(self) -> dict[str, Any]:
|
||
"""Fetch a key / value mapping of constraints to values that are not None. Used for core schema updates."""
|
||
return {field.name: value for field in fields(self) if (value := getattr(self, field.name)) is not None}
|
||
|
||
def __get_pydantic_core_schema__(self, source: Any, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
|
||
schema = handler(source)
|
||
|
||
# for function-wrap schemas, url constraints is applied to the inner schema
|
||
# because when we generate schemas for urls, we wrap a core_schema.url_schema() with a function-wrap schema
|
||
# that helps with validation on initialization, see _BaseUrl and _BaseMultiHostUrl below.
|
||
schema_to_mutate = schema['schema'] if schema['type'] == 'function-wrap' else schema
|
||
if annotated_type := schema_to_mutate['type'] not in ('url', 'multi-host-url'):
|
||
raise PydanticUserError(
|
||
f"'UrlConstraints' cannot annotate '{annotated_type}'.", code='invalid-annotated-type'
|
||
)
|
||
for constraint_key, constraint_value in self.defined_constraints.items():
|
||
schema_to_mutate[constraint_key] = constraint_value
|
||
return schema
|
||
|
||
|
||
class _BaseUrl:
|
||
_constraints: ClassVar[UrlConstraints] = UrlConstraints()
|
||
_url: _CoreUrl
|
||
|
||
def __init__(self, url: str | _CoreUrl | _BaseUrl) -> None:
|
||
self._url = _build_type_adapter(self.__class__).validate_python(url)._url
|
||
|
||
@property
|
||
def scheme(self) -> str:
|
||
"""The scheme part of the URL.
|
||
|
||
e.g. `https` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.scheme
|
||
|
||
@property
|
||
def username(self) -> str | None:
|
||
"""The username part of the URL, or `None`.
|
||
|
||
e.g. `user` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.username
|
||
|
||
@property
|
||
def password(self) -> str | None:
|
||
"""The password part of the URL, or `None`.
|
||
|
||
e.g. `pass` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.password
|
||
|
||
@property
|
||
def host(self) -> str | None:
|
||
"""The host part of the URL, or `None`.
|
||
|
||
If the URL must be punycode encoded, this is the encoded host, e.g if the input URL is `https://£££.com`,
|
||
`host` will be `xn--9aaa.com`
|
||
"""
|
||
return self._url.host
|
||
|
||
def unicode_host(self) -> str | None:
|
||
"""The host part of the URL as a unicode string, or `None`.
|
||
|
||
e.g. `host` in `https://user:pass@host:port/path?query#fragment`
|
||
|
||
If the URL must be punycode encoded, this is the decoded host, e.g if the input URL is `https://£££.com`,
|
||
`unicode_host()` will be `£££.com`
|
||
"""
|
||
return self._url.unicode_host()
|
||
|
||
@property
|
||
def port(self) -> int | None:
|
||
"""The port part of the URL, or `None`.
|
||
|
||
e.g. `port` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.port
|
||
|
||
@property
|
||
def path(self) -> str | None:
|
||
"""The path part of the URL, or `None`.
|
||
|
||
e.g. `/path` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.path
|
||
|
||
@property
|
||
def query(self) -> str | None:
|
||
"""The query part of the URL, or `None`.
|
||
|
||
e.g. `query` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.query
|
||
|
||
def query_params(self) -> list[tuple[str, str]]:
|
||
"""The query part of the URL as a list of key-value pairs.
|
||
|
||
e.g. `[('foo', 'bar')]` in `https://user:pass@host:port/path?foo=bar#fragment`
|
||
"""
|
||
return self._url.query_params()
|
||
|
||
@property
|
||
def fragment(self) -> str | None:
|
||
"""The fragment part of the URL, or `None`.
|
||
|
||
e.g. `fragment` in `https://user:pass@host:port/path?query#fragment`
|
||
"""
|
||
return self._url.fragment
|
||
|
||
def unicode_string(self) -> str:
|
||
"""The URL as a unicode string, unlike `__str__()` this will not punycode encode the host.
|
||
|
||
If the URL must be punycode encoded, this is the decoded string, e.g if the input URL is `https://£££.com`,
|
||
`unicode_string()` will be `https://£££.com`
|
||
"""
|
||
return self._url.unicode_string()
|
||
|
||
def encoded_string(self) -> str:
|
||
"""The URL's encoded string representation via __str__().
|
||
|
||
This returns the punycode-encoded host version of the URL as a string.
|
||
"""
|
||
return str(self)
|
||
|
||
def __str__(self) -> str:
|
||
"""The URL as a string, this will punycode encode the host if required."""
|
||
return str(self._url)
|
||
|
||
def __repr__(self) -> str:
|
||
return f'{self.__class__.__name__}({str(self._url)!r})'
|
||
|
||
def __deepcopy__(self, memo: dict) -> Self:
|
||
return self.__class__(self._url)
|
||
|
||
def __eq__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url == other._url
|
||
|
||
def __lt__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url < other._url
|
||
|
||
def __gt__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url > other._url
|
||
|
||
def __le__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url <= other._url
|
||
|
||
def __ge__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url >= other._url
|
||
|
||
def __hash__(self) -> int:
|
||
return hash(self._url)
|
||
|
||
def __len__(self) -> int:
|
||
return len(str(self._url))
|
||
|
||
@classmethod
|
||
def build(
|
||
cls,
|
||
*,
|
||
scheme: str,
|
||
username: str | None = None,
|
||
password: str | None = None,
|
||
host: str,
|
||
port: int | None = None,
|
||
path: str | None = None,
|
||
query: str | None = None,
|
||
fragment: str | None = None,
|
||
) -> Self:
|
||
"""Build a new `Url` instance from its component parts.
|
||
|
||
Args:
|
||
scheme: The scheme part of the URL.
|
||
username: The username part of the URL, or omit for no username.
|
||
password: The password part of the URL, or omit for no password.
|
||
host: The host part of the URL.
|
||
port: The port part of the URL, or omit for no port.
|
||
path: The path part of the URL, or omit for no path.
|
||
query: The query part of the URL, or omit for no query.
|
||
fragment: The fragment part of the URL, or omit for no fragment.
|
||
|
||
Returns:
|
||
An instance of URL
|
||
"""
|
||
return cls(
|
||
_CoreUrl.build(
|
||
scheme=scheme,
|
||
username=username,
|
||
password=password,
|
||
host=host,
|
||
port=port,
|
||
path=path,
|
||
query=query,
|
||
fragment=fragment,
|
||
)
|
||
)
|
||
|
||
@classmethod
|
||
def serialize_url(cls, url: Any, info: core_schema.SerializationInfo) -> str | Self:
|
||
if not isinstance(url, cls):
|
||
raise PydanticSerializationUnexpectedValue(
|
||
f"Expected `{cls}` but got `{type(url)}` with value `'{url}'` - serialized value may not be as expected."
|
||
)
|
||
if info.mode == 'json':
|
||
return str(url)
|
||
return url
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls, source: type[_BaseUrl], handler: GetCoreSchemaHandler
|
||
) -> core_schema.CoreSchema:
|
||
def wrap_val(v, h):
|
||
if isinstance(v, source):
|
||
return v
|
||
if isinstance(v, _BaseUrl):
|
||
v = str(v)
|
||
core_url = h(v)
|
||
instance = source.__new__(source)
|
||
instance._url = core_url
|
||
return instance
|
||
|
||
return core_schema.no_info_wrap_validator_function(
|
||
wrap_val,
|
||
schema=core_schema.url_schema(**cls._constraints.defined_constraints),
|
||
serialization=core_schema.plain_serializer_function_ser_schema(
|
||
cls.serialize_url, info_arg=True, when_used='always'
|
||
),
|
||
)
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
# we use the url schema for json schema generation, but we might have to extract it from
|
||
# the function-wrap schema we use as a tool for validation on initialization
|
||
inner_schema = core_schema['schema'] if core_schema['type'] == 'function-wrap' else core_schema
|
||
return handler(inner_schema)
|
||
|
||
__pydantic_serializer__ = SchemaSerializer(core_schema.any_schema(serialization=core_schema.to_string_ser_schema()))
|
||
|
||
|
||
class _BaseMultiHostUrl:
|
||
_constraints: ClassVar[UrlConstraints] = UrlConstraints()
|
||
_url: _CoreMultiHostUrl
|
||
|
||
def __init__(self, url: str | _CoreMultiHostUrl | _BaseMultiHostUrl) -> None:
|
||
self._url = _build_type_adapter(self.__class__).validate_python(url)._url
|
||
|
||
@property
|
||
def scheme(self) -> str:
|
||
"""The scheme part of the URL.
|
||
|
||
e.g. `https` in `https://foo.com,bar.com/path?query#fragment`
|
||
"""
|
||
return self._url.scheme
|
||
|
||
@property
|
||
def path(self) -> str | None:
|
||
"""The path part of the URL, or `None`.
|
||
|
||
e.g. `/path` in `https://foo.com,bar.com/path?query#fragment`
|
||
"""
|
||
return self._url.path
|
||
|
||
@property
|
||
def query(self) -> str | None:
|
||
"""The query part of the URL, or `None`.
|
||
|
||
e.g. `query` in `https://foo.com,bar.com/path?query#fragment`
|
||
"""
|
||
return self._url.query
|
||
|
||
def query_params(self) -> list[tuple[str, str]]:
|
||
"""The query part of the URL as a list of key-value pairs.
|
||
|
||
e.g. `[('foo', 'bar')]` in `https://foo.com,bar.com/path?foo=bar#fragment`
|
||
"""
|
||
return self._url.query_params()
|
||
|
||
@property
|
||
def fragment(self) -> str | None:
|
||
"""The fragment part of the URL, or `None`.
|
||
|
||
e.g. `fragment` in `https://foo.com,bar.com/path?query#fragment`
|
||
"""
|
||
return self._url.fragment
|
||
|
||
def hosts(self) -> list[MultiHostHost]:
|
||
'''The hosts of the `MultiHostUrl` as [`MultiHostHost`][pydantic_core.MultiHostHost] typed dicts.
|
||
|
||
```python
|
||
from pydantic_core import MultiHostUrl
|
||
|
||
mhu = MultiHostUrl('https://foo.com:123,foo:bar@bar.com/path')
|
||
print(mhu.hosts())
|
||
"""
|
||
[
|
||
{'username': None, 'password': None, 'host': 'foo.com', 'port': 123},
|
||
{'username': 'foo', 'password': 'bar', 'host': 'bar.com', 'port': 443}
|
||
]
|
||
```
|
||
Returns:
|
||
A list of dicts, each representing a host.
|
||
'''
|
||
return self._url.hosts()
|
||
|
||
def encoded_string(self) -> str:
|
||
"""The URL's encoded string representation via __str__().
|
||
|
||
This returns the punycode-encoded host version of the URL as a string.
|
||
"""
|
||
return str(self)
|
||
|
||
def unicode_string(self) -> str:
|
||
"""The URL as a unicode string, unlike `__str__()` this will not punycode encode the hosts."""
|
||
return self._url.unicode_string()
|
||
|
||
def __str__(self) -> str:
|
||
"""The URL as a string, this will punycode encode the host if required."""
|
||
return str(self._url)
|
||
|
||
def __repr__(self) -> str:
|
||
return f'{self.__class__.__name__}({str(self._url)!r})'
|
||
|
||
def __deepcopy__(self, memo: dict) -> Self:
|
||
return self.__class__(self._url)
|
||
|
||
def __eq__(self, other: Any) -> bool:
|
||
return self.__class__ is other.__class__ and self._url == other._url
|
||
|
||
def __hash__(self) -> int:
|
||
return hash(self._url)
|
||
|
||
def __len__(self) -> int:
|
||
return len(str(self._url))
|
||
|
||
@classmethod
|
||
def build(
|
||
cls,
|
||
*,
|
||
scheme: str,
|
||
hosts: list[MultiHostHost] | None = None,
|
||
username: str | None = None,
|
||
password: str | None = None,
|
||
host: str | None = None,
|
||
port: int | None = None,
|
||
path: str | None = None,
|
||
query: str | None = None,
|
||
fragment: str | None = None,
|
||
) -> Self:
|
||
"""Build a new `MultiHostUrl` instance from its component parts.
|
||
|
||
This method takes either `hosts` - a list of `MultiHostHost` typed dicts, or the individual components
|
||
`username`, `password`, `host` and `port`.
|
||
|
||
Args:
|
||
scheme: The scheme part of the URL.
|
||
hosts: Multiple hosts to build the URL from.
|
||
username: The username part of the URL.
|
||
password: The password part of the URL.
|
||
host: The host part of the URL.
|
||
port: The port part of the URL.
|
||
path: The path part of the URL.
|
||
query: The query part of the URL, or omit for no query.
|
||
fragment: The fragment part of the URL, or omit for no fragment.
|
||
|
||
Returns:
|
||
An instance of `MultiHostUrl`
|
||
"""
|
||
return cls(
|
||
_CoreMultiHostUrl.build(
|
||
scheme=scheme,
|
||
hosts=hosts,
|
||
username=username,
|
||
password=password,
|
||
host=host,
|
||
port=port,
|
||
path=path,
|
||
query=query,
|
||
fragment=fragment,
|
||
)
|
||
)
|
||
|
||
@classmethod
|
||
def serialize_url(cls, url: Any, info: core_schema.SerializationInfo) -> str | Self:
|
||
if not isinstance(url, cls):
|
||
raise PydanticSerializationUnexpectedValue(
|
||
f"Expected `{cls}` but got `{type(url)}` with value `'{url}'` - serialized value may not be as expected."
|
||
)
|
||
if info.mode == 'json':
|
||
return str(url)
|
||
return url
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls, source: type[_BaseMultiHostUrl], handler: GetCoreSchemaHandler
|
||
) -> core_schema.CoreSchema:
|
||
def wrap_val(v, h):
|
||
if isinstance(v, source):
|
||
return v
|
||
if isinstance(v, _BaseMultiHostUrl):
|
||
v = str(v)
|
||
core_url = h(v)
|
||
instance = source.__new__(source)
|
||
instance._url = core_url
|
||
return instance
|
||
|
||
return core_schema.no_info_wrap_validator_function(
|
||
wrap_val,
|
||
schema=core_schema.multi_host_url_schema(**cls._constraints.defined_constraints),
|
||
serialization=core_schema.plain_serializer_function_ser_schema(
|
||
cls.serialize_url, info_arg=True, when_used='always'
|
||
),
|
||
)
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
# we use the url schema for json schema generation, but we might have to extract it from
|
||
# the function-wrap schema we use as a tool for validation on initialization
|
||
inner_schema = core_schema['schema'] if core_schema['type'] == 'function-wrap' else core_schema
|
||
return handler(inner_schema)
|
||
|
||
__pydantic_serializer__ = SchemaSerializer(core_schema.any_schema(serialization=core_schema.to_string_ser_schema()))
|
||
|
||
|
||
@lru_cache
|
||
def _build_type_adapter(cls: type[_BaseUrl | _BaseMultiHostUrl]) -> TypeAdapter:
|
||
return TypeAdapter(cls)
|
||
|
||
|
||
class AnyUrl(_BaseUrl):
|
||
"""Base type for all URLs.
|
||
|
||
* Any scheme allowed
|
||
* Top-level domain (TLD) not required
|
||
* Host not required
|
||
|
||
Assuming an input URL of `http://samuel:pass@example.com:8000/the/path/?query=here#fragment=is;this=bit`,
|
||
the types export the following properties:
|
||
|
||
- `scheme`: the URL scheme (`http`), always set.
|
||
- `host`: the URL host (`example.com`).
|
||
- `username`: optional username if included (`samuel`).
|
||
- `password`: optional password if included (`pass`).
|
||
- `port`: optional port (`8000`).
|
||
- `path`: optional path (`/the/path/`).
|
||
- `query`: optional URL query (for example, `GET` arguments or "search string", such as `query=here`).
|
||
- `fragment`: optional fragment (`fragment=is;this=bit`).
|
||
"""
|
||
|
||
|
||
# Note: all single host urls inherit from `AnyUrl` to preserve compatibility with pre-v2.10 code
|
||
# Where urls were annotated variants of `AnyUrl`, which was an alias to `pydantic_core.Url`
|
||
|
||
|
||
class AnyHttpUrl(AnyUrl):
|
||
"""A type that will accept any http or https URL.
|
||
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['http', 'https'])
|
||
|
||
|
||
class HttpUrl(AnyUrl):
|
||
"""A type that will accept any http or https URL.
|
||
|
||
* TLD not required
|
||
* Host not required
|
||
* Max length 2083
|
||
|
||
```python
|
||
from pydantic import BaseModel, HttpUrl, ValidationError
|
||
|
||
class MyModel(BaseModel):
|
||
url: HttpUrl
|
||
|
||
m = MyModel(url='http://www.example.com') # (1)!
|
||
print(m.url)
|
||
#> http://www.example.com/
|
||
|
||
try:
|
||
MyModel(url='ftp://invalid.url')
|
||
except ValidationError as e:
|
||
print(e)
|
||
'''
|
||
1 validation error for MyModel
|
||
url
|
||
URL scheme should be 'http' or 'https' [type=url_scheme, input_value='ftp://invalid.url', input_type=str]
|
||
'''
|
||
|
||
try:
|
||
MyModel(url='not a url')
|
||
except ValidationError as e:
|
||
print(e)
|
||
'''
|
||
1 validation error for MyModel
|
||
url
|
||
Input should be a valid URL, relative URL without a base [type=url_parsing, input_value='not a url', input_type=str]
|
||
'''
|
||
```
|
||
|
||
1. Note: mypy would prefer `m = MyModel(url=HttpUrl('http://www.example.com'))`, but Pydantic will convert the string to an HttpUrl instance anyway.
|
||
|
||
"International domains" (e.g. a URL where the host or TLD includes non-ascii characters) will be encoded via
|
||
[punycode](https://en.wikipedia.org/wiki/Punycode) (see
|
||
[this article](https://www.xudongz.com/blog/2017/idn-phishing/) for a good description of why this is important):
|
||
|
||
```python
|
||
from pydantic import BaseModel, HttpUrl
|
||
|
||
class MyModel(BaseModel):
|
||
url: HttpUrl
|
||
|
||
m1 = MyModel(url='http://puny£code.com')
|
||
print(m1.url)
|
||
#> http://xn--punycode-eja.com/
|
||
m2 = MyModel(url='https://www.аррӏе.com/')
|
||
print(m2.url)
|
||
#> https://www.xn--80ak6aa92e.com/
|
||
m3 = MyModel(url='https://www.example.珠宝/')
|
||
print(m3.url)
|
||
#> https://www.example.xn--pbt977c/
|
||
```
|
||
|
||
|
||
!!! warning "Underscores in Hostnames"
|
||
In Pydantic, underscores are allowed in all parts of a domain except the TLD.
|
||
Technically this might be wrong - in theory the hostname cannot have underscores, but subdomains can.
|
||
|
||
To explain this; consider the following two cases:
|
||
|
||
- `exam_ple.co.uk`: the hostname is `exam_ple`, which should not be allowed since it contains an underscore.
|
||
- `foo_bar.example.com` the hostname is `example`, which should be allowed since the underscore is in the subdomain.
|
||
|
||
Without having an exhaustive list of TLDs, it would be impossible to differentiate between these two. Therefore
|
||
underscores are allowed, but you can always do further validation in a validator if desired.
|
||
|
||
Also, Chrome, Firefox, and Safari all currently accept `http://exam_ple.com` as a URL, so we're in good
|
||
(or at least big) company.
|
||
"""
|
||
|
||
_constraints = UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'])
|
||
|
||
|
||
class AnyWebsocketUrl(AnyUrl):
|
||
"""A type that will accept any ws or wss URL.
|
||
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['ws', 'wss'])
|
||
|
||
|
||
class WebsocketUrl(AnyUrl):
|
||
"""A type that will accept any ws or wss URL.
|
||
|
||
* TLD not required
|
||
* Host not required
|
||
* Max length 2083
|
||
"""
|
||
|
||
_constraints = UrlConstraints(max_length=2083, allowed_schemes=['ws', 'wss'])
|
||
|
||
|
||
class FileUrl(AnyUrl):
|
||
"""A type that will accept any file URL.
|
||
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['file'])
|
||
|
||
|
||
class FtpUrl(AnyUrl):
|
||
"""A type that will accept ftp URL.
|
||
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['ftp'])
|
||
|
||
|
||
class PostgresDsn(_BaseMultiHostUrl):
|
||
"""A type that will accept any Postgres DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host required
|
||
* Supports multiple hosts
|
||
|
||
If further validation is required, these properties can be used by validators to enforce specific behaviour:
|
||
|
||
```python
|
||
from pydantic import (
|
||
BaseModel,
|
||
HttpUrl,
|
||
PostgresDsn,
|
||
ValidationError,
|
||
field_validator,
|
||
)
|
||
|
||
class MyModel(BaseModel):
|
||
url: HttpUrl
|
||
|
||
m = MyModel(url='http://www.example.com')
|
||
|
||
# the repr() method for a url will display all properties of the url
|
||
print(repr(m.url))
|
||
#> HttpUrl('http://www.example.com/')
|
||
print(m.url.scheme)
|
||
#> http
|
||
print(m.url.host)
|
||
#> www.example.com
|
||
print(m.url.port)
|
||
#> 80
|
||
|
||
class MyDatabaseModel(BaseModel):
|
||
db: PostgresDsn
|
||
|
||
@field_validator('db')
|
||
def check_db_name(cls, v):
|
||
assert v.path and len(v.path) > 1, 'database must be provided'
|
||
return v
|
||
|
||
m = MyDatabaseModel(db='postgres://user:pass@localhost:5432/foobar')
|
||
print(m.db)
|
||
#> postgres://user:pass@localhost:5432/foobar
|
||
|
||
try:
|
||
MyDatabaseModel(db='postgres://user:pass@localhost:5432')
|
||
except ValidationError as e:
|
||
print(e)
|
||
'''
|
||
1 validation error for MyDatabaseModel
|
||
db
|
||
Assertion failed, database must be provided
|
||
assert (None)
|
||
+ where None = PostgresDsn('postgres://user:pass@localhost:5432').path [type=assertion_error, input_value='postgres://user:pass@localhost:5432', input_type=str]
|
||
'''
|
||
```
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
host_required=True,
|
||
allowed_schemes=[
|
||
'postgres',
|
||
'postgresql',
|
||
'postgresql+asyncpg',
|
||
'postgresql+pg8000',
|
||
'postgresql+psycopg',
|
||
'postgresql+psycopg2',
|
||
'postgresql+psycopg2cffi',
|
||
'postgresql+py-postgresql',
|
||
'postgresql+pygresql',
|
||
],
|
||
)
|
||
|
||
@property
|
||
def host(self) -> str:
|
||
"""The required URL host."""
|
||
return self._url.host # pyright: ignore[reportAttributeAccessIssue]
|
||
|
||
|
||
class CockroachDsn(AnyUrl):
|
||
"""A type that will accept any Cockroach DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
host_required=True,
|
||
allowed_schemes=[
|
||
'cockroachdb',
|
||
'cockroachdb+psycopg2',
|
||
'cockroachdb+asyncpg',
|
||
],
|
||
)
|
||
|
||
@property
|
||
def host(self) -> str:
|
||
"""The required URL host."""
|
||
return self._url.host # pyright: ignore[reportReturnType]
|
||
|
||
|
||
class AmqpDsn(AnyUrl):
|
||
"""A type that will accept any AMQP DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['amqp', 'amqps'])
|
||
|
||
|
||
class RedisDsn(AnyUrl):
|
||
"""A type that will accept any Redis DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host required (e.g., `rediss://:pass@localhost`)
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=['redis', 'rediss'],
|
||
default_host='localhost',
|
||
default_port=6379,
|
||
default_path='/0',
|
||
host_required=True,
|
||
)
|
||
|
||
@property
|
||
def host(self) -> str:
|
||
"""The required URL host."""
|
||
return self._url.host # pyright: ignore[reportReturnType]
|
||
|
||
|
||
class MongoDsn(_BaseMultiHostUrl):
|
||
"""A type that will accept any MongoDB DSN.
|
||
|
||
* User info not required
|
||
* Database name not required
|
||
* Port not required
|
||
* User info may be passed without user part (e.g., `mongodb://mongodb0.example.com:27017`).
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['mongodb', 'mongodb+srv'], default_port=27017)
|
||
|
||
|
||
class KafkaDsn(AnyUrl):
|
||
"""A type that will accept any Kafka DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(allowed_schemes=['kafka'], default_host='localhost', default_port=9092)
|
||
|
||
|
||
class NatsDsn(_BaseMultiHostUrl):
|
||
"""A type that will accept any NATS DSN.
|
||
|
||
NATS is a connective technology built for the ever increasingly hyper-connected world.
|
||
It is a single technology that enables applications to securely communicate across
|
||
any combination of cloud vendors, on-premise, edge, web and mobile, and devices.
|
||
More: https://nats.io
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=['nats', 'tls', 'ws', 'wss'], default_host='localhost', default_port=4222
|
||
)
|
||
|
||
|
||
class MySQLDsn(AnyUrl):
|
||
"""A type that will accept any MySQL DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=[
|
||
'mysql',
|
||
'mysql+mysqlconnector',
|
||
'mysql+aiomysql',
|
||
'mysql+asyncmy',
|
||
'mysql+mysqldb',
|
||
'mysql+pymysql',
|
||
'mysql+cymysql',
|
||
'mysql+pyodbc',
|
||
],
|
||
default_port=3306,
|
||
host_required=True,
|
||
)
|
||
|
||
|
||
class MariaDBDsn(AnyUrl):
|
||
"""A type that will accept any MariaDB DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=['mariadb', 'mariadb+mariadbconnector', 'mariadb+pymysql'],
|
||
default_port=3306,
|
||
)
|
||
|
||
|
||
class ClickHouseDsn(AnyUrl):
|
||
"""A type that will accept any ClickHouse DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host not required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=[
|
||
'clickhouse+native',
|
||
'clickhouse+asynch',
|
||
'clickhouse+http',
|
||
'clickhouse',
|
||
'clickhouses',
|
||
'clickhousedb',
|
||
],
|
||
default_host='localhost',
|
||
default_port=9000,
|
||
)
|
||
|
||
|
||
class SnowflakeDsn(AnyUrl):
|
||
"""A type that will accept any Snowflake DSN.
|
||
|
||
* User info required
|
||
* TLD not required
|
||
* Host required
|
||
"""
|
||
|
||
_constraints = UrlConstraints(
|
||
allowed_schemes=['snowflake'],
|
||
host_required=True,
|
||
)
|
||
|
||
@property
|
||
def host(self) -> str:
|
||
"""The required URL host."""
|
||
return self._url.host # pyright: ignore[reportReturnType]
|
||
|
||
|
||
def import_email_validator() -> None:
|
||
global email_validator
|
||
try:
|
||
import email_validator
|
||
except ImportError as e:
|
||
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`') from e
|
||
if not version('email-validator').partition('.')[0] == '2':
|
||
raise ImportError('email-validator version >= 2.0 required, run pip install -U email-validator')
|
||
|
||
|
||
if TYPE_CHECKING:
|
||
EmailStr = Annotated[str, ...]
|
||
else:
|
||
|
||
class EmailStr:
|
||
"""
|
||
Info:
|
||
To use this type, you need to install the optional
|
||
[`email-validator`](https://github.com/JoshData/python-email-validator) package:
|
||
|
||
```bash
|
||
pip install email-validator
|
||
```
|
||
|
||
Validate email addresses.
|
||
|
||
```python
|
||
from pydantic import BaseModel, EmailStr
|
||
|
||
class Model(BaseModel):
|
||
email: EmailStr
|
||
|
||
print(Model(email='contact@mail.com'))
|
||
#> email='contact@mail.com'
|
||
```
|
||
""" # noqa: D212
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls,
|
||
_source: type[Any],
|
||
_handler: GetCoreSchemaHandler,
|
||
) -> core_schema.CoreSchema:
|
||
import_email_validator()
|
||
return core_schema.no_info_after_validator_function(cls._validate, core_schema.str_schema())
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
field_schema = handler(core_schema)
|
||
field_schema.update(type='string', format='email')
|
||
return field_schema
|
||
|
||
@classmethod
|
||
def _validate(cls, input_value: str, /) -> str:
|
||
return validate_email(input_value)[1]
|
||
|
||
|
||
class NameEmail(_repr.Representation):
|
||
"""
|
||
Info:
|
||
To use this type, you need to install the optional
|
||
[`email-validator`](https://github.com/JoshData/python-email-validator) package:
|
||
|
||
```bash
|
||
pip install email-validator
|
||
```
|
||
|
||
Validate a name and email address combination, as specified by
|
||
[RFC 5322](https://datatracker.ietf.org/doc/html/rfc5322#section-3.4).
|
||
|
||
The `NameEmail` has two properties: `name` and `email`.
|
||
In case the `name` is not provided, it's inferred from the email address.
|
||
|
||
```python
|
||
from pydantic import BaseModel, NameEmail
|
||
|
||
class User(BaseModel):
|
||
email: NameEmail
|
||
|
||
user = User(email='Fred Bloggs <fred.bloggs@example.com>')
|
||
print(user.email)
|
||
#> Fred Bloggs <fred.bloggs@example.com>
|
||
print(user.email.name)
|
||
#> Fred Bloggs
|
||
|
||
user = User(email='fred.bloggs@example.com')
|
||
print(user.email)
|
||
#> fred.bloggs <fred.bloggs@example.com>
|
||
print(user.email.name)
|
||
#> fred.bloggs
|
||
```
|
||
""" # noqa: D212
|
||
|
||
__slots__ = 'name', 'email'
|
||
|
||
def __init__(self, name: str, email: str):
|
||
self.name = name
|
||
self.email = email
|
||
|
||
def __eq__(self, other: Any) -> bool:
|
||
return isinstance(other, NameEmail) and (self.name, self.email) == (other.name, other.email)
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
field_schema = handler(core_schema)
|
||
field_schema.update(type='string', format='name-email')
|
||
return field_schema
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls,
|
||
_source: type[Any],
|
||
_handler: GetCoreSchemaHandler,
|
||
) -> core_schema.CoreSchema:
|
||
import_email_validator()
|
||
|
||
return core_schema.no_info_after_validator_function(
|
||
cls._validate,
|
||
core_schema.json_or_python_schema(
|
||
json_schema=core_schema.str_schema(),
|
||
python_schema=core_schema.union_schema(
|
||
[core_schema.is_instance_schema(cls), core_schema.str_schema()],
|
||
custom_error_type='name_email_type',
|
||
custom_error_message='Input is not a valid NameEmail',
|
||
),
|
||
serialization=core_schema.to_string_ser_schema(),
|
||
),
|
||
)
|
||
|
||
@classmethod
|
||
def _validate(cls, input_value: Self | str, /) -> Self:
|
||
if isinstance(input_value, str):
|
||
name, email = validate_email(input_value)
|
||
return cls(name, email)
|
||
else:
|
||
return input_value
|
||
|
||
def __str__(self) -> str:
|
||
if '@' in self.name:
|
||
return f'"{self.name}" <{self.email}>'
|
||
|
||
return f'{self.name} <{self.email}>'
|
||
|
||
|
||
IPvAnyAddressType: TypeAlias = 'IPv4Address | IPv6Address'
|
||
IPvAnyInterfaceType: TypeAlias = 'IPv4Interface | IPv6Interface'
|
||
IPvAnyNetworkType: TypeAlias = 'IPv4Network | IPv6Network'
|
||
|
||
if TYPE_CHECKING:
|
||
IPvAnyAddress = IPvAnyAddressType
|
||
IPvAnyInterface = IPvAnyInterfaceType
|
||
IPvAnyNetwork = IPvAnyNetworkType
|
||
else:
|
||
|
||
class IPvAnyAddress:
|
||
"""Validate an IPv4 or IPv6 address.
|
||
|
||
```python
|
||
from pydantic import BaseModel
|
||
from pydantic.networks import IPvAnyAddress
|
||
|
||
class IpModel(BaseModel):
|
||
ip: IPvAnyAddress
|
||
|
||
print(IpModel(ip='127.0.0.1'))
|
||
#> ip=IPv4Address('127.0.0.1')
|
||
|
||
try:
|
||
IpModel(ip='http://www.example.com')
|
||
except ValueError as e:
|
||
print(e.errors())
|
||
'''
|
||
[
|
||
{
|
||
'type': 'ip_any_address',
|
||
'loc': ('ip',),
|
||
'msg': 'value is not a valid IPv4 or IPv6 address',
|
||
'input': 'http://www.example.com',
|
||
}
|
||
]
|
||
'''
|
||
```
|
||
"""
|
||
|
||
__slots__ = ()
|
||
|
||
def __new__(cls, value: Any) -> IPvAnyAddressType:
|
||
"""Validate an IPv4 or IPv6 address."""
|
||
try:
|
||
return IPv4Address(value)
|
||
except ValueError:
|
||
pass
|
||
|
||
try:
|
||
return IPv6Address(value)
|
||
except ValueError:
|
||
raise PydanticCustomError('ip_any_address', 'value is not a valid IPv4 or IPv6 address')
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
field_schema = {}
|
||
field_schema.update(type='string', format='ipvanyaddress')
|
||
return field_schema
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls,
|
||
_source: type[Any],
|
||
_handler: GetCoreSchemaHandler,
|
||
) -> core_schema.CoreSchema:
|
||
return core_schema.no_info_plain_validator_function(
|
||
cls._validate, serialization=core_schema.to_string_ser_schema()
|
||
)
|
||
|
||
@classmethod
|
||
def _validate(cls, input_value: Any, /) -> IPvAnyAddressType:
|
||
return cls(input_value) # type: ignore[return-value]
|
||
|
||
class IPvAnyInterface:
|
||
"""Validate an IPv4 or IPv6 interface."""
|
||
|
||
__slots__ = ()
|
||
|
||
def __new__(cls, value: NetworkType) -> IPvAnyInterfaceType:
|
||
"""Validate an IPv4 or IPv6 interface."""
|
||
try:
|
||
return IPv4Interface(value)
|
||
except ValueError:
|
||
pass
|
||
|
||
try:
|
||
return IPv6Interface(value)
|
||
except ValueError:
|
||
raise PydanticCustomError('ip_any_interface', 'value is not a valid IPv4 or IPv6 interface')
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
field_schema = {}
|
||
field_schema.update(type='string', format='ipvanyinterface')
|
||
return field_schema
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls,
|
||
_source: type[Any],
|
||
_handler: GetCoreSchemaHandler,
|
||
) -> core_schema.CoreSchema:
|
||
return core_schema.no_info_plain_validator_function(
|
||
cls._validate, serialization=core_schema.to_string_ser_schema()
|
||
)
|
||
|
||
@classmethod
|
||
def _validate(cls, input_value: NetworkType, /) -> IPvAnyInterfaceType:
|
||
return cls(input_value) # type: ignore[return-value]
|
||
|
||
class IPvAnyNetwork:
|
||
"""Validate an IPv4 or IPv6 network."""
|
||
|
||
__slots__ = ()
|
||
|
||
def __new__(cls, value: NetworkType) -> IPvAnyNetworkType:
|
||
"""Validate an IPv4 or IPv6 network."""
|
||
# Assume IP Network is defined with a default value for `strict` argument.
|
||
# Define your own class if you want to specify network address check strictness.
|
||
try:
|
||
return IPv4Network(value)
|
||
except ValueError:
|
||
pass
|
||
|
||
try:
|
||
return IPv6Network(value)
|
||
except ValueError:
|
||
raise PydanticCustomError('ip_any_network', 'value is not a valid IPv4 or IPv6 network')
|
||
|
||
@classmethod
|
||
def __get_pydantic_json_schema__(
|
||
cls, core_schema: core_schema.CoreSchema, handler: _schema_generation_shared.GetJsonSchemaHandler
|
||
) -> JsonSchemaValue:
|
||
field_schema = {}
|
||
field_schema.update(type='string', format='ipvanynetwork')
|
||
return field_schema
|
||
|
||
@classmethod
|
||
def __get_pydantic_core_schema__(
|
||
cls,
|
||
_source: type[Any],
|
||
_handler: GetCoreSchemaHandler,
|
||
) -> core_schema.CoreSchema:
|
||
return core_schema.no_info_plain_validator_function(
|
||
cls._validate, serialization=core_schema.to_string_ser_schema()
|
||
)
|
||
|
||
@classmethod
|
||
def _validate(cls, input_value: NetworkType, /) -> IPvAnyNetworkType:
|
||
return cls(input_value) # type: ignore[return-value]
|
||
|
||
|
||
def _build_pretty_email_regex() -> re.Pattern[str]:
|
||
name_chars = r'[\w!#$%&\'*+\-/=?^_`{|}~]'
|
||
unquoted_name_group = rf'((?:{name_chars}+\s+)*{name_chars}+)'
|
||
quoted_name_group = r'"((?:[^"]|\")+)"'
|
||
email_group = r'<(.+)>'
|
||
return re.compile(rf'\s*(?:{unquoted_name_group}|{quoted_name_group})?\s*{email_group}\s*')
|
||
|
||
|
||
pretty_email_regex = _build_pretty_email_regex()
|
||
|
||
MAX_EMAIL_LENGTH = 2048
|
||
"""Maximum length for an email.
|
||
A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
|
||
"""
|
||
|
||
|
||
def validate_email(value: str) -> tuple[str, str]:
|
||
"""Email address validation using [email-validator](https://pypi.org/project/email-validator/).
|
||
|
||
Returns:
|
||
A tuple containing the local part of the email (or the name for "pretty" email addresses)
|
||
and the normalized email.
|
||
|
||
Raises:
|
||
PydanticCustomError: If the email is invalid.
|
||
|
||
Note:
|
||
Note that:
|
||
|
||
* Raw IP address (literal) domain parts are not allowed.
|
||
* `"John Doe <local_part@domain.com>"` style "pretty" email addresses are processed.
|
||
* Spaces are striped from the beginning and end of addresses, but no error is raised.
|
||
"""
|
||
if email_validator is None:
|
||
import_email_validator()
|
||
|
||
if len(value) > MAX_EMAIL_LENGTH:
|
||
raise PydanticCustomError(
|
||
'value_error',
|
||
'value is not a valid email address: {reason}',
|
||
{'reason': f'Length must not exceed {MAX_EMAIL_LENGTH} characters'},
|
||
)
|
||
|
||
m = pretty_email_regex.fullmatch(value)
|
||
name: str | None = None
|
||
if m:
|
||
unquoted_name, quoted_name, value = m.groups()
|
||
name = unquoted_name or quoted_name
|
||
|
||
email = value.strip()
|
||
|
||
try:
|
||
parts = email_validator.validate_email(email, check_deliverability=False)
|
||
except email_validator.EmailNotValidError as e:
|
||
raise PydanticCustomError(
|
||
'value_error', 'value is not a valid email address: {reason}', {'reason': str(e.args[0])}
|
||
) from e
|
||
|
||
email = parts.normalized
|
||
assert email is not None
|
||
name = name or parts.local_part
|
||
return name, email
|
||
|
||
|
||
__getattr__ = getattr_migration(__name__)
|