Build your own backend for specialized storage requirements
Consider building a custom backend when the existing backends do not cover your storage requirements.
All backends must implement the AVP Backend interface. Here's the interface definition for each language:
from abc import ABC, abstractmethod
from typing import Optional, Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DiscoverInfo:
    """Capabilities a backend reports from ``discover()``."""

    # Version string reported by the backend (e.g. "0.1.0").
    version: str
    # Names of the storage backends available (e.g. ["redis"]).
    backends: List[str]
    # Operation names the backend supports (e.g. "STORE", "RETRIEVE").
    operations: List[str]
    # Optional extensions the backend supports; empty when there are none.
    extensions: List[str]
@dataclass
class Session:
    """An authenticated session returned by ``authenticate()``."""

    # Identifier passed as ``session_id`` to every other backend operation.
    session_id: str
    # Workspace the session was created for.
    workspace: str
    # Moment after which the session is no longer valid.
    expires_at: datetime
@dataclass
class Secret:
    """A stored secret, including its value, as returned by ``retrieve()``."""

    # Name the secret was stored under.
    name: str
    # Raw secret value.
    value: bytes
    # Version number of this value.
    version: int
    # Key/value labels attached to the secret.
    labels: Dict[str, str]
    # When the secret was first stored.
    created_at: datetime
    # When the secret was last written.
    updated_at: datetime
@dataclass
class SecretMetadata:
    """Descriptive data about a secret, without its value, as returned by ``list()``."""

    # Name the secret was stored under.
    name: str
    # Version number of the secret.
    version: int
    # Key/value labels attached to the secret.
    labels: Dict[str, str]
@dataclass
class RotateResult:
    """Outcome of ``rotate()``: the version replaced and the version written."""

    # Version the secret had before rotation.
    previous_version: int
    # Version assigned to the rotated value.
    new_version: int
class Backend(ABC):
    """Abstract interface every AVP backend must implement.

    Every operation except ``discover`` and ``authenticate`` takes the
    ``session_id`` of a session obtained from ``authenticate``.
    """

    @abstractmethod
    def discover(self) -> DiscoverInfo:
        """Return backend capabilities."""

    @abstractmethod
    def authenticate(self, workspace: str, ttl_seconds: Optional[int] = None) -> Session:
        """Create an authenticated session.

        Args:
            workspace: Workspace the session is created for.
            ttl_seconds: Requested session lifetime in seconds; ``None``
                leaves the lifetime to the backend.
        """

    @abstractmethod
    def store(self, session_id: str, name: str, value: bytes,
              labels: Optional[Dict[str, str]] = None,
              expires_at: Optional[datetime] = None) -> None:
        """Store a secret.

        Args:
            session_id: Session obtained from ``authenticate``.
            name: Name to store the secret under.
            value: Raw secret value.
            labels: Optional key/value labels to attach.
            expires_at: Optional expiry moment for the secret.
        """

    @abstractmethod
    def retrieve(self, session_id: str, name: str,
                 version: Optional[int] = None) -> Secret:
        """Retrieve a secret.

        Args:
            session_id: Session obtained from ``authenticate``.
            name: Name of the secret.
            version: Optional specific version to fetch.
        """

    @abstractmethod
    def delete(self, session_id: str, name: str) -> None:
        """Delete a secret."""

    @abstractmethod
    def list(self, session_id: str,
             label_filter: Optional[Dict[str, str]] = None) -> List[SecretMetadata]:
        """List secrets in the workspace.

        Args:
            session_id: Session obtained from ``authenticate``.
            label_filter: Optional labels used to narrow the listing.
        """

    @abstractmethod
    def rotate(self, session_id: str, name: str, new_value: bytes) -> RotateResult:
        """Rotate a secret value.

        Returns:
            The version that was replaced and the version that was written.
        """
/** Capabilities a backend reports from `discover()`. */
interface DiscoverInfo {
  /** Version string reported by the backend. */
  version: string;
  /** Names of the storage backends available. */
  backends: Array<string>;
  /** Operation names the backend supports. */
  operations: Array<string>;
  /** Optional extensions the backend supports. */
  extensions: Array<string>;
}
/** An authenticated session returned by `authenticate()`. */
interface Session {
  /** Identifier passed as `sessionId` to every other backend operation. */
  sessionId: string;
  /** Workspace the session was created for. */
  workspace: string;
  /** Moment after which the session is no longer valid. */
  expiresAt: Date;
}
/** A stored secret, including its value, as returned by `retrieve()`. */
interface Secret {
  /** Name the secret was stored under. */
  name: string;
  /** Raw secret value. */
  value: Buffer;
  /** Version number of this value. */
  version: number;
  /** Key/value labels attached to the secret. */
  labels: { [key: string]: string };
  /** When the secret was first stored. */
  createdAt: Date;
  /** When the secret was last written. */
  updatedAt: Date;
}
/** Descriptive data about a secret, without its value, as returned by `list()`. */
interface SecretMetadata {
  /** Name the secret was stored under. */
  name: string;
  /** Version number of the secret. */
  version: number;
  /** Key/value labels attached to the secret. */
  labels: { [key: string]: string };
}
/** Outcome of `rotate()`: the version replaced and the version written. */
interface RotateResult {
  /** Version the secret had before rotation. */
  previousVersion: number;
  /** Version assigned to the rotated value. */
  newVersion: number;
}
/**
 * Interface every AVP backend must implement.
 *
 * Every operation except `discover` and `authenticate` takes the
 * `sessionId` of a session obtained from `authenticate`.
 */
