SubscriptionManagerArtifact¶
Overview¶
The SubscriptionManagerArtifact manages NGSI-LD subscriptions with a FIWARE Context Broker. It extends the base spade_artifact.Artifact class and provides comprehensive functionality for subscription management, notification handling, and artifact-agent communication.
Class Architecture¶
Core Components¶
The class is structured around several key components:
Subscription Management
- Creation and deletion of subscriptions
- Tracking active subscriptions
- Subscription identifier generation

Notification Handling
- HTTP server for receiving notifications
- Notification processing and filtering
- Data publishing to focused agents

Network Configuration
- Dynamic port allocation
- IP address management
- Endpoint configuration
Key Attributes¶
class SubscriptionManagerArtifact(spade_artifact.Artifact):
    def __init__(self, jid, passwd, config, broker_url="http://localhost:9090"):
        self.broker_url = broker_url
        self.recent_notifications = {}
        self.watched_attributes = []
        self.config = config
        self.port = None
        self.active_subscriptions = {}

- broker_url: Context Broker endpoint
- recent_notifications: Cache of received notifications
- watched_attributes: List of monitored attributes
- config: Subscription configuration
- port: Dynamic port for the notification server
- active_subscriptions: Dictionary of current subscriptions
Core Functionality¶
Subscription Creation¶
async def create_subscription(self, session, subscription_data, subscription_identifier):
    """Creates a new subscription in the Context Broker."""

The method:
1. Sends a POST request to the Context Broker
2. Handles the response and stores the subscription ID
3. Updates the active_subscriptions dictionary
4. Logs the creation status
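The four steps above can be sketched as a standalone coroutine. This is a minimal sketch under stated assumptions, not the artifact's actual implementation: `session` is assumed to behave like an `aiohttp.ClientSession`, the broker is assumed to expose the standard NGSI-LD path `/ngsi-ld/v1/subscriptions` and to answer `201` with a `Location` header, and the extra `broker_url` and `active_subscriptions` parameters stand in for the instance attributes of the same names.

```python
import logging

logger = logging.getLogger("subscription_sketch")  # hypothetical logger name


async def create_subscription(session, broker_url, subscription_data,
                              subscription_identifier, active_subscriptions):
    """Sketch: create a subscription and record its broker-assigned ID."""
    url = f"{broker_url}/ngsi-ld/v1/subscriptions"
    # JSON-LD content type only applies when the payload carries its own @context.
    content_type = ("application/ld+json" if "@context" in subscription_data
                    else "application/json")

    # 1. Send the POST request to the Context Broker.
    async with session.post(url, json=subscription_data,
                            headers={"Content-Type": content_type}) as response:
        # 2. Handle the response: on success the ID is the last segment of Location.
        if response.status == 201:
            location = response.headers.get("Location", "")
            subscription_id = location.rsplit("/", 1)[-1]
            # 3. Track the subscription under its local identifier.
            active_subscriptions[subscription_identifier] = subscription_id
            # 4. Log the creation status.
            logger.info("Created subscription %s as %s",
                        subscription_identifier, subscription_id)
            return subscription_id

        body = await response.text()
        logger.error("Subscription %s failed with HTTP %s: %s",
                     subscription_identifier, response.status, body)
        return None
```

Returning `None` on failure rather than raising is a design choice of this sketch; the real method may report errors differently.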
Subscription Deletion¶
async def delete_subscription(self, session, subscription_id):
    """Deletes a subscription from the Context Broker."""

async def delete_subscription_by_identifier(self, session, subscription_identifier):
    """Deletes a subscription using its unique identifier."""

async def delete_artifact_subscriptions(self, session):
    """Deletes all subscriptions associated with this artifact."""

These methods provide different levels of subscription cleanup:
- Individual subscription deletion
- Identifier-based deletion
- Bulk deletion of artifact subscriptions
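As an illustration of identifier-based deletion, the sketch below resolves a local identifier to a broker subscription ID and issues the NGSI-LD `DELETE` request. It rests on assumptions: `session` behaves like an `aiohttp.ClientSession`, the broker answers `204` on success, and the identifier is resolved through a local `active_subscriptions` dictionary (the actual artifact may instead query the broker to find matching subscriptions).

```python
async def delete_subscription_by_identifier(session, broker_url,
                                            subscription_identifier,
                                            active_subscriptions):
    """Sketch: delete the subscription tracked under a local identifier."""
    subscription_id = active_subscriptions.get(subscription_identifier)
    if subscription_id is None:
        # Nothing is tracked under this identifier, so there is nothing to delete.
        return False

    url = f"{broker_url}/ngsi-ld/v1/subscriptions/{subscription_id}"
    async with session.delete(url) as response:
        if response.status == 204:
            # Stop tracking the subscription only after the broker confirms deletion.
            active_subscriptions.pop(subscription_identifier, None)
            return True
        return False
```

Bulk deletion can then be expressed as a loop over a copy of the tracked identifiers, calling this helper once per entry.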
Notification Handling¶
async def handle_notification(self, request):
    """Processes incoming notifications from the Context Broker."""

The notification handler:
1. Parses incoming JSON data
2. Filters attributes based on configuration
3. Updates the recent_notifications cache
4. Publishes data to focused agents
5. Returns an appropriate HTTP response
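The filtering step (step 2) can be illustrated in isolation. The helper below is a hypothetical function, not part of the artifact's API; it assumes the standard NGSI-LD notification shape, in which the notified entities arrive in a top-level `data` list, and it treats an empty `watched_attributes` list as "keep everything".

```python
def filter_notification(payload, watched_attributes):
    """Sketch: keep only watched attributes of each notified entity."""
    filtered = {}
    for entity in payload.get("data", []):
        entity_id = entity.get("id")
        if entity_id is None:
            continue  # skip malformed entries without an identifier

        # Always keep the identifying fields.
        kept = {"id": entity_id, "type": entity.get("type")}
        for name, value in entity.items():
            if name in ("id", "type"):
                continue
            # An empty watch list is treated as "no filtering" in this sketch.
            if not watched_attributes or name in watched_attributes:
                kept[name] = value
        filtered[entity_id] = kept
    return filtered
```

The returned dictionary is keyed by entity ID, which makes it straightforward to merge into a cache such as `recent_notifications` before publishing.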
Integration in run() Method¶
The run() method orchestrates all components:
async def run(self):
    try:
        self.presence.set_available()
        local_ip = self.get_local_ip()
        self.port = self.find_free_port()
        async with aiohttp.ClientSession() as session:
            # Subscription cleanup if configured
            if self.config.get("delete_all_artifact_subscriptions", False):
                await self.delete_artifact_subscriptions(session)
            elif self.config.get("delete_subscription_identifier"):
                await self.delete_subscription_by_identifier(
                    session,
                    self.config["delete_subscription_identifier"])
            # Create new subscription if not delete-only mode
            if not self.config.get("delete_only", False):
                subscription_identifier = self.config.get("subscription_identifier",
                                                          self.generate_subscription_id())
                subscription_data = self.build_subscription_data(local_ip, subscription_identifier)
                # Set up notification server
                app = web.Application()
                app.router.add_post("/notify", self.handle_notification)
                runner = web.AppRunner(app)
                await runner.setup()
                site = web.TCPSite(runner, '0.0.0.0', self.port)
                await site.start()
                # Create subscription
                await self.create_subscription(session, subscription_data, subscription_identifier)
                # Keep the artifact alive so the server can receive notifications
                while True:
                    await asyncio.sleep(1)
    except Exception:
        # Error handling is not shown in this excerpt; re-raise so failures stay visible
        raise
Execution Flow:
1. Sets artifact availability
2. Configures network settings
3. Performs subscription cleanup if needed
4. Sets up the notification server
5. Creates a new subscription
6. Maintains continuous operation
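The way the configuration flags steer this flow can be made explicit with a small pure function. `plan_run` is a hypothetical helper introduced only for illustration; it mirrors the branching in `run()` and returns the names of the steps that a given configuration would trigger, assuming the flag semantics shown in the listing.

```python
def plan_run(config):
    """Sketch: list the steps run() would take for a given configuration."""
    steps = ["set_available", "configure_network"]

    # Cleanup: bulk deletion takes precedence over identifier-based deletion.
    if config.get("delete_all_artifact_subscriptions", False):
        steps.append("delete_artifact_subscriptions")
    elif config.get("delete_subscription_identifier"):
        steps.append("delete_subscription_by_identifier")

    # Delete-only mode skips the server, the new subscription, and the keep-alive loop.
    if not config.get("delete_only", False):
        steps += ["start_notification_server", "create_subscription", "keep_running"]
    return steps
```

For example, a configuration with both `delete_all_artifact_subscriptions` and `delete_only` set to `True` yields only the availability, network, and bulk-deletion steps.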
Usage Patterns¶
1. Basic Subscription Management¶
config = {
    "entity_type": "Device",
    "watched_attributes": ["temperature", "humidity"],
    "subscription_identifier": "env_monitor"
}
artifact = SubscriptionManagerArtifact(
    jid="monitor@xmpp.server",
    passwd="password",
    config=config,
    broker_url="http://broker:1026"
)
2. Filtered Subscriptions¶
config = {
    "entity_type": "Device",
    "watched_attributes": ["temperature"],
    "q_filter": "temperature>30",
    "subscription_identifier": "high_temp_alert"
}
3. Subscription Cleanup¶
config = {
    "delete_all_artifact_subscriptions": True,
    "delete_only": True
}
4. Specific Entity Monitoring¶
config = {
    "entity_type": "Device",
    "entity_id": "device001",
    "watched_attributes": ["status"],
    "subscription_identifier": "device_status"
}
Advanced Features¶
Dynamic Port Allocation¶
def find_free_port(self):
    """Finds an available port for the notification server."""

- Randomly selects ports in the range 8000-65000
- Tests port availability
- Returns the first available port
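One way to realize this behaviour with the standard library is sketched below. It is an assumption about how such a method could work, not the artifact's code: availability is tested by trying to bind a TCP socket, and the `max_attempts` limit is a hypothetical safeguard added so the search cannot loop forever.

```python
import random
import socket


def find_free_port(low=8000, high=65000, max_attempts=100):
    """Sketch: pick random ports until one can be bound."""
    for _ in range(max_attempts):
        port = random.randint(low, high)  # both bounds are inclusive
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                # Binding succeeds only if no other socket holds the port.
                sock.bind(("0.0.0.0", port))
            except OSError:
                continue  # port is in use, try another one
            return port  # socket is closed on exit, freeing the port for the server
    raise RuntimeError(f"no free port found in {low}-{high} after {max_attempts} attempts")
```

Note the inherent race: another process could claim the port between this check and the moment the notification server binds it, so the caller should still be prepared for a bind failure.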
IP Address Management¶
def get_local_ip(self):
    """Retrieves the local IP address."""

- Determines the machine’s IP address
- Handles various network configurations
- Falls back to localhost if needed
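A common technique for this, shown here as a sketch rather than as the artifact's actual method, is to "connect" a UDP socket to an external address and read back the local address the operating system selected for that route. No packet is sent by a UDP connect. The probe address `8.8.8.8:80` is an arbitrary assumption; any routable address would do.

```python
import socket


def get_local_ip(probe_host="8.8.8.8", probe_port=80):
    """Sketch: discover the outbound local IP, falling back to localhost."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # For UDP, connect() only selects a route; it transmits nothing.
        sock.connect((probe_host, probe_port))
        return sock.getsockname()[0]
    except OSError:
        # No usable route (for example, an offline machine): fall back to localhost.
        return "127.0.0.1"
    finally:
        sock.close()
```

The address returned must be reachable from the Context Broker, since it ends up in the notification endpoint; in containerized deployments the detected address may need to be overridden.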
Subscription Data Building¶
def build_subscription_data(self, local_ip, subscription_identifier):
    """Constructs subscription payload."""

Builds an NGSI-LD subscription with:
- Entity specifications
- Notification endpoint
- Attribute filters
- Query conditions
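The payload could be assembled along the following lines. This is a sketch under stated assumptions: the field names (`entities`, `watchedAttributes`, `q`, `notification.endpoint`) come from the NGSI-LD subscription model, the configuration keys are those used in the usage patterns above, and storing the identifier in `description` is a guess made for illustration. Unlike the method, the function takes `config` and `port` as explicit parameters so that it is self-contained.

```python
def build_subscription_data(config, local_ip, port, subscription_identifier):
    """Sketch: build an NGSI-LD subscription payload from the artifact config."""
    # Entity specification: the type is required, the id narrows it to one entity.
    entity = {"type": config["entity_type"]}
    if config.get("entity_id"):
        entity["id"] = config["entity_id"]

    watched = list(config.get("watched_attributes", []))
    notification = {
        "format": "normalized",
        # Notification endpoint served by the artifact's HTTP server.
        "endpoint": {
            "uri": f"http://{local_ip}:{port}/notify",
            "accept": "application/json",
        },
    }

    data = {
        "type": "Subscription",
        # Assumption: the identifier is carried in the description field.
        "description": subscription_identifier,
        "entities": [entity],
        "notification": notification,
    }
    if watched:
        data["watchedAttributes"] = watched      # attribute changes that trigger notifications
        notification["attributes"] = watched     # attributes included in each notification
    if config.get("q_filter"):
        data["q"] = config["q_filter"]           # query condition, e.g. "temperature>30"
    return data
```

Optional keys are omitted rather than sent empty, since brokers may reject empty `watchedAttributes` arrays or empty `q` strings.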
Integration Examples¶
1. With SPADE Agent¶
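from spade.agent import Agent
from spade_artifact import ArtifactMixin

class MonitorAgent(ArtifactMixin, Agent):  # ArtifactMixin provides self.artifacts
    async def setup(self):
        artifact = SubscriptionManagerArtifact(...)
        await artifact.start()
        # handle_notification is the agent's callback for published data (not shown)
        await self.artifacts.focus(artifact.jid, self.handle_notification)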
2. Multiple Subscriptions¶
async def manage_subscriptions():
    artifact = SubscriptionManagerArtifact(...)
    configs = [config1, config2, config3]
    for config in configs:
        artifact.config = config
        await artifact.start()
3. Custom Notification Processing¶
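class CustomSubscriptionManager(SubscriptionManagerArtifact):
    async def handle_notification(self, request):
        data = await request.json()
        # Custom processing goes here; this placeholder passes the payload through unchanged
        processed_data = data
        await self.publish(processed_data)
        # aiohttp handlers must return a response to the Context Broker
        return web.Response(status=200)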