← Back to Docs

Custom Backends

Build your own backend for specialized storage requirements

When to Build a Custom Backend

Consider building a custom backend when you need:

Backend Interface

All backends must implement the AVP Backend interface. The interface definitions for Python and TypeScript are shown below:

Python Interface

from abc import ABC, abstractmethod
from typing import Optional, Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DiscoverInfo:
    """Capabilities a backend reports from ``Backend.discover()``."""

    version: str  # version string reported by the backend
    backends: List[str]  # backend identifiers, e.g. "redis"
    operations: List[str]  # names of the supported operations, e.g. "STORE"
    extensions: List[str]  # optional extensions; may be an empty list

@dataclass
class Session:
    """Session handle returned by ``Backend.authenticate()``."""

    session_id: str  # passed to store/retrieve/delete/list/rotate
    workspace: str  # workspace the session was created for
    expires_at: datetime  # moment the session stops being valid

@dataclass
class Secret:
    """A stored secret together with its metadata, as returned by ``Backend.retrieve()``.

    The generated dataclass ``__repr__`` would print ``value`` verbatim, so a
    stray ``print``, log call or traceback could leak the plaintext secret.
    This class therefore supplies its own ``__repr__`` that redacts the value.
    Construction and equality are the regular dataclass behaviour.
    """

    name: str  # secret name
    value: bytes  # plaintext secret material; never shown by repr()
    version: int  # version number of this value
    labels: Dict[str, str]  # free-form key/value labels
    created_at: datetime  # when the secret was first created
    updated_at: datetime  # when this version was written

    def __repr__(self) -> str:
        """Return a debug representation with the secret value redacted."""
        return (
            f"{type(self).__name__}(name={self.name!r}, value=<redacted>, "
            f"version={self.version!r}, labels={self.labels!r}, "
            f"created_at={self.created_at!r}, updated_at={self.updated_at!r})"
        )

@dataclass
class SecretMetadata:
    """Description of a secret without its value, as returned by ``Backend.list()``."""

    name: str  # secret name
    version: int  # version number of the secret
    labels: Dict[str, str]  # free-form key/value labels

@dataclass
class RotateResult:
    """Version numbers reported by ``Backend.rotate()``."""

    previous_version: int  # version that was current before the rotation
    new_version: int  # version assigned to the rotated value

class Backend(ABC):
    """Abstract contract every AVP storage backend has to fulfil.

    Subclasses must override all seven operations below; the class cannot be
    instantiated until they do. Every operation except ``discover`` and
    ``authenticate`` takes the ``session_id`` of a previously created session.
    """

    @abstractmethod
    def discover(self) -> DiscoverInfo:
        """Report what this backend is capable of."""

    @abstractmethod
    def authenticate(
        self,
        workspace: str,
        ttl_seconds: Optional[int] = None,
    ) -> Session:
        """Open an authenticated session for ``workspace``.

        ``ttl_seconds`` optionally requests a session lifetime.
        """

    @abstractmethod
    def store(
        self,
        session_id: str,
        name: str,
        value: bytes,
        labels: Optional[Dict[str, str]] = None,
        expires_at: Optional[datetime] = None,
    ) -> None:
        """Persist ``value`` under ``name``, optionally with labels and an expiry."""

    @abstractmethod
    def retrieve(
        self,
        session_id: str,
        name: str,
        version: Optional[int] = None,
    ) -> Secret:
        """Fetch the secret called ``name``, optionally pinned to ``version``."""

    @abstractmethod
    def delete(self, session_id: str, name: str) -> None:
        """Remove the secret called ``name``."""

    @abstractmethod
    def list(
        self,
        session_id: str,
        label_filter: Optional[Dict[str, str]] = None,
    ) -> List[SecretMetadata]:
        """Enumerate the workspace's secrets, optionally filtered by labels."""

    @abstractmethod
    def rotate(self, session_id: str, name: str, new_value: bytes) -> RotateResult:
        """Replace the value of ``name`` with ``new_value``."""

TypeScript Interface

/** Capabilities a backend reports from `Backend.discover()`. */
interface DiscoverInfo {
  /** Version string reported by the backend. */
  version: string;
  /** Backend identifiers. */
  backends: string[];
  /** Names of the supported operations. */
  operations: string[];
  /** Optional extensions; may be empty. */
  extensions: string[];
}

/** Session handle resolved by `Backend.authenticate()`. */
interface Session {
  /** Passed as `sessionId` to store/retrieve/delete/list/rotate. */
  sessionId: string;
  /** Workspace the session was created for. */
  workspace: string;
  /** Moment the session stops being valid. */
  expiresAt: Date;
}

/** A stored secret together with its metadata, as resolved by `Backend.retrieve()`. */
interface Secret {
  /** Secret name. */
  name: string;
  /** Secret material. */
  value: Buffer;
  /** Version number of this value. */
  version: number;
  /** Free-form key/value labels. */
  labels: Record<string, string>;
  /** When the secret was first created. */
  createdAt: Date;
  /** When this version was written. */
  updatedAt: Date;
}

/** Description of a secret without its value, as resolved by `Backend.list()`. */
interface SecretMetadata {
  /** Secret name. */
  name: string;
  /** Version number of the secret. */
  version: number;
  /** Free-form key/value labels. */
  labels: Record<string, string>;
}

/** Version numbers resolved by `Backend.rotate()`. */
interface RotateResult {
  /** Version that was current before the rotation. */
  previousVersion: number;
  /** Version assigned to the rotated value. */
  newVersion: number;
}

/**
 * Contract every AVP storage backend implements. All operations are
 * asynchronous and resolve with the result types declared above.
 */
interface Backend {
  /** Return backend capabilities. */
  discover(): Promise<DiscoverInfo>;
  /** Create an authenticated session; `ttlSeconds` optionally requests a lifetime. */
  authenticate(workspace: string, ttlSeconds?: number): Promise<Session>;
  /** Store a secret, optionally with labels and an expiry. */
  store(sessionId: string, name: string, value: Buffer,
        labels?: Record<string, string>, expiresAt?: Date): Promise<void>;
  /** Retrieve a secret, optionally pinned to a version. */
  retrieve(sessionId: string, name: string, version?: number): Promise<Secret>;
  /** Delete a secret. */
  delete(sessionId: string, name: string): Promise<void>;
  /** List secrets in the workspace, optionally filtered by labels. */
  list(sessionId: string, labelFilter?: Record<string, string>): Promise<SecretMetadata[]>;
  /** Rotate a secret value. */
  rotate(sessionId: string, name: string, newValue: Buffer): Promise<RotateResult>;
}

Example: Redis Backend

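Here's a complete example of a custom Redis backend in Python. It encrypts secrets on the client side before writing them to Redis and, to stay simple, keeps only the latest version of each secret: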

"""Redis backend for AVP - stores encrypted secrets in Redis."""

import json
import secrets
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import redis

from avp import Backend, DiscoverInfo, Session, Secret, SecretMetadata, RotateResult
from avp.errors import NotFoundError, SessionExpiredError, UnauthorizedError


class RedisBackend(Backend):
    """AVP backend using Redis for storage with client-side encryption.

    Each secret is serialised to JSON, encrypted with Fernet and written to
    ``avp:secret:{workspace}:{name}``. A Redis set at
    ``avp:workspace:{workspace}`` indexes the secret names of a workspace, and
    sessions are stored under ``avp:session:{session_id}`` with a Redis TTL.

    Only the latest version of a secret is kept. Timestamps generated by this
    class are naive datetimes taken from ``datetime.utcnow()``.

    NOTE: ``store`` reads the current version and writes the next one with
    separate Redis commands, so concurrent writers to one secret can race.
    """

    # Session lifetime applied when authenticate() is not given a TTL.
    DEFAULT_SESSION_TTL_SECONDS = 3600

    def __init__(self, redis_url: str, encryption_key: str):
        """
        Initialize Redis backend.

        Args:
            redis_url: Redis connection URL (redis://localhost:6379/0)
            encryption_key: Master encryption key for secrets
        """
        self.redis = redis.from_url(redis_url)
        self.fernet = self._derive_fernet(encryption_key)

    def _derive_fernet(self, password: str) -> Fernet:
        """Derive a Fernet cipher from ``password`` using PBKDF2-HMAC-SHA256."""
        # Use a fixed salt for deterministic key derivation, so the same
        # password always yields the same key.
        # In production, store salt separately
        salt = b'avp-redis-backend-salt-v1'
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return Fernet(key)

    def _encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` with the backend's Fernet cipher."""
        return self.fernet.encrypt(data)

    def _decrypt(self, data: bytes) -> bytes:
        """Decrypt a token produced by ``_encrypt``."""
        return self.fernet.decrypt(data)

    def _session_key(self, session_id: str) -> str:
        """Return the Redis key holding the session record."""
        return f"avp:session:{session_id}"

    def _secret_key(self, workspace: str, name: str) -> str:
        """Return the Redis key holding the encrypted secret."""
        return f"avp:secret:{workspace}:{name}"

    def _workspace_key(self, workspace: str) -> str:
        """Return the Redis key of the set indexing a workspace's secret names."""
        return f"avp:workspace:{workspace}"

    @staticmethod
    def _ttl_seconds(expires_at: datetime, now: datetime) -> int:
        """Convert an absolute expiry into a whole-second Redis TTL.

        Args:
            expires_at: Requested expiry. A naive value is compared against
                ``now``; an aware value is compared against the current time
                in its own timezone.
            now: Current naive UTC time.

        Raises:
            ValueError: If ``expires_at`` is not in the future.
        """
        if expires_at.utcoffset() is not None:
            # Naive and aware datetimes cannot be subtracted from each other.
            now = datetime.now(expires_at.tzinfo)
        remaining = (expires_at - now).total_seconds()
        if remaining <= 0:
            raise ValueError("expires_at must be in the future")
        # Redis rejects a TTL of 0, so a sub-second remainder becomes 1 second.
        return max(1, int(remaining))

    def discover(self) -> DiscoverInfo:
        """Return the capabilities of this backend."""
        return DiscoverInfo(
            version="0.1.0",
            backends=["redis"],
            operations=["DISCOVER", "AUTHENTICATE", "STORE", "RETRIEVE",
                       "DELETE", "LIST", "ROTATE"],
            extensions=[]
        )

    def authenticate(self, workspace: str, ttl_seconds: Optional[int] = None) -> Session:
        """Create a session for ``workspace`` and persist it in Redis.

        Args:
            workspace: Workspace the session is bound to.
            ttl_seconds: Session lifetime in seconds. ``None`` or ``0``
                selects ``DEFAULT_SESSION_TTL_SECONDS``.

        Returns:
            The new session, identified by a random URL-safe token.
        """
        ttl = ttl_seconds or self.DEFAULT_SESSION_TTL_SECONDS
        session_id = secrets.token_urlsafe(32)
        expires_at = datetime.utcnow() + timedelta(seconds=ttl)

        session_data = {
            "workspace": workspace,
            "expires_at": expires_at.isoformat()
        }

        # Store session with TTL so Redis drops it once it has expired
        self.redis.setex(
            self._session_key(session_id),
            ttl,
            json.dumps(session_data)
        )

        return Session(
            session_id=session_id,
            workspace=workspace,
            expires_at=expires_at
        )

    def _get_session(self, session_id: str) -> dict:
        """Load the session record and check that it has not expired.

        Raises:
            SessionExpiredError: If the record is missing from Redis or its
                stored ``expires_at`` has passed.
        """
        data = self.redis.get(self._session_key(session_id))
        if not data:
            raise SessionExpiredError("Session not found or expired")

        session = json.loads(data)
        expires_at = datetime.fromisoformat(session["expires_at"])

        if datetime.utcnow() > expires_at:
            raise SessionExpiredError("Session expired")

        return session

    def store(self, session_id: str, name: str, value: bytes,
              labels: Optional[Dict[str, str]] = None,
              expires_at: Optional[datetime] = None) -> None:
        """Encrypt and store a secret, bumping its version if it already exists.

        Args:
            session_id: Session created by ``authenticate``.
            name: Secret name within the session's workspace.
            value: Secret material.
            labels: Optional labels stored alongside the secret.
            expires_at: Optional expiry; the Redis key gets a matching TTL.
                Without it the key is stored without a TTL.

        Raises:
            SessionExpiredError: If the session is missing or expired.
            ValueError: If ``expires_at`` is not in the future.
        """
        session = self._get_session(session_id)
        workspace = session["workspace"]
        key = self._secret_key(workspace, name)

        now = datetime.utcnow()
        # Validate the expiry before anything is written.
        ttl = None if expires_at is None else self._ttl_seconds(expires_at, now)

        # Check if secret exists (for versioning)
        existing = self.redis.get(key)
        if existing:
            existing_data = json.loads(self._decrypt(existing))
            version = existing_data["version"] + 1
            created_at = existing_data["created_at"]
        else:
            version = 1
            created_at = now.isoformat()

        secret_data = {
            "name": name,
            "value": base64.b64encode(value).decode(),
            "version": version,
            "labels": labels or {},
            "created_at": created_at,
            "updated_at": now.isoformat(),
            "expires_at": expires_at.isoformat() if expires_at else None
        }

        # Encrypt and store
        encrypted = self._encrypt(json.dumps(secret_data).encode())

        if ttl is None:
            self.redis.set(key, encrypted)
        else:
            self.redis.setex(key, ttl, encrypted)

        # Add to workspace index
        self.redis.sadd(self._workspace_key(workspace), name)

    def retrieve(self, session_id: str, name: str,
                 version: Optional[int] = None) -> Secret:
        """Fetch and decrypt a secret.

        Args:
            session_id: Session created by ``authenticate``.
            name: Secret name within the session's workspace.
            version: If given, must equal the stored version.

        Raises:
            SessionExpiredError: If the session is missing or expired.
            NotFoundError: If the secret does not exist, or ``version`` is
                given and is not the stored version.
        """
        session = self._get_session(session_id)
        workspace = session["workspace"]

        data = self.redis.get(self._secret_key(workspace, name))
        if not data:
            raise NotFoundError(f"Secret '{name}' not found")

        secret_data = json.loads(self._decrypt(data))

        # Version check (we only keep latest in this simple implementation).
        # Compare against None so that an explicit version 0 is not mistaken
        # for "no version requested".
        if version is not None and secret_data["version"] != version:
            raise NotFoundError(f"Version {version} not found")

        return Secret(
            name=secret_data["name"],
            value=base64.b64decode(secret_data["value"]),
            version=secret_data["version"],
            labels=secret_data["labels"],
            created_at=datetime.fromisoformat(secret_data["created_at"]),
            updated_at=datetime.fromisoformat(secret_data["updated_at"])
        )

    def delete(self, session_id: str, name: str) -> None:
        """Remove a secret and its workspace index entry.

        Raises:
            SessionExpiredError: If the session is missing or expired.
        """
        session = self._get_session(session_id)
        workspace = session["workspace"]

        self.redis.delete(self._secret_key(workspace, name))
        self.redis.srem(self._workspace_key(workspace), name)

    def list(self, session_id: str,
             label_filter: Optional[Dict[str, str]] = None) -> List[SecretMetadata]:
        """List metadata of the secrets in the session's workspace.

        Args:
            session_id: Session created by ``authenticate``.
            label_filter: If given, only secrets whose labels contain every
                key/value pair of the filter are returned.

        Raises:
            SessionExpiredError: If the session is missing or expired.
        """
        session = self._get_session(session_id)
        workspace = session["workspace"]

        results = []
        # Walk all secret names recorded in the workspace index
        for raw_name in self.redis.smembers(self._workspace_key(workspace)):
            name = raw_name.decode() if isinstance(raw_name, bytes) else raw_name
            data = self.redis.get(self._secret_key(workspace, name))
            if not data:
                # The index entry has no secret behind it (for example the
                # key's TTL elapsed); skip it.
                continue

            secret_data = json.loads(self._decrypt(data))

            # Apply label filter
            if label_filter:
                labels = secret_data.get("labels", {})
                if any(labels.get(k) != v for k, v in label_filter.items()):
                    continue

            results.append(SecretMetadata(
                name=secret_data["name"],
                version=secret_data["version"],
                labels=secret_data["labels"]
            ))

        return results

    def rotate(self, session_id: str, name: str, new_value: bytes) -> RotateResult:
        """Replace the value of an existing secret, keeping labels and expiry.

        Args:
            session_id: Session created by ``authenticate``.
            name: Name of the secret to rotate.
            new_value: New secret material.

        Raises:
            SessionExpiredError: If the session is missing or expired.
            NotFoundError: If the secret does not exist.
            ValueError: If the secret's recorded expiry has already passed.
        """
        session = self._get_session(session_id)
        workspace = session["workspace"]

        # Get existing secret
        data = self.redis.get(self._secret_key(workspace, name))
        if not data:
            raise NotFoundError(f"Secret '{name}' not found")

        existing = json.loads(self._decrypt(data))
        previous_version = existing["version"]

        # Carry the recorded expiry over; without it store() would write the
        # key without a TTL and the rotated secret would never expire.
        stored_expiry = existing.get("expires_at")
        expires_at = datetime.fromisoformat(stored_expiry) if stored_expiry else None

        # Store new version
        self.store(session_id, name, new_value, existing.get("labels"), expires_at)

        return RotateResult(
            previous_version=previous_version,
            new_version=previous_version + 1
        )

Using Your Custom Backend

from avp import AVPClient
from my_backends import RedisBackend

# Point the backend at Redis and hand it the master encryption key
backend = RedisBackend(redis_url="redis://localhost:6379/0", encryption_key="your-encryption-key")

# Create client with custom backend
client = AVPClient(backend)

# Use like any other backend: open a session, then store and read a secret
session = client.authenticate("my-workspace")
session_id = session.session_id
client.store(session_id, "API_KEY", b"secret-value")
secret = client.retrieve(session_id, "API_KEY")

Best Practices

Security Requirements

Common Pitfalls

Testing Your Backend

import pytest
from avp.testing import BackendTestSuite
from my_backends import RedisBackend

class TestRedisBackend(BackendTestSuite):
    """Exercise RedisBackend with the standard AVP backend test suite."""

    @pytest.fixture
    def backend(self):
        """Provide a RedisBackend connected to Redis database 15."""
        settings = {
            "redis_url": "redis://localhost:6379/15",  # Use test database
            "encryption_key": "test-key",
        }
        return RedisBackend(**settings)

    def teardown_method(self):
        """Flush the test database so no data carries over between tests."""
        import redis

        # Clean up test data
        redis.from_url("redis://localhost:6379/15").flushdb()

Publishing Your Backend

If you build a useful backend, consider publishing it:

  1. Create a package named avp-backend-{name}
  2. Include comprehensive documentation
  3. Add integration tests
  4. Publish to PyPI/npm/crates.io
  5. Open a PR to add it to the AVP ecosystem page