interface Backend {
  /** Return backend capabilities. */
  discover(): Promise<DiscoverInfo>;

  /** Create an authenticated session for `workspace`. */
  authenticate(workspace: string, ttlSeconds?: number): Promise<Session>;

  /** Store a secret, optionally with labels and an expiry. */
  store(
    sessionId: string,
    name: string,
    value: Buffer,
    labels?: { [key: string]: string },
    expiresAt?: Date,
  ): Promise<void>;

  /** Retrieve a secret, optionally at a specific version. */
  retrieve(sessionId: string, name: string, version?: number): Promise<Secret>;

  /** Delete a secret. */
  delete(sessionId: string, name: string): Promise<void>;

  /** List secrets in the workspace, optionally narrowed by labels. */
  list(
    sessionId: string,
    labelFilter?: { [key: string]: string },
  ): Promise<Array<SecretMetadata>>;

  /** Rotate a secret value. */
  rotate(sessionId: string, name: string, newValue: Buffer): Promise<RotateResult>;
}
Here's a complete example of a custom Redis backend in Python:
"""Redis backend for AVP - stores encrypted secrets in Redis."""
import base64
import hashlib
import json
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, List

import redis
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from avp import Backend, DiscoverInfo, Session, Secret, SecretMetadata, RotateResult
from avp.errors import NotFoundError, SessionExpiredError, UnauthorizedError
class RedisBackend(Backend):
    """AVP backend using Redis for storage with client-side encryption.

    Each secret is stored as one Fernet-encrypted JSON record under
    ``avp:secret:{workspace}:{name}``. Only the latest version of a secret
    is kept. A Redis set per workspace indexes the secret names.

    All timestamps are naive datetimes expressed in UTC.

    Version numbers are assigned with a read-then-write sequence that is
    not atomic; concurrent writers to the same secret can produce the same
    version number.
    """

    # Salt used when the caller does not supply one. A fixed salt keeps key
    # derivation deterministic; in production pass a separately stored salt.
    _DEFAULT_SALT = b'avp-redis-backend-salt-v1'
    # Session lifetime, in seconds, used when authenticate() gets no TTL.
    _DEFAULT_SESSION_TTL = 3600

    def __init__(self, redis_url: str, encryption_key: str,
                 salt: Optional[bytes] = None):
        """
        Initialize Redis backend.

        Args:
            redis_url: Redis connection URL (redis://localhost:6379/0)
            encryption_key: Master encryption key for secrets
            salt: Optional key-derivation salt; defaults to a fixed
                built-in salt. The same salt must be used to read back
                data that was encrypted with it.
        """
        self.redis = redis.from_url(redis_url)
        self.fernet = self._derive_fernet(encryption_key, salt)

    def _derive_fernet(self, password: str, salt: Optional[bytes] = None) -> Fernet:
        """Derive a Fernet cipher from ``password`` with PBKDF2-HMAC-SHA256."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self._DEFAULT_SALT if salt is None else salt,
            iterations=480000,
        )
        # Fernet expects a url-safe base64 encoding of the 32-byte key.
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return Fernet(key)

    def _encrypt(self, data: bytes) -> bytes:
        """Encrypt data."""
        return self.fernet.encrypt(data)

    def _decrypt(self, data: bytes) -> bytes:
        """Decrypt data."""
        return self.fernet.decrypt(data)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime."""
        # datetime.utcnow() is deprecated; this yields the same naive value.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _to_naive_utc(moment: datetime) -> datetime:
        """Convert an aware datetime to naive UTC; return naive input as is."""
        if moment.utcoffset() is None:
            return moment
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    def _session_key(self, session_id: str) -> str:
        return f"avp:session:{session_id}"

    def _secret_key(self, workspace: str, name: str) -> str:
        # NOTE(review): ':' is not escaped, so workspace "a:b" + name "c"
        # and workspace "a" + name "b:c" map to the same key. Changing the
        # layout would orphan existing data, so it is left as is here.
        return f"avp:secret:{workspace}:{name}"

    def _workspace_key(self, workspace: str) -> str:
        return f"avp:workspace:{workspace}"

    def _read_secret(self, workspace: str, name: str) -> Optional[dict]:
        """Return the decrypted record for a secret, or None if absent."""
        blob = self.redis.get(self._secret_key(workspace, name))
        if not blob:
            return None
        return json.loads(self._decrypt(blob))

    def _write_secret(self, workspace: str, name: str, value: bytes,
                      labels: Optional[Dict[str, str]],
                      expires_at: Optional[datetime],
                      previous: Optional[dict]) -> int:
        """Encrypt and write a new version of a secret; return its version.

        ``previous`` is the record currently stored (or None). It supplies
        the version to increment and the creation time to carry over.
        ``expires_at`` must already be naive UTC.
        """
        now = self._utcnow()
        if previous is None:
            version = 1
            created_at = now.isoformat()
        else:
            version = previous["version"] + 1
            created_at = previous["created_at"]

        record = {
            "name": name,
            "value": base64.b64encode(value).decode(),
            "version": version,
            "labels": labels or {},
            "created_at": created_at,
            "updated_at": now.isoformat(),
            "expires_at": expires_at.isoformat() if expires_at else None,
        }
        encrypted = self._encrypt(json.dumps(record).encode())

        key = self._secret_key(workspace, name)
        if expires_at is None:
            self.redis.set(key, encrypted)
        else:
            # Redis rejects a non-positive expiry, so clamp to one second.
            ttl = max(1, int((expires_at - now).total_seconds()))
            self.redis.setex(key, ttl, encrypted)

        # Add to workspace index
        self.redis.sadd(self._workspace_key(workspace), name)
        return version

    def discover(self) -> DiscoverInfo:
        """Return the capabilities of this backend."""
        return DiscoverInfo(
            version="0.1.0",
            backends=["redis"],
            operations=["DISCOVER", "AUTHENTICATE", "STORE", "RETRIEVE",
                        "DELETE", "LIST", "ROTATE"],
            extensions=[]
        )

    def authenticate(self, workspace: str, ttl_seconds: Optional[int] = None) -> Session:
        """Create a session for ``workspace`` and store it in Redis.

        Args:
            workspace: Workspace the session grants access to.
            ttl_seconds: Session lifetime in seconds; defaults to 3600.

        Raises:
            ValueError: If ``ttl_seconds`` is zero or negative.
        """
        ttl = self._DEFAULT_SESSION_TTL if ttl_seconds is None else ttl_seconds
        if ttl <= 0:
            raise ValueError("ttl_seconds must be a positive number of seconds")

        session_id = secrets.token_urlsafe(32)
        expires_at = self._utcnow() + timedelta(seconds=ttl)
        session_data = {
            "workspace": workspace,
            "expires_at": expires_at.isoformat()
        }
        # Store session with TTL so Redis evicts it on expiry
        self.redis.setex(
            self._session_key(session_id),
            ttl,
            json.dumps(session_data)
        )
        return Session(
            session_id=session_id,
            workspace=workspace,
            expires_at=expires_at
        )

    def _get_session(self, session_id: str) -> dict:
        """Get and validate session.

        Raises:
            SessionExpiredError: If the session is unknown or has expired.
        """
        data = self.redis.get(self._session_key(session_id))
        if not data:
            raise SessionExpiredError("Session not found or expired")
        session = json.loads(data)
        expires_at = datetime.fromisoformat(session["expires_at"])
        if self._utcnow() > expires_at:
            raise SessionExpiredError("Session expired")
        return session

    def store(self, session_id: str, name: str, value: bytes,
              labels: Optional[Dict[str, str]] = None,
              expires_at: Optional[datetime] = None) -> None:
        """Store a secret, creating version 1 or incrementing the version.

        Args:
            session_id: Session obtained from ``authenticate``.
            name: Name to store the secret under.
            value: Raw secret value.
            labels: Labels to attach; replaces any existing labels.
            expires_at: Optional expiry. Aware datetimes are converted to
                UTC; naive datetimes are taken to be UTC.

        Raises:
            SessionExpiredError: If the session is unknown or has expired.
            ValueError: If ``expires_at`` is not in the future.
        """
        workspace = self._get_session(session_id)["workspace"]

        if expires_at is not None:
            expires_at = self._to_naive_utc(expires_at)
            if expires_at <= self._utcnow():
                raise ValueError("expires_at must be in the future")

        # Existing record (if any) determines the next version number
        previous = self._read_secret(workspace, name)
        self._write_secret(workspace, name, value, labels, expires_at, previous)

    def retrieve(self, session_id: str, name: str,
                 version: Optional[int] = None) -> Secret:
        """Retrieve a secret.

        Args:
            session_id: Session obtained from ``authenticate``.
            name: Name of the secret.
            version: Version to fetch; ``None`` returns the latest. Only
                the latest version is kept, so any other version fails.

        Raises:
            SessionExpiredError: If the session is unknown or has expired.
            NotFoundError: If the secret or the requested version is absent.
        """
        workspace = self._get_session(session_id)["workspace"]
        record = self._read_secret(workspace, name)
        if record is None:
            raise NotFoundError(f"Secret '{name}' not found")

        # Compare against None so an explicit version 0 is not ignored
        if version is not None and record["version"] != version:
            raise NotFoundError(f"Version {version} not found")

        return Secret(
            name=record["name"],
            value=base64.b64decode(record["value"]),
            version=record["version"],
            labels=record["labels"],
            created_at=datetime.fromisoformat(record["created_at"]),
            updated_at=datetime.fromisoformat(record["updated_at"])
        )

    def delete(self, session_id: str, name: str) -> None:
        """Delete a secret and remove it from the workspace index.

        Deleting a secret that does not exist is not an error.
        """
        workspace = self._get_session(session_id)["workspace"]
        self.redis.delete(self._secret_key(workspace, name))
        self.redis.srem(self._workspace_key(workspace), name)

    def list(self, session_id: str,
             label_filter: Optional[Dict[str, str]] = None) -> List[SecretMetadata]:
        """List secrets in the session's workspace, sorted by name.

        Args:
            session_id: Session obtained from ``authenticate``.
            label_filter: If given, only secrets whose labels contain every
                key/value pair in the filter are returned.

        Raises:
            SessionExpiredError: If the session is unknown or has expired.
        """
        workspace = self._get_session(session_id)["workspace"]

        members = self.redis.smembers(self._workspace_key(workspace))
        names = sorted(
            m.decode() if isinstance(m, bytes) else m for m in members
        )
        if not names:
            return []

        # One MGET instead of one GET per secret
        blobs = self.redis.mget([self._secret_key(workspace, n) for n in names])

        results = []
        for blob in blobs:
            if not blob:
                # Indexed name whose key is gone (e.g. expired via TTL)
                continue
            record = json.loads(self._decrypt(blob))
            labels = record.get("labels", {})
            if label_filter and any(
                labels.get(k) != v for k, v in label_filter.items()
            ):
                continue
            results.append(SecretMetadata(
                name=record["name"],
                version=record["version"],
                labels=labels
            ))
        return results

    def rotate(self, session_id: str, name: str, new_value: bytes) -> RotateResult:
        """Replace a secret's value, keeping its labels and expiry.

        Raises:
            SessionExpiredError: If the session is unknown or has expired.
            NotFoundError: If the secret does not exist.
        """
        workspace = self._get_session(session_id)["workspace"]
        existing = self._read_secret(workspace, name)
        if existing is None:
            raise NotFoundError(f"Secret '{name}' not found")

        # Carry the expiry over so rotation does not make the secret permanent
        expiry_text = existing.get("expires_at")
        expires_at = datetime.fromisoformat(expiry_text) if expiry_text else None

        new_version = self._write_secret(
            workspace, name, new_value, existing.get("labels"), expires_at, existing
        )
        return RotateResult(
            previous_version=existing["version"],
            new_version=new_version
        )
