InserterArtifact¶
Introduction¶
The InserterArtifact class is a component of the SPADE-FIWARE-Artifacts toolkit that facilitates the insertion and updating of data in FIWARE Context Brokers. It provides a robust interface for managing entities in NGSI-LD format, supporting both creation and updates of entities with various attribute types.
Key Features¶
Asynchronous Operation - Built on
asynciofor non-blocking operationsFlexible Entity Management - Automatic entity existence checking - Creation of new entities when needed - Support for partial updates of specific attributes
NGSI-LD Support - Full compatibility with NGSI-LD format - Handles Properties, GeoProperties, and Relationships - Customizable JSON templates for entity structure
Data Processing - Customizable data processor function - Built-in JSON cleaning and validation - Template-based payload construction
Class Structure¶
Constructor Parameters¶
def __init__(self, jid, passwd, publisher_jid, host, project_name, columns_update=[],
data_processor=None, json_template=None, json_exceptions=None, port='9090'):
jid: Jabber ID for the artifactpasswd: Password for authenticationpublisher_jid: JID of the publisher artifacthost: Context Broker host addressproject_name: Project identifiercolumns_update: List of specific columns to update (optional)data_processor: Custom data processing function (optional)json_template: Template for JSON payload construction (optional)json_exceptions: Exceptions for JSON cleaning rules (optional)port: Context Broker port (default: ‘9090’)
Core Methods¶
Entity Management¶
async def process_and_send_data(self, payload: dict)
async def update_specific_attributes(self, entity_id: str, entity_data: dict)
async def update_or_create_entity(self, entity_id: str, entity_data: dict, payload: dict)
async def entity_exists(self, entity_id: str) -> bool
async def create_new_entity(self, entity_data: dict)
async def update_entity_attribute(self, entity_id: str, attribute: str, attribute_data: dict, context: any)
async def update_all_attributes(self, entity_id, entity_data, context)
JSON Handling¶
def build_entity_json(self, payload, clean=True)
Integration Flow¶
The run() Method¶
The run() method is the core of the InserterArtifact, orchestrating the entire data flow:
Sets the artifact’s presence to available
Enters an infinite loop to: - Wait for data from the payload queue - Process incoming data - Send data to the Context Broker
Handles errors and maintains continuous operation
Data Flow Process¶
Data Reception: - Receives data through the
artifact_callbackmethod - Processes data using the configureddata_processor- Adds processed data to the payload queueData Processing: - Retrieves data from the queue - Constructs entity ID and JSON structure - Validates and cleans the data
Entity Management: - Checks if entity exists - Updates or creates entities as needed - Handles specific attribute updates
Use Cases and Examples¶
1. IoT Device Integration¶
# Example: IoT sensor data integration
json_template = {
"@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
"id": "urn:ngsi-ld:Sensor:{device_id}",
"type": "Sensor",
"temperature": {
"type": "Property",
"value": "{temperature}"
},
"humidity": {
"type": "Property",
"value": "{humidity}"
},
"location": {
"type": "GeoProperty",
"value": {
"type": "Point",
"coordinates": ["{longitude}", "{latitude}"]
}
}
}
inserter = InserterArtifact(
jid="sensor_inserter@xmpp.server",
passwd="password",
publisher_jid="sensor_publisher@xmpp.server",
host="context.broker.host",
project_name="iot_project",
json_template=json_template
)
2. Real-time Monitoring System¶
# Example: System monitoring with specific attribute updates
def process_monitoring_data(data):
return [{
"id": data["system_id"],
"type": "MonitoringSystem",
"cpu_usage": data["cpu"],
"memory_usage": data["memory"],
"disk_usage": data["disk"]
}]
inserter = InserterArtifact(
jid="monitor_inserter@xmpp.server",
passwd="password",
publisher_jid="monitor_publisher@xmpp.server",
host="context.broker.host",
project_name="monitoring",
columns_update=["cpu_usage", "memory_usage", "disk_usage"],
data_processor=process_monitoring_data
)
3. Smart City Application¶
# Example: Traffic monitoring system
json_template = {
"@context": "https://smart-data-models.github.io/dataModel.Transportation/context.jsonld",
"id": "urn:ngsi-ld:TrafficFlowObserved:{intersection_id}",
"type": "TrafficFlowObserved",
"vehicleCount": {
"type": "Property",
"value": "{count}"
},
"averageSpeed": {
"type": "Property",
"value": "{speed}"
},
"congestionLevel": {
"type": "Property",
"value": "{congestion}"
}
}
Advanced Features¶
Custom Data Processing¶
def custom_processor(data):
# Transform incoming data
processed_data = []
for item in data["items"]:
processed_item = {
"id": item["id"],
"type": "CustomEntity",
# Add more transformations
}
processed_data.append(processed_item)
return processed_data
inserter = InserterArtifact(
# ... other parameters ...
data_processor=custom_processor
)
Selective Attribute Updates¶
inserter = InserterArtifact(
# ... other parameters ...
columns_update=["temperature", "humidity"], # Only update these attributes
)
Best Practices¶
Error Handling - Implement proper error handling in data processor functions - Monitor the artifact’s logs for potential issues - Handle network connectivity issues gracefully
Performance Optimization - Use batch processing when possible - Implement efficient data processing functions - Monitor queue size and processing delays
Data Validation - Validate data before processing - Use appropriate JSON templates - Handle missing or invalid data appropriately
Troubleshooting¶
Common Issues¶
Connection Errors - Verify Context Broker URL and port - Check network connectivity - Ensure proper authentication
Data Processing Issues - Validate JSON template format - Check data processor function - Verify incoming data structure
Update Failures - Confirm entity existence - Verify attribute names and types - Check NGSI-LD compatibility