Advanced Example: Multi-Tenant Ingestion Service¶
This example demonstrates a more complex use case for wd-di, showcasing how to build a multi-tenant file ingestion service where each tenant uses a different cloud storage backend. It highlights features like scoped services, factory-based dependency resolution, ContextVar for managing tenant context, and configuration-driven behavior.
The Scenario: Multi-Tenant Uploads¶
Imagine an ingestion service where each customer (tenant) insists on using their own cloud storage provider:
- ACME Ltd. → AWS S3
- Contoso → Azure Blob Storage
- Globex → Google Cloud Storage
At runtime, your application might receive an HTTP header like `X-Tenant-Id: ACME` and must dynamically choose and use the correct storage backend for that tenant.
Why Use Dependency Injection for This?¶
Using wd-di for this scenario offers significant advantages over manual, conditional logic (e.g., large if/elif blocks):
- Pluggable Backends: Adding a new tenant or storage provider (e.g., "DigitalOcean Spaces") becomes a matter of adding a new storage implementation class and updating a configuration mapping. No changes are needed in the core ingestion logic.
- Constructor Injection: Storage backend implementations can themselves depend on other services (like a clock, logger, or authentication service). The DI container automatically resolves and injects these dependencies when creating a backend instance.
- Enhanced Testability: During testing, you can easily swap out real cloud storage backends for a mock implementation (e.g., one that writes to the local file system or an in-memory store). This is done by simply registering the mock implementation in the `ServiceCollection` during test setup.
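As a sketch of that testing approach (the `InMemoryStorage` mock and the minimal `DataIngestionService` stand-in below are hypothetical; in a real test you would register the mock on the same `ServiceCollection` used by the example and resolve the service from the container):

```python
from typing import Dict


class InMemoryStorage:
    """Hypothetical mock backend: records uploads instead of calling a cloud API."""

    def __init__(self):
        self.blobs: Dict[str, bytes] = {}

    def upload(self, blob_name: str, data: bytes):
        self.blobs[blob_name] = data


class DataIngestionService:
    # Minimal stand-in for the service defined later in this example.
    def __init__(self, storage):
        self._storage = storage

    def ingest(self, filename: str, data: bytes):
        self._storage.upload(filename, data)


# In a real test you would do roughly:
#   services.add_singleton(IBlobStorage, InMemoryStorage)
# and resolve DataIngestionService from the container instead.
storage = InMemoryStorage()
DataIngestionService(storage).ingest("test.txt", b"hello")
assert storage.blobs["test.txt"] == b"hello"
```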
Key DI Concepts Demonstrated¶
This example utilizes several important wd-di features:
- `ServiceCollection`: Used as the central registry for all service definitions (interfaces and their concrete implementations or factories).
- `ContextVar` for Tenant ID: A `contextvars.ContextVar` (`_current_tenant`) is used to implicitly carry the current tenant ID through asynchronous call chains without needing to pass it as an explicit parameter to every function.
- Factory for Dynamic Resolution (`_storage_factory`): A factory function (`_storage_factory`) is registered for the `IBlobStorage` interface. This factory dynamically determines which concrete storage implementation to provide based on the current tenant's configuration (retrieved via `IConfiguration` and the `_current_tenant` context variable).
- Service Lifetimes: Careful selection of service lifetimes is crucial:
    - `IClock`: Registered as a singleton. A single clock instance is shared across the entire application.
    - `DataIngestionService`: Registered as scoped. A new instance is created for each logical request or operation scope (in this case, per tenant request).
    - `IBlobStorage`: Registered with `add_transient_factory`. While the factory itself is called per resolution, the `DataIngestionService` being scoped means it gets its `IBlobStorage` once per scope. If `DataIngestionService` were transient, `IBlobStorage` (and its factory) would be resolved anew each time `DataIngestionService` was resolved.

!!! info "Why is `DataIngestionService` Scoped and Not Singleton?"

    If `DataIngestionService` were a singleton, the DI container would create it once. When created, it would be injected with an `IBlobStorage` instance. Due to the factory logic, this would be the storage backend for the very first tenant whose request was processed. Subsequent requests for other tenants would erroneously reuse this same backend (e.g., all tenants' data might end up in ACME's S3 bucket). By making `DataIngestionService` scoped, a new instance is created within each tenant's request scope, ensuring it gets an `IBlobStorage` instance appropriate for that specific tenant.
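The scoped-vs-singleton pitfall can be illustrated without the container (a deliberately simplified sketch: `make_service` plays the role of a per-scope resolution, backend strings stand in for real storage instances, and all names here are hypothetical):

```python
from contextvars import ContextVar

_current_tenant: ContextVar[str] = ContextVar("tenant")


def storage_factory() -> str:
    # Stand-in for _storage_factory: picks a "backend" from the ambient tenant.
    return {"ACME": "s3", "Contoso": "azure"}[_current_tenant.get()]


def make_service() -> str:
    # Stand-in for resolving DataIngestionService inside a request scope:
    # the factory runs again for each scope, so each tenant gets its own backend.
    return storage_factory()


_current_tenant.set("ACME")
captured_once = make_service()  # a singleton would be built here, capturing "s3"

_current_tenant.set("Contoso")
per_scope = make_service()      # a scoped service is rebuilt and sees "azure"

print(captured_once, per_scope)  # s3 azure
```

A singleton `DataIngestionService` would behave like `captured_once`: whatever backend the first tenant triggered is what every later tenant would reuse.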
Application Workflow¶
- Startup: The main process initializes the `ServiceCollection` with all necessary service registrations and builds the root `ServiceProvider`.
- Per Request (Simulated):
    - The `handle_request` function simulates an incoming request for a specific tenant.
    - The `_current_tenant` `ContextVar` is set to the ID of the tenant for the current request.
    - A new DI scope is created using `provider.create_scope()`. This ensures that any scoped services (like `DataIngestionService`) are fresh for this request.
    - `DataIngestionService` is resolved from the current scope. During its resolution:
        - The DI container needs an `IBlobStorage`.
        - It calls the registered factory for `IBlobStorage` (`_storage_factory`).
        - The factory uses the `ServiceProvider` (`sp`) to get `IConfiguration` and the current tenant ID (from `_current_tenant.get()`) to determine the correct backend type (S3, Azure, GCS).
        - It then instantiates and returns the appropriate concrete storage implementation (e.g., `S3Storage`), injecting its dependencies (like `IClock`).
    - The `ingest` method of the `DataIngestionService` is called to process the file, using the correctly injected tenant-specific storage backend.
    - Upon exiting the `with provider.create_scope() as scope:` block, the scope is disposed of. Any disposable scoped services would also be disposed.
This example is designed to be dependency-free for demonstration purposes; the "cloud" storage backends are simple in-memory classes. You can run `python examples/complex_ingest/app.py` (assuming the example structure) to see it in action.
Implementation¶
A complete runnable implementation can be found in `examples/complex_ingest`.
```python
from contextvars import ContextVar
from typing import Callable, Dict

from wd.di import ServiceCollection
from wd.di.config import Configuration, IConfiguration
```
Placeholder interfaces/classes for self-contained documentation¶
```python
class IClock:
    def now(self): ...


class IBlobStorage:
    def upload(self, blob_name: str, data: bytes): ...


class UtcClock(IClock):
    def now(self):
        return "current_time_utc"  # Simplified


class S3Storage(IBlobStorage):
    def __init__(self, clock: IClock):
        self._clock = clock

    def upload(self, blob_name: str, data: bytes):
        print(f"S3: Uploading {blob_name} at {self._clock.now()} ({len(data)} bytes)")


class AzureBlobStorage(IBlobStorage):
    def __init__(self, clock: IClock):
        self._clock = clock

    def upload(self, blob_name: str, data: bytes):
        print(f"Azure: Uploading {blob_name} at {self._clock.now()} ({len(data)} bytes)")


class GcsStorage(IBlobStorage):
    def __init__(self, clock: IClock):
        self._clock = clock

    def upload(self, blob_name: str, data: bytes):
        print(f"GCS: Uploading {blob_name} at {self._clock.now()} ({len(data)} bytes)")


class DataIngestionService:
    def __init__(self, storage: IBlobStorage):
        self._storage = storage

    def ingest(self, filename: str, data: bytes):
        print(f"DataIngestionService: Ingesting {filename}")
        self._storage.upload(filename, data)
```
1. Set up the service collection¶
```python
services = ServiceCollection()
services.add_singleton(IClock, UtcClock)
services.add_scoped(DataIngestionService)

services.add_singleton_factory(
    IConfiguration,
    lambda _: Configuration(
        {
            "tenants": {
                "ACME": {"backend": "s3"},
                "Contoso": {"backend": "azure"},
                "Globex": {"backend": "gcs"},
            }
        }
    ),
)

_current_tenant: ContextVar[str] = ContextVar("tenant")


def _storage_factory(sp, tenant_id: str) -> IBlobStorage:
    backend = sp.get_service(IConfiguration).get(f"tenants:{tenant_id}:backend")
    mapping: Dict[str, Callable[[IClock], IBlobStorage]] = {
        "s3": lambda clock: S3Storage(clock),
        "azure": lambda clock: AzureBlobStorage(clock),
        "gcs": lambda clock: GcsStorage(clock),
    }
    if backend not in mapping:
        raise ValueError(f"Unknown backend '{backend}' for tenant {tenant_id}")
    return mapping[backend](sp.get_service(IClock))


services.add_transient_factory(
    IBlobStorage, lambda sp: _storage_factory(sp, _current_tenant.get())
)

provider = services.build_service_provider()
```
2. Request boundary helper¶
```python
def handle_request(tenant_id: str, filename: str, data: bytes) -> None:
    token = _current_tenant.set(tenant_id)
    try:
        with provider.create_scope() as scope:
            scope.get_service(DataIngestionService).ingest(filename, data)
    finally:
        # Always restore the previous tenant, even if ingestion raises.
        _current_tenant.reset(token)
```
3. Demo run¶
```python
if __name__ == "__main__":
    dummy_data = b"dummy file content"

    print("--- Simulating ACME request ---")
    handle_request("ACME", "acme/lucy.jpg", dummy_data)

    print("\n--- Simulating Contoso request ---")
    handle_request("Contoso", "contoso/luna.jpg", dummy_data)

    print("\n--- Simulating Globex request ---")
    handle_request("Globex", "globex/lucy.jpg", dummy_data)
```