from avp import AVPClient
from my_backends import RedisBackend
# Build the custom backend first, then hand it to the client
redis_settings = {
    "redis_url": "redis://localhost:6379/0",
    "encryption_key": "your-encryption-key",
}
backend = RedisBackend(**redis_settings)
client = AVPClient(backend)

# From here on the client is used exactly as with any other backend
session = client.authenticate("my-workspace")
session_id = session.session_id
client.store(session_id, "API_KEY", b"secret-value")
secret = client.retrieve(session_id, "API_KEY")
import pytest
from avp.testing import BackendTestSuite
from my_backends import RedisBackend
class TestRedisBackend(BackendTestSuite):
    """Run standard AVP backend tests against Redis backend."""

    # Single source for the test database URL, so the fixture and the
    # cleanup always target the same database.
    # WARNING: teardown flushes this entire database.
    REDIS_URL = "redis://localhost:6379/15"  # Use test database

    @pytest.fixture
    def backend(self):
        """Provide a RedisBackend connected to the test database."""
        return RedisBackend(
            redis_url=self.REDIS_URL,
            encryption_key="test-key"
        )

    def teardown_method(self):
        """Clean up test data after each test."""
        import redis
        r = redis.from_url(self.REDIS_URL)
        try:
            r.flushdb()
        finally:
            # Release the connection instead of leaking one per test
            r.close()
If you build a useful backend, consider publishing it under a package name that follows this pattern:
avp-backend-{name}