Update 2025-04-13_16:26:34
This commit is contained in:
@ -0,0 +1,15 @@
|
||||
from .api_key import APIKeyCookie as APIKeyCookie
|
||||
from .api_key import APIKeyHeader as APIKeyHeader
|
||||
from .api_key import APIKeyQuery as APIKeyQuery
|
||||
from .http import HTTPAuthorizationCredentials as HTTPAuthorizationCredentials
|
||||
from .http import HTTPBasic as HTTPBasic
|
||||
from .http import HTTPBasicCredentials as HTTPBasicCredentials
|
||||
from .http import HTTPBearer as HTTPBearer
|
||||
from .http import HTTPDigest as HTTPDigest
|
||||
from .oauth2 import OAuth2 as OAuth2
|
||||
from .oauth2 import OAuth2AuthorizationCodeBearer as OAuth2AuthorizationCodeBearer
|
||||
from .oauth2 import OAuth2PasswordBearer as OAuth2PasswordBearer
|
||||
from .oauth2 import OAuth2PasswordRequestForm as OAuth2PasswordRequestForm
|
||||
from .oauth2 import OAuth2PasswordRequestFormStrict as OAuth2PasswordRequestFormStrict
|
||||
from .oauth2 import SecurityScopes as SecurityScopes
|
||||
from .open_id_connect_url import OpenIdConnect as OpenIdConnect
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
288
venv/lib/python3.11/site-packages/fastapi/security/api_key.py
Normal file
288
venv/lib/python3.11/site-packages/fastapi/security/api_key.py
Normal file
@ -0,0 +1,288 @@
|
||||
from typing import Optional
|
||||
|
||||
from fastapi.openapi.models import APIKey, APIKeyIn
|
||||
from fastapi.security.base import SecurityBase
|
||||
from starlette.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
from starlette.status import HTTP_403_FORBIDDEN
|
||||
from typing_extensions import Annotated, Doc
|
||||
|
||||
|
||||
class APIKeyBase(SecurityBase):
|
||||
@staticmethod
|
||||
def check_api_key(api_key: Optional[str], auto_error: bool) -> Optional[str]:
|
||||
if not api_key:
|
||||
if auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
return None
|
||||
return api_key
|
||||
|
||||
|
||||
class APIKeyQuery(APIKeyBase):
|
||||
"""
|
||||
API key authentication using a query parameter.
|
||||
|
||||
This defines the name of the query parameter that should be provided in the request
|
||||
with the API key and integrates that into the OpenAPI documentation. It extracts
|
||||
the key value sent in the query parameter automatically and provides it as the
|
||||
dependency result. But it doesn't define how to send that API key to the client.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be a string containing the key value.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import APIKeyQuery
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
query_scheme = APIKeyQuery(name="api_key")
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(api_key: str = Depends(query_scheme)):
|
||||
return {"api_key": api_key}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: Annotated[
|
||||
str,
|
||||
Doc("Query parameter name."),
|
||||
],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the query parameter is not provided, `APIKeyQuery` will
|
||||
automatically cancel the request and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the query parameter is not
|
||||
available, instead of erroring out, the dependency result will be
|
||||
`None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in a query
|
||||
parameter or in an HTTP Bearer token).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model: APIKey = APIKey(
|
||||
**{"in": APIKeyIn.query}, # type: ignore[arg-type]
|
||||
name=name,
|
||||
description=description,
|
||||
)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
api_key = request.query_params.get(self.model.name)
|
||||
return self.check_api_key(api_key, self.auto_error)
|
||||
|
||||
|
||||
class APIKeyHeader(APIKeyBase):
|
||||
"""
|
||||
API key authentication using a header.
|
||||
|
||||
This defines the name of the header that should be provided in the request with
|
||||
the API key and integrates that into the OpenAPI documentation. It extracts
|
||||
the key value sent in the header automatically and provides it as the dependency
|
||||
result. But it doesn't define how to send that key to the client.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be a string containing the key value.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import APIKeyHeader
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
header_scheme = APIKeyHeader(name="x-key")
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(key: str = Depends(header_scheme)):
|
||||
return {"key": key}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: Annotated[str, Doc("Header name.")],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the header is not provided, `APIKeyHeader` will
|
||||
automatically cancel the request and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the header is not available,
|
||||
instead of erroring out, the dependency result will be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in a header or
|
||||
in an HTTP Bearer token).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model: APIKey = APIKey(
|
||||
**{"in": APIKeyIn.header}, # type: ignore[arg-type]
|
||||
name=name,
|
||||
description=description,
|
||||
)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
api_key = request.headers.get(self.model.name)
|
||||
return self.check_api_key(api_key, self.auto_error)
|
||||
|
||||
|
||||
class APIKeyCookie(APIKeyBase):
|
||||
"""
|
||||
API key authentication using a cookie.
|
||||
|
||||
This defines the name of the cookie that should be provided in the request with
|
||||
the API key and integrates that into the OpenAPI documentation. It extracts
|
||||
the key value sent in the cookie automatically and provides it as the dependency
|
||||
result. But it doesn't define how to set that cookie.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be a string containing the key value.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import APIKeyCookie
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
cookie_scheme = APIKeyCookie(name="session")
|
||||
|
||||
|
||||
@app.get("/items/")
|
||||
async def read_items(session: str = Depends(cookie_scheme)):
|
||||
return {"session": session}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: Annotated[str, Doc("Cookie name.")],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the cookie is not provided, `APIKeyCookie` will
|
||||
automatically cancel the request and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the cookie is not available,
|
||||
instead of erroring out, the dependency result will be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in a cookie or
|
||||
in an HTTP Bearer token).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model: APIKey = APIKey(
|
||||
**{"in": APIKeyIn.cookie}, # type: ignore[arg-type]
|
||||
name=name,
|
||||
description=description,
|
||||
)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
api_key = request.cookies.get(self.model.name)
|
||||
return self.check_api_key(api_key, self.auto_error)
|
@ -0,0 +1,6 @@
|
||||
from fastapi.openapi.models import SecurityBase as SecurityBaseModel
|
||||
|
||||
|
||||
class SecurityBase:
|
||||
model: SecurityBaseModel
|
||||
scheme_name: str
|
423
venv/lib/python3.11/site-packages/fastapi/security/http.py
Normal file
423
venv/lib/python3.11/site-packages/fastapi/security/http.py
Normal file
@ -0,0 +1,423 @@
|
||||
import binascii
|
||||
from base64 import b64decode
|
||||
from typing import Optional
|
||||
|
||||
from fastapi.exceptions import HTTPException
|
||||
from fastapi.openapi.models import HTTPBase as HTTPBaseModel
|
||||
from fastapi.openapi.models import HTTPBearer as HTTPBearerModel
|
||||
from fastapi.security.base import SecurityBase
|
||||
from fastapi.security.utils import get_authorization_scheme_param
|
||||
from pydantic import BaseModel
|
||||
from starlette.requests import Request
|
||||
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN
|
||||
from typing_extensions import Annotated, Doc
|
||||
|
||||
|
||||
class HTTPBasicCredentials(BaseModel):
|
||||
"""
|
||||
The HTTP Basic credentials given as the result of using `HTTPBasic` in a
|
||||
dependency.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).
|
||||
"""
|
||||
|
||||
username: Annotated[str, Doc("The HTTP Basic username.")]
|
||||
password: Annotated[str, Doc("The HTTP Basic password.")]
|
||||
|
||||
|
||||
class HTTPAuthorizationCredentials(BaseModel):
|
||||
"""
|
||||
The HTTP authorization credentials in the result of using `HTTPBearer` or
|
||||
`HTTPDigest` in a dependency.
|
||||
|
||||
The HTTP authorization header value is split by the first space.
|
||||
|
||||
The first part is the `scheme`, the second part is the `credentials`.
|
||||
|
||||
For example, in an HTTP Bearer token scheme, the client will send a header
|
||||
like:
|
||||
|
||||
```
|
||||
Authorization: Bearer deadbeef12346
|
||||
```
|
||||
|
||||
In this case:
|
||||
|
||||
* `scheme` will have the value `"Bearer"`
|
||||
* `credentials` will have the value `"deadbeef12346"`
|
||||
"""
|
||||
|
||||
scheme: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The HTTP authorization scheme extracted from the header value.
|
||||
"""
|
||||
),
|
||||
]
|
||||
credentials: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The HTTP authorization credentials extracted from the header value.
|
||||
"""
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class HTTPBase(SecurityBase):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
scheme: str,
|
||||
scheme_name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
auto_error: bool = True,
|
||||
):
|
||||
self.model = HTTPBaseModel(scheme=scheme, description=description)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(
|
||||
self, request: Request
|
||||
) -> Optional[HTTPAuthorizationCredentials]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, credentials = get_authorization_scheme_param(authorization)
|
||||
if not (authorization and scheme and credentials):
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
||||
|
||||
|
||||
class HTTPBasic(HTTPBase):
|
||||
"""
|
||||
HTTP Basic authentication.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be an `HTTPBasicCredentials` object containing the
|
||||
`username` and the `password`.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
security = HTTPBasic()
|
||||
|
||||
|
||||
@app.get("/users/me")
|
||||
def read_current_user(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
|
||||
return {"username": credentials.username, "password": credentials.password}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
realm: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
HTTP Basic authentication realm.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the HTTP Basic authentication is not provided (a
|
||||
header), `HTTPBasic` will automatically cancel the request and send the
|
||||
client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Basic authentication
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in HTTP Basic
|
||||
authentication or in an HTTP Bearer token).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model = HTTPBaseModel(scheme="basic", description=description)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.realm = realm
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__( # type: ignore
|
||||
self, request: Request
|
||||
) -> Optional[HTTPBasicCredentials]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if self.realm:
|
||||
unauthorized_headers = {"WWW-Authenticate": f'Basic realm="{self.realm}"'}
|
||||
else:
|
||||
unauthorized_headers = {"WWW-Authenticate": "Basic"}
|
||||
if not authorization or scheme.lower() != "basic":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers=unauthorized_headers,
|
||||
)
|
||||
else:
|
||||
return None
|
||||
invalid_user_credentials_exc = HTTPException(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication credentials",
|
||||
headers=unauthorized_headers,
|
||||
)
|
||||
try:
|
||||
data = b64decode(param).decode("ascii")
|
||||
except (ValueError, UnicodeDecodeError, binascii.Error):
|
||||
raise invalid_user_credentials_exc # noqa: B904
|
||||
username, separator, password = data.partition(":")
|
||||
if not separator:
|
||||
raise invalid_user_credentials_exc
|
||||
return HTTPBasicCredentials(username=username, password=password)
|
||||
|
||||
|
||||
class HTTPBearer(HTTPBase):
|
||||
"""
|
||||
HTTP Bearer token authentication.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be an `HTTPAuthorizationCredentials` object containing
|
||||
the `scheme` and the `credentials`.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
security = HTTPBearer()
|
||||
|
||||
|
||||
@app.get("/users/me")
|
||||
def read_current_user(
|
||||
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
|
||||
):
|
||||
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
bearerFormat: Annotated[Optional[str], Doc("Bearer token format.")] = None,
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the HTTP Bearer token is not provided (in an
|
||||
`Authorization` header), `HTTPBearer` will automatically cancel the
|
||||
request and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Bearer token
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in an HTTP
|
||||
Bearer token or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model = HTTPBearerModel(bearerFormat=bearerFormat, description=description)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(
|
||||
self, request: Request
|
||||
) -> Optional[HTTPAuthorizationCredentials]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, credentials = get_authorization_scheme_param(authorization)
|
||||
if not (authorization and scheme and credentials):
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
else:
|
||||
return None
|
||||
if scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN,
|
||||
detail="Invalid authentication credentials",
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
||||
|
||||
|
||||
class HTTPDigest(HTTPBase):
|
||||
"""
|
||||
HTTP Digest authentication.
|
||||
|
||||
## Usage
|
||||
|
||||
Create an instance object and use that object as the dependency in `Depends()`.
|
||||
|
||||
The dependency result will be an `HTTPAuthorizationCredentials` object containing
|
||||
the `scheme` and the `credentials`.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
security = HTTPDigest()
|
||||
|
||||
|
||||
@app.get("/users/me")
|
||||
def read_current_user(
|
||||
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
|
||||
):
|
||||
return {"scheme": credentials.scheme, "credentials": credentials.credentials}
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if the HTTP Digest is not provided, `HTTPDigest` will
|
||||
automatically cancel the request and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Digest is not
|
||||
available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, in HTTP
|
||||
Digest or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model = HTTPBaseModel(scheme="digest", description=description)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(
|
||||
self, request: Request
|
||||
) -> Optional[HTTPAuthorizationCredentials]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, credentials = get_authorization_scheme_param(authorization)
|
||||
if not (authorization and scheme and credentials):
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
else:
|
||||
return None
|
||||
if scheme.lower() != "digest":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN,
|
||||
detail="Invalid authentication credentials",
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
638
venv/lib/python3.11/site-packages/fastapi/security/oauth2.py
Normal file
638
venv/lib/python3.11/site-packages/fastapi/security/oauth2.py
Normal file
@ -0,0 +1,638 @@
|
||||
from typing import Any, Dict, List, Optional, Union, cast
|
||||
|
||||
from fastapi.exceptions import HTTPException
|
||||
from fastapi.openapi.models import OAuth2 as OAuth2Model
|
||||
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
|
||||
from fastapi.param_functions import Form
|
||||
from fastapi.security.base import SecurityBase
|
||||
from fastapi.security.utils import get_authorization_scheme_param
|
||||
from starlette.requests import Request
|
||||
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN
|
||||
|
||||
# TODO: import from typing when deprecating Python 3.9
|
||||
from typing_extensions import Annotated, Doc
|
||||
|
||||
|
||||
class OAuth2PasswordRequestForm:
|
||||
"""
|
||||
This is a dependency class to collect the `username` and `password` as form data
|
||||
for an OAuth2 password flow.
|
||||
|
||||
The OAuth2 specification dictates that for a password flow the data should be
|
||||
collected using form data (instead of JSON) and that it should have the specific
|
||||
fields `username` and `password`.
|
||||
|
||||
All the initialization parameters are extracted from the request.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for Simple OAuth2 with Password and Bearer](https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/).
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
data = {}
|
||||
data["scopes"] = []
|
||||
for scope in form_data.scopes:
|
||||
data["scopes"].append(scope)
|
||||
if form_data.client_id:
|
||||
data["client_id"] = form_data.client_id
|
||||
if form_data.client_secret:
|
||||
data["client_secret"] = form_data.client_secret
|
||||
return data
|
||||
```
|
||||
|
||||
Note that for OAuth2 the scope `items:read` is a single scope in an opaque string.
|
||||
You could have custom internal logic to separate it by colon characters (`:`) or
|
||||
similar, and get the two parts `items` and `read`. Many applications do that to
|
||||
group and organize permissions, you could do it as well in your application, just
|
||||
know that that it is application specific, it's not part of the specification.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
grant_type: Annotated[
|
||||
Union[str, None],
|
||||
Form(pattern="^password$"),
|
||||
Doc(
|
||||
"""
|
||||
The OAuth2 spec says it is required and MUST be the fixed string
|
||||
"password". Nevertheless, this dependency class is permissive and
|
||||
allows not passing it. If you want to enforce it, use instead the
|
||||
`OAuth2PasswordRequestFormStrict` dependency.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
username: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
`username` string. The OAuth2 spec requires the exact field name
|
||||
`username`.
|
||||
"""
|
||||
),
|
||||
],
|
||||
password: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
`password` string. The OAuth2 spec requires the exact field name
|
||||
`password".
|
||||
"""
|
||||
),
|
||||
],
|
||||
scope: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
A single string with actually several scopes separated by spaces. Each
|
||||
scope is also a string.
|
||||
|
||||
For example, a single string with:
|
||||
|
||||
```python
|
||||
"items:read items:write users:read profile openid"
|
||||
````
|
||||
|
||||
would represent the scopes:
|
||||
|
||||
* `items:read`
|
||||
* `items:write`
|
||||
* `users:read`
|
||||
* `profile`
|
||||
* `openid`
|
||||
"""
|
||||
),
|
||||
] = "",
|
||||
client_id: Annotated[
|
||||
Union[str, None],
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
If there's a `client_id`, it can be sent as part of the form fields.
|
||||
But the OAuth2 specification recommends sending the `client_id` and
|
||||
`client_secret` (if any) using HTTP Basic auth.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
client_secret: Annotated[
|
||||
Union[str, None],
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
If there's a `client_password` (and a `client_id`), they can be sent
|
||||
as part of the form fields. But the OAuth2 specification recommends
|
||||
sending the `client_id` and `client_secret` (if any) using HTTP Basic
|
||||
auth.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
):
|
||||
self.grant_type = grant_type
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.scopes = scope.split()
|
||||
self.client_id = client_id
|
||||
self.client_secret = client_secret
|
||||
|
||||
|
||||
class OAuth2PasswordRequestFormStrict(OAuth2PasswordRequestForm):
|
||||
"""
|
||||
This is a dependency class to collect the `username` and `password` as form data
|
||||
for an OAuth2 password flow.
|
||||
|
||||
The OAuth2 specification dictates that for a password flow the data should be
|
||||
collected using form data (instead of JSON) and that it should have the specific
|
||||
fields `username` and `password`.
|
||||
|
||||
All the initialization parameters are extracted from the request.
|
||||
|
||||
The only difference between `OAuth2PasswordRequestFormStrict` and
|
||||
`OAuth2PasswordRequestForm` is that `OAuth2PasswordRequestFormStrict` requires the
|
||||
client to send the form field `grant_type` with the value `"password"`, which
|
||||
is required in the OAuth2 specification (it seems that for no particular reason),
|
||||
while for `OAuth2PasswordRequestForm` `grant_type` is optional.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for Simple OAuth2 with Password and Bearer](https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/).
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
def login(form_data: Annotated[OAuth2PasswordRequestFormStrict, Depends()]):
|
||||
data = {}
|
||||
data["scopes"] = []
|
||||
for scope in form_data.scopes:
|
||||
data["scopes"].append(scope)
|
||||
if form_data.client_id:
|
||||
data["client_id"] = form_data.client_id
|
||||
if form_data.client_secret:
|
||||
data["client_secret"] = form_data.client_secret
|
||||
return data
|
||||
```
|
||||
|
||||
Note that for OAuth2 the scope `items:read` is a single scope in an opaque string.
|
||||
You could have custom internal logic to separate it by colon characters (`:`) or
|
||||
similar, and get the two parts `items` and `read`. Many applications do that to
|
||||
group and organize permissions, you could do it as well in your application, just
|
||||
know that that it is application specific, it's not part of the specification.
|
||||
|
||||
|
||||
grant_type: the OAuth2 spec says it is required and MUST be the fixed string "password".
|
||||
This dependency is strict about it. If you want to be permissive, use instead the
|
||||
OAuth2PasswordRequestForm dependency class.
|
||||
username: username string. The OAuth2 spec requires the exact field name "username".
|
||||
password: password string. The OAuth2 spec requires the exact field name "password".
|
||||
scope: Optional string. Several scopes (each one a string) separated by spaces. E.g.
|
||||
"items:read items:write users:read profile openid"
|
||||
client_id: optional string. OAuth2 recommends sending the client_id and client_secret (if any)
|
||||
using HTTP Basic auth, as: client_id:client_secret
|
||||
client_secret: optional string. OAuth2 recommends sending the client_id and client_secret (if any)
|
||||
using HTTP Basic auth, as: client_id:client_secret
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
grant_type: Annotated[
|
||||
str,
|
||||
Form(pattern="^password$"),
|
||||
Doc(
|
||||
"""
|
||||
The OAuth2 spec says it is required and MUST be the fixed string
|
||||
"password". This dependency is strict about it. If you want to be
|
||||
permissive, use instead the `OAuth2PasswordRequestForm` dependency
|
||||
class.
|
||||
"""
|
||||
),
|
||||
],
|
||||
username: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
`username` string. The OAuth2 spec requires the exact field name
|
||||
`username`.
|
||||
"""
|
||||
),
|
||||
],
|
||||
password: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
`password` string. The OAuth2 spec requires the exact field name
|
||||
`password".
|
||||
"""
|
||||
),
|
||||
],
|
||||
scope: Annotated[
|
||||
str,
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
A single string with actually several scopes separated by spaces. Each
|
||||
scope is also a string.
|
||||
|
||||
For example, a single string with:
|
||||
|
||||
```python
|
||||
"items:read items:write users:read profile openid"
|
||||
````
|
||||
|
||||
would represent the scopes:
|
||||
|
||||
* `items:read`
|
||||
* `items:write`
|
||||
* `users:read`
|
||||
* `profile`
|
||||
* `openid`
|
||||
"""
|
||||
),
|
||||
] = "",
|
||||
client_id: Annotated[
|
||||
Union[str, None],
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
If there's a `client_id`, it can be sent as part of the form fields.
|
||||
But the OAuth2 specification recommends sending the `client_id` and
|
||||
`client_secret` (if any) using HTTP Basic auth.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
client_secret: Annotated[
|
||||
Union[str, None],
|
||||
Form(),
|
||||
Doc(
|
||||
"""
|
||||
If there's a `client_password` (and a `client_id`), they can be sent
|
||||
as part of the form fields. But the OAuth2 specification recommends
|
||||
sending the `client_id` and `client_secret` (if any) using HTTP Basic
|
||||
auth.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
):
|
||||
super().__init__(
|
||||
grant_type=grant_type,
|
||||
username=username,
|
||||
password=password,
|
||||
scope=scope,
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
)
|
||||
|
||||
|
||||
class OAuth2(SecurityBase):
|
||||
"""
|
||||
This is the base class for OAuth2 authentication, an instance of it would be used
|
||||
as a dependency. All other OAuth2 classes inherit from it and customize it for
|
||||
each OAuth2 flow.
|
||||
|
||||
You normally would not create a new class inheriting from it but use one of the
|
||||
existing subclasses, and maybe compose them if you want to support multiple flows.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for Security](https://fastapi.tiangolo.com/tutorial/security/).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
flows: Annotated[
|
||||
Union[OAuthFlowsModel, Dict[str, Dict[str, Any]]],
|
||||
Doc(
|
||||
"""
|
||||
The dictionary of OAuth2 flows.
|
||||
"""
|
||||
),
|
||||
] = OAuthFlowsModel(),
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if no HTTP Authorization header is provided, required for
|
||||
OAuth2 authentication, it will automatically cancel the request and
|
||||
send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Authorization header
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, with OAuth2
|
||||
or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model = OAuth2Model(
|
||||
flows=cast(OAuthFlowsModel, flows), description=description
|
||||
)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
if not authorization:
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return authorization
|
||||
|
||||
|
||||
class OAuth2PasswordBearer(OAuth2):
|
||||
"""
|
||||
OAuth2 flow for authentication using a bearer token obtained with a password.
|
||||
An instance of it would be used as a dependency.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for Simple OAuth2 with Password and Bearer](https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tokenUrl: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The URL to obtain the OAuth2 token. This would be the *path operation*
|
||||
that has `OAuth2PasswordRequestForm` as a dependency.
|
||||
"""
|
||||
),
|
||||
],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
scopes: Annotated[
|
||||
Optional[Dict[str, str]],
|
||||
Doc(
|
||||
"""
|
||||
The OAuth2 scopes that would be required by the *path operations* that
|
||||
use this dependency.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if no HTTP Authorization header is provided, required for
|
||||
OAuth2 authentication, it will automatically cancel the request and
|
||||
send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Authorization header
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, with OAuth2
|
||||
or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
if not scopes:
|
||||
scopes = {}
|
||||
flows = OAuthFlowsModel(
|
||||
password=cast(Any, {"tokenUrl": tokenUrl, "scopes": scopes})
|
||||
)
|
||||
super().__init__(
|
||||
flows=flows,
|
||||
scheme_name=scheme_name,
|
||||
description=description,
|
||||
auto_error=auto_error,
|
||||
)
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if not authorization or scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return param
|
||||
|
||||
|
||||
class OAuth2AuthorizationCodeBearer(OAuth2):
|
||||
"""
|
||||
OAuth2 flow for authentication using a bearer token obtained with an OAuth2 code
|
||||
flow. An instance of it would be used as a dependency.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
authorizationUrl: str,
|
||||
tokenUrl: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The URL to obtain the OAuth2 token.
|
||||
"""
|
||||
),
|
||||
],
|
||||
refreshUrl: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
The URL to refresh the token and obtain a new one.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
scopes: Annotated[
|
||||
Optional[Dict[str, str]],
|
||||
Doc(
|
||||
"""
|
||||
The OAuth2 scopes that would be required by the *path operations* that
|
||||
use this dependency.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if no HTTP Authorization header is provided, required for
|
||||
OAuth2 authentication, it will automatically cancel the request and
|
||||
send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Authorization header
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, with OAuth2
|
||||
or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
if not scopes:
|
||||
scopes = {}
|
||||
flows = OAuthFlowsModel(
|
||||
authorizationCode=cast(
|
||||
Any,
|
||||
{
|
||||
"authorizationUrl": authorizationUrl,
|
||||
"tokenUrl": tokenUrl,
|
||||
"refreshUrl": refreshUrl,
|
||||
"scopes": scopes,
|
||||
},
|
||||
)
|
||||
)
|
||||
super().__init__(
|
||||
flows=flows,
|
||||
scheme_name=scheme_name,
|
||||
description=description,
|
||||
auto_error=auto_error,
|
||||
)
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
scheme, param = get_authorization_scheme_param(authorization)
|
||||
if not authorization or scheme.lower() != "bearer":
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
else:
|
||||
return None # pragma: nocover
|
||||
return param
|
||||
|
||||
|
||||
class SecurityScopes:
|
||||
"""
|
||||
This is a special class that you can define in a parameter in a dependency to
|
||||
obtain the OAuth2 scopes required by all the dependencies in the same chain.
|
||||
|
||||
This way, multiple dependencies can have different scopes, even when used in the
|
||||
same *path operation*. And with this, you can access all the scopes required in
|
||||
all those dependencies in a single place.
|
||||
|
||||
Read more about it in the
|
||||
[FastAPI docs for OAuth2 scopes](https://fastapi.tiangolo.com/advanced/security/oauth2-scopes/).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
scopes: Annotated[
|
||||
Optional[List[str]],
|
||||
Doc(
|
||||
"""
|
||||
This will be filled by FastAPI.
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
):
|
||||
self.scopes: Annotated[
|
||||
List[str],
|
||||
Doc(
|
||||
"""
|
||||
The list of all the scopes required by dependencies.
|
||||
"""
|
||||
),
|
||||
] = scopes or []
|
||||
self.scope_str: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
All the scopes required by all the dependencies in a single string
|
||||
separated by spaces, as defined in the OAuth2 specification.
|
||||
"""
|
||||
),
|
||||
] = " ".join(self.scopes)
|
@ -0,0 +1,84 @@
|
||||
from typing import Optional
|
||||
|
||||
from fastapi.openapi.models import OpenIdConnect as OpenIdConnectModel
|
||||
from fastapi.security.base import SecurityBase
|
||||
from starlette.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
from starlette.status import HTTP_403_FORBIDDEN
|
||||
from typing_extensions import Annotated, Doc
|
||||
|
||||
|
||||
class OpenIdConnect(SecurityBase):
|
||||
"""
|
||||
OpenID Connect authentication class. An instance of it would be used as a
|
||||
dependency.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
openIdConnectUrl: Annotated[
|
||||
str,
|
||||
Doc(
|
||||
"""
|
||||
The OpenID Connect URL.
|
||||
"""
|
||||
),
|
||||
],
|
||||
scheme_name: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme name.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
description: Annotated[
|
||||
Optional[str],
|
||||
Doc(
|
||||
"""
|
||||
Security scheme description.
|
||||
|
||||
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
|
||||
"""
|
||||
),
|
||||
] = None,
|
||||
auto_error: Annotated[
|
||||
bool,
|
||||
Doc(
|
||||
"""
|
||||
By default, if no HTTP Authorization header is provided, required for
|
||||
OpenID Connect authentication, it will automatically cancel the request
|
||||
and send the client an error.
|
||||
|
||||
If `auto_error` is set to `False`, when the HTTP Authorization header
|
||||
is not available, instead of erroring out, the dependency result will
|
||||
be `None`.
|
||||
|
||||
This is useful when you want to have optional authentication.
|
||||
|
||||
It is also useful when you want to have authentication that can be
|
||||
provided in one of multiple optional ways (for example, with OpenID
|
||||
Connect or in a cookie).
|
||||
"""
|
||||
),
|
||||
] = True,
|
||||
):
|
||||
self.model = OpenIdConnectModel(
|
||||
openIdConnectUrl=openIdConnectUrl, description=description
|
||||
)
|
||||
self.scheme_name = scheme_name or self.__class__.__name__
|
||||
self.auto_error = auto_error
|
||||
|
||||
async def __call__(self, request: Request) -> Optional[str]:
|
||||
authorization = request.headers.get("Authorization")
|
||||
if not authorization:
|
||||
if self.auto_error:
|
||||
raise HTTPException(
|
||||
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
|
||||
)
|
||||
else:
|
||||
return None
|
||||
return authorization
|
10
venv/lib/python3.11/site-packages/fastapi/security/utils.py
Normal file
10
venv/lib/python3.11/site-packages/fastapi/security/utils.py
Normal file
@ -0,0 +1,10 @@
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def get_authorization_scheme_param(
|
||||
authorization_header_value: Optional[str],
|
||||
) -> Tuple[str, str]:
|
||||
if not authorization_header_value:
|
||||
return "", ""
|
||||
scheme, _, param = authorization_header_value.partition(" ")
|
||||
return scheme, param
|
Reference in New Issue
Block a user