Source code for fermilink.web.storage_helpers
from __future__ import annotations
from pathlib import Path
import aiofiles
from chainlit.data.storage_clients.base import BaseStorageClient
def _normalize_subdir(value: str) -> str:
"""Normalize a relative storage subdirectory string."""
cleaned = (value or "").replace("\\", "/")
parts = [part for part in cleaned.split("/") if part and part != "."]
return "/".join(parts)
def _join_url(root_path: str, path: str) -> str:
"""Join a root URL path prefix with a child path."""
root = (root_path or "").rstrip("/")
tail = "/" + path.lstrip("/")
return f"{root}{tail}" if root else tail
[docs]
class LocalPublicStorageClient(BaseStorageClient):
"""Chainlit storage client that persists artifacts under `/public`."""
[docs]
def __init__(self, public_root: Path, subdir: str, root_path: str):
self.public_root = public_root
self.subdir = _normalize_subdir(subdir) or ".chainlit/artifacts"
self.base_dir = (self.public_root / self.subdir).resolve()
self.base_dir.mkdir(parents=True, exist_ok=True)
self.public_url_prefix = _join_url(root_path, "/public")
def _resolve_path(self, object_key: str) -> Path:
key = str(object_key or "").lstrip("/").replace("\\", "/")
path = (self.base_dir / key).resolve()
try:
path.relative_to(self.base_dir)
except ValueError as exc:
raise ValueError("Invalid object key") from exc
return path
def _url_for_key(self, object_key: str) -> str:
key = str(object_key or "").lstrip("/").replace("\\", "/")
rel = f"{self.subdir}/{key}" if self.subdir else key
return f"{self.public_url_prefix}/{rel}"
[docs]
async def upload_file(
self,
object_key: str,
data: bytes | str,
mime: str = "application/octet-stream",
overwrite: bool = True,
content_disposition: str | None = None,
) -> dict:
_ = mime
_ = content_disposition
path = self._resolve_path(object_key)
path.parent.mkdir(parents=True, exist_ok=True)
if not overwrite and path.exists():
return {"object_key": object_key, "url": self._url_for_key(object_key)}
if isinstance(data, str):
data = data.encode("utf-8")
async with aiofiles.open(path, "wb") as handle:
await handle.write(data)
return {"object_key": object_key, "url": self._url_for_key(object_key)}
[docs]
async def delete_file(self, object_key: str) -> bool:
path = self._resolve_path(object_key)
try:
path.unlink()
except FileNotFoundError:
return True
return True
[docs]
async def get_read_url(self, object_key: str) -> str:
return self._url_for_key(object_key)
[docs]
async def close(self) -> None:
return None
def _resolve_public_root(
*,
configured_public_dir: str,
app_root: Path,
package_public_root: Path,
is_router_only_import: bool,
) -> Path:
"""Resolve effective Chainlit public root and seed packaged assets if needed."""
configured = Path(configured_public_dir).expanduser()
if not configured.is_absolute():
configured = (app_root / configured).resolve()
if is_router_only_import:
if package_public_root.is_dir():
return package_public_root
return configured
try:
configured.mkdir(parents=True, exist_ok=True)
except OSError:
if package_public_root.is_dir():
return package_public_root
return configured
if package_public_root.is_dir():
for asset in package_public_root.iterdir():
target = configured / asset.name
if target.exists():
continue
if asset.is_file():
try:
target.write_bytes(asset.read_bytes())
except OSError:
continue
return configured
def _build_storage_provider(
*, subdir: str, public_root: Path, root_path: str
) -> BaseStorageClient:
"""Instantiate the local public storage provider for Chainlit."""
return LocalPublicStorageClient(
public_root=public_root,
subdir=subdir,
root_path=root_path,
)