Custom Connector for Mule ESB Logging
Understanding Mule ESB Logging with Nodinite
The Nodinite Custom Connector enables centralized, structured logging from Mule ESB flows by replacing standard Logger and Business Events components. This guide explains the architecture, implementation steps, and configuration requirements for production-grade logging from Mule ESB 3.6–3.9 environments.
Why Use a Custom Connector for Logging
Mule ESB provides built-in logging options (Logger, Business Events), but these have significant limitations for enterprise integration scenarios:
Comparison: Built-in Logging vs. Custom Connector
| Feature | Logger | Business Events | Nodinite Custom Connector |
|---|---|---|---|
| Cost | ✅ Free (built-in) | ❌ Requires Enterprise Edition | ✅ Free template, requires Nodinite license |
| Payload Logging | ⚠️ Limited to log level filtering | ❌ Cannot log message body | ✅ Always logs full message body |
| Context Properties | ⚠️ Manual implementation | ⚠️ Key/value pairs only (no payload) | ✅ Structured context with full payload |
| On-Premise/Cloud | ❌ Different configurations required | ❌ Different configurations required | ✅ Unified approach - no code changes when migrating |
| Data Visibility | ❌ File-based logs, 12+ hour delays in cloud | ⚠️ Real-time tracking UI | ✅ Async pickup, fast visibility (minutes) |
| Consistency | ❌ Inconsistent quality across flows | ⚠️ Limited metadata | ✅ Enforced structure via JSON schema |
| Maintenance | ❌ Requires log rotation/management | ⚠️ Requires data retention policies | ✅ Centralized log database management |
| Replacement Effort | ❌ Hardcoded, difficult to change | ❌ Hardcoded, difficult to change | ✅ Single subflow change updates all flows |
| Retention Control | ❌ File/folder-based deletion only | ❌ Global retention policies | ✅ Message Type-based retention - fine-tune by business data type |
Architecture: How the Custom Connector Works
Architecture: Mule ESB flows invoke the Custom Connector subflow, which formats JSON Log Events and posts them to a configured destination (queue, database, or folder). The Pickup Service asynchronously fetches events and forwards them to the Log API.
Key Components
| Component | Role | Technology Options |
|---|---|---|
| Custom Connector | Formats Mule message as JSON Log Event | JAR package (Anypoint Studio 6/7) |
| Logging Subflow | Centralizes logging logic for reuse | Single Mule subflow invoked by all flows |
| Destination | Temporary storage for async pickup | ActiveMQ (recommended), PostgreSQL, File, AnypointMQ, Log API (dev only) |
| Pickup Service | Fetches events and forwards to Log API | Pickup Log Events Service Logging Agent |
| Log API | Persists events to Log Database | Nodinite Core Services component |
Why Asynchronous Logging?
Asynchronous logging (via queue or database) is strongly recommended for production environments because:
- Non-blocking: Flow execution continues without waiting for log persistence
- Resilience: Destination outages don't block integration flows
- Scalability: Queue-based pickup handles high-volume bursts
- Performance: No synchronous HTTP calls from Mule flows
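The decoupling described above can be illustrated with a minimal Groovy sketch. It does not show how the Custom Connector or the Pickup Service is implemented: an in-memory `LinkedBlockingQueue` stands in for the real destination (ActiveMQ, PostgreSQL, or a folder), and `logFromFlow` and `pickup` are hypothetical names chosen for this example.

```groovy
import java.util.concurrent.LinkedBlockingQueue

// Stand-in for the destination (queue, table, or folder); an assumption for illustration.
def destination = new LinkedBlockingQueue<String>()

// What a flow does: hand the event to the destination and return immediately.
def logFromFlow = { String jsonLogEvent ->
    destination.offer(jsonLogEvent)   // does not wait for any consumer
}

// What a pickup process does later: drain whatever has accumulated, up to a batch size.
def pickup = { int maxBatch ->
    def batch = []
    destination.drainTo(batch, maxBatch)
    return batch                      // a real pickup would forward these to the Log API
}

// Three events are logged while no pickup is running; nothing blocks.
(1..3).each { logFromFlow('{"LogText":"event ' + it + '"}') }

def firstBatch = pickup(2)
def secondBatch = pickup(2)
println "first pickup: ${firstBatch.size()}, second pickup: ${secondBatch.size()}"
```

The flow side only ever writes to the destination, so an outage of the pickup side or the Log API delays visibility without stopping message processing.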
Destination Options
| Destination | Pros | Cons | Recommendation |
|---|---|---|---|
| Log API (Synchronous) | Simple setup, no middleware required | Not highly available, blocks flows, doesn't scale | Development/POC only - avoid in production |
| ActiveMQ | Free (Open Source), scales well, supports clustering, fail-over capability | Requires ActiveMQ installation and configuration | Recommended for production |
| PostgreSQL | Scales well, uses existing database infrastructure | Requires table creation and polling configuration | Good alternative if ActiveMQ unavailable |
| AnypointMQ | Cloud-native, highly available, scales automatically | Requires MuleSoft Enterprise subscription, higher cost | Contact support - not yet supported by Pickup Service |
| File Folder | Simple for single-server scenarios | No fail-over, requires file system monitoring | Small deployments only |
Implementation Steps
Prerequisites
Before implementing the Custom Connector, ensure you have:
- Mule ESB 3.6, 3.7, 3.8, or 3.9 installed
- Anypoint Studio 6 or 7
- Nodinite portal account for connector download
- Destination middleware configured (ActiveMQ, PostgreSQL, or file folder)
- Pickup Log Events Service Logging Agent installed and configured
- Understanding of Asynchronous Logging principles
Step 1: Download the Custom Connector
- Log in to the Nodinite portal
- Navigate to Downloads → Logging Agents → Mule ESB Custom Connector
- Download the ZIP file containing the JAR package
Step 2: Add Custom Connector to Anypoint Studio
For Anypoint Studio 6
- Extract the ZIP file
- Copy the JAR file to your Anypoint Studio project's lib folder
- The connector appears in the palette as Nodinite (prefix Log ...)
Screenshot: Nodinite Custom Connector installed in Anypoint Studio 6 palette.
For Anypoint Studio 7
- Right-click project → Import → General → File System
- Browse to extracted JAR location
- Select JAR and click Finish
Screenshot: Import dialog for Anypoint Studio 7 JAR package.
Step 3: Create the Logging Subflow
Create a reusable logging subflow that all integration flows will invoke:
- Create a new subflow named nodinite-logging-subflow
- Add the following components:
- Nodinite: Create Log Event - formats the JSON Log Event
- Destination connector - posts event to chosen destination (HTTP for Log API, JMS for ActiveMQ, etc.)
- Set Payload - restores original payload after logging
Screenshot: Reusable logging subflow with Create Log Event and destination components.
Example subflow configuration (ActiveMQ destination)
<sub-flow name="nodinite-logging-subflow">
    <logger message="#[sessionVars.nodiniteLogText]" level="INFO" doc:name="Logger"/>
    <!-- Keep the original payload so it can be restored after logging -->
    <set-variable variableName="payloadBeforeLogging" value="#[payload]" doc:name="Store Payload"/>
    <nodinite:create-log-event config-ref="Nodinite__Configuration"
            endPointName="#[flowVars.endpointName]"
            endpointUri="#[flowVars.endpointUri]"
            endPointDirection="#[sessionVars.nodiniteEndPointDirection]"
            endPointTypeId="60"
            originalMessageTypeName="#[sessionVars.nodiniteMessageTypeName]"
            logStatus="#[sessionVars.nodiniteLogStatus]"
            logText="#[sessionVars.nodiniteLogText]"
            payload="#[payload]"
            doc:name="Create JSON Log Event">
        <nodinite:context-properties ref="#[sessionVars.nodiniteContextValues]"/>
    </nodinite:create-log-event>
    <!-- Post the JSON Log Event to the queue read by the Pickup Service -->
    <jms:outbound-endpoint queue="nodinite.log.events" connector-ref="ActiveMQ" doc:name="Post to ActiveMQ"/>
    <set-payload value="#[flowVars.payloadBeforeLogging]" doc:name="Restore Payload"/>
</sub-flow>
Step 4: Invoke the Logging Subflow from Integration Flows
Add logging calls at key points in your integration flows:
- Before transformation - log incoming message
- After transformation - log outgoing message
- Exception handlers - log error conditions
- Conditional branches - log routing decisions
- Subflow entry/exit - log component boundaries
Example: File-based integration flow with logging
Screenshot: Mule ESB flow using Nodinite Custom Connector at receive, before processing, and after processing steps.
<flow name="invoice-processing-flow">
    <file:inbound-endpoint path="/drop/invoices" />
    <!-- Log: File Received -->
    <scripting:component doc:name="Set Logging Variables">
        <scripting:script engine="Groovy"><![CDATA[
            sessionVars['correlationId'] = message.id;
            sessionVars['nodiniteMessageTypeName'] = "Invoice#1.0";
            sessionVars['nodiniteLogText'] = "File Received";
            sessionVars['nodiniteLogStatus'] = 0;
            sessionVars['nodiniteEndPointDirection'] = 0;
            // Context values are passed as strings
            sessionVars['nodiniteContextValues'] = [
                fileName: flowVars['originalFilename'],
                fileSize: payload.length.toString()
            ];
            return payload;
        ]]></scripting:script>
    </scripting:component>
    <flow-ref name="nodinite-logging-subflow" doc:name="Log Receive"/>
    <!-- Process message -->
    <transformer ref="invoiceTransformer"/>
    <!-- Log: Processing Complete -->
    <scripting:component doc:name="Update Logging Variables">
        <scripting:script engine="Groovy"><![CDATA[
            sessionVars['nodiniteLogText'] = "Processing Complete";
            sessionVars['nodiniteLogStatus'] = 0;
            sessionVars['nodiniteEndPointDirection'] = 1;
            sessionVars['nodiniteContextValues'] = [
                fileName: flowVars['originalFilename'],
                invoiceNumber: flowVars['invoiceNumber'],
                amount: flowVars['invoiceAmount']
            ];
            return payload;
        ]]></scripting:script>
    </scripting:component>
    <flow-ref name="nodinite-logging-subflow" doc:name="Log Success"/>
    <file:outbound-endpoint path="/processed/invoices"/>
</flow>
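The two scripting components above repeat the same assignments. One way to cut that repetition is a small helper that builds the variables in one place. This is a sketch only: `buildLogVars` is a hypothetical helper, not part of the connector, and the plain maps stand in for the `sessionVars` and `flowVars` bindings that Mule provides to a Groovy script.

```groovy
// Stand-ins for Mule's script bindings so the sketch runs outside Mule (assumption).
def sessionVars = [:]
def flowVars = [originalFilename: 'invoices_2024-11-03.xml']

// Hypothetical helper: returns the variables the logging subflow reads.
def buildLogVars = { String messageType, String text, int status, int direction, Map context ->
    return [
        nodiniteMessageTypeName  : messageType,
        nodiniteLogText          : text,
        nodiniteLogStatus        : status,
        nodiniteEndPointDirection: direction,
        nodiniteContextValues    : context
    ]
}

// Copy the entries one by one, so only a plain put on the target map is relied on.
def logVars = buildLogVars('Invoice#1.0', 'File Received', 0, 0,
        [fileName: flowVars['originalFilename']])
logVars.each { key, value -> sessionVars[key] = value }

println sessionVars
```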
Step 5: Configure the Pickup Service
The Pickup Log Events Service Logging Agent fetches events from your configured destination and forwards them to the Log API.
Configuration steps
- Install the Pickup Service on a server with access to your destination middleware
- Configure the pickup source (ActiveMQ queue, PostgreSQL table, or file folder)
- Configure the Log API target endpoint
- Set the pickup interval (30-60 seconds is recommended for production)
- Configure retry and error handling policies
See the Asynchronous Logging and Pickup Service documentation for detailed configuration examples.
Step 6: Verify Logging and Fine-Tune
Deploy and test:
- Deploy your updated Mule flows
- Trigger a test message
- Verify event appears in Nodinite Web Client
Review log quality:
- Check that Message Types are correctly set
- Verify Search Fields extract expected values
- Confirm Log Status Codes reflect business outcomes
Fine-tune performance:
- Adjust destination batch sizes
- Optimize Pickup Service intervals
- Monitor destination queue/table depths
JSON Log Event Format
The Custom Connector formats Mule messages as JSON Log Events following the Nodinite schema. Understanding this format is critical for proper integration.
Mandatory Fields
The following fields are required for every logged event:
| Field | Data Type | Description | Example Value |
|---|---|---|---|
| LogAgentValueId | number | ID of the Log Agent sending events | 42 |
| EndPointName | string | Name of Endpoint (transport description) | "INT101: Receive Invoices" |
| EndPointUri | string | URI/address of Endpoint | "file:///drop/invoices" |
| EndPointDirection | number | Direction: 0 (receive), 1 (send) | 0 |
| EndPointTypeId | number | Transport type ID | 60 (File) |
| OriginalMessageTypeName | string | Message Type identifier | "Invoice#1.0" |
| LogDateTime | string | Timestamp in ISO 8601 UTC format | "2018-05-03T13:37:00.123Z" |
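As a rough illustration of the mandatory fields, the following Groovy sketch assembles an event from the example values in the table and serializes it with `groovy.json.JsonOutput`. The connector produces this structure for you; the `missingFields` check is a hypothetical helper added here for illustration, and `java.time.Instant` assumes the runtime uses Java 8 or later.

```groovy
import groovy.json.JsonOutput
import java.time.Instant

// Field names come from the table above.
def MANDATORY = ['LogAgentValueId', 'EndPointName', 'EndPointUri', 'EndPointDirection',
                 'EndPointTypeId', 'OriginalMessageTypeName', 'LogDateTime']

def event = [
    LogAgentValueId        : 42,
    EndPointName           : 'INT101: Receive Invoices',
    EndPointUri            : 'file:///drop/invoices',
    EndPointDirection      : 0,      // 0 = receive, 1 = send
    EndPointTypeId         : 60,     // 60 = File
    OriginalMessageTypeName: 'Invoice#1.0',
    // Fixed instant so the output is reproducible; a live event would use the current time.
    LogDateTime            : Instant.parse('2018-05-03T13:37:00.123Z').toString()
]

// Hypothetical check: list the mandatory fields that are absent or null.
def missingFields = { Map candidate -> MANDATORY.findAll { candidate[it] == null } }

def json = JsonOutput.toJson(event)
println JsonOutput.prettyPrint(json)
println "missing: ${missingFields(event)}"
```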
Optional But Recommended Fields
| Field | Data Type | Description | Example Value |
|---|---|---|---|
| Body | string | Message payload (base64-encoded if binary) | Full message content |
| CorrelationId | string | Correlation identifier for related events | GUID or business key |
| LogStatus | number | Status code: -1 (error), 0 (success), 1 (info) | 0 |
| LogText | string | Human-readable log message | "Invoice processed successfully" |
| ContextProperties | object | Key-value pairs for Context Options | {"orderId": "12345", "customer": "ACME"} |
See the JSON Log Event documentation for the complete schema and examples.
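The note on the Body field (base64-encoded if binary) can be sketched in Groovy as follows. `toBody` is a hypothetical helper written for this example, not a connector function; it passes text payloads through unchanged and base64-encodes byte arrays with Groovy's built-in `encodeBase64()`.

```groovy
// Hypothetical helper: text payloads pass through, binary payloads are base64-encoded.
def toBody = { Object payload ->
    return payload instanceof byte[] ? payload.encodeBase64().toString() : payload.toString()
}

// Optional fields, using the example values from the table above.
def optionalFields = [
    Body             : toBody('hello'.getBytes('UTF-8')),
    CorrelationId    : 'ORD-12345',
    LogStatus        : 0,
    LogText          : 'Invoice processed successfully',
    ContextProperties: [orderId: '12345', customer: 'ACME']
]

println optionalFields.Body
```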
Message Types Configuration
Important
Message Types are mandatory and cannot be changed after logging. The OriginalMessageTypeName field determines which Search Field Expressions are applied to extract business data from logged events.
Why Message Types Matter
Many Nodinite features depend on well-defined Message Types:
- Search Fields - Extract Order Numbers, Customer IDs, Invoice amounts, Transaction codes using Search Field Expressions bound to Message Types
- Log Views - Filter and search logs by business identifiers (not just timestamps and correlation IDs)
- Non-Events Monitoring - Track message volumes, detect missing messages, alert on pattern anomalies
- Business Process Modeling (BPM) - Correlate transactions end-to-end across Mule flows and other integration platforms
Without proper Message Types
- Search Field extraction fails - no business data in search results
- Log Views can only filter by technical metadata
- Non-Events cannot detect missing invoices, orders, or payments
- BPM cannot correlate multi-step business processes
Naming Best Practices
Use consistent, meaningful Message Type names:
- Include domain and version: Invoice#1.0, Order#2.0
- Use PascalCase: CustomerUpdate#1.0 (not customer_update_v1)
- Version changes: Increment version when payload schema changes significantly
- Avoid generic names: Use SAPPurchaseOrder#1.0 instead of Message#1.0
Example: Setting Message Type in Mule flow
// Set Message Type in session variable before logging
sessionVars['nodiniteMessageTypeName'] = "Invoice#1.0";
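Because Message Types cannot be corrected after logging, it may help to check names against the convention before they reach production. The sketch below is one possible encoding of the naming practices above. The regular expression and the list of generic names are assumptions made for this example, not rules enforced by Nodinite.

```groovy
// Assumed convention: PascalCase name, '#', then major.minor version (e.g. Invoice#1.0).
def NAME_PATTERN = ~/^[A-Z][A-Za-z0-9]*#\d+\.\d+$/
// Names treated as too generic; this list is an illustration only.
def GENERIC_NAMES = ['Message', 'Data', 'Document'] as Set

def isValidMessageType = { String name ->
    if (!(name ==~ NAME_PATTERN)) return false
    return !GENERIC_NAMES.contains(name.split('#')[0])
}

['Invoice#1.0', 'SAPPurchaseOrder#1.0', 'customer_update_v1', 'Message#1.0'].each {
    println "${it}: ${isValidMessageType(it)}"
}
```

A check like this could run in a unit test or build step so that a misspelled or unversioned name is caught during development.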
Planning Message Types During Development
Message Types cannot be changed after events are logged. Plan your naming convention during development:
- Identify message types in your integration flows (orders, invoices, confirmations, etc.)
- Define naming convention for your organization
- Create Search Field Expressions for each Message Type before go-live
- Document Message Types in your integration design specifications
Context Options and Custom Properties
Context Options allow you to add custom key-value pairs to logged events for filtering, searching, and reporting.
Common Context Properties
| Property | Purpose | Example Value |
|---|---|---|
| orderId | Business transaction identifier | "ORD-12345" |
| customerId | Customer/partner identifier | "CUST-98765" |
| fileName | Source file name | "invoices_2024-11-03.xml" |
| fileSize | File size in bytes | "524288" |
| invoiceAmount | Transaction amount | "1250.00" |
| currency | Currency code | "USD" |
| isGDPRData | Contains personal data flag | "true" |
Example: Setting context properties in Groovy
sessionVars['nodiniteContextValues'] = [
orderId: flowVars['orderId'],
customerId: flowVars['customerId'],
invoiceAmount: flowVars['amount'].toString(),
currency: "USD",
isGDPRData: "true"
];
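In Groovy, calling `toString()` on a null value yields the text `null`, so a missing flow variable can end up logged as a misleading value. The sketch below shows one way to guard against that with a hypothetical helper, `toContextValues`, which drops null entries and converts everything else to strings. The plain `flowVars` map stands in for the Mule binding.

```groovy
// Stand-in for Mule's flowVars binding (assumption); customerId is deliberately missing.
def flowVars = [orderId: 'ORD-12345', customerId: null, amount: 1250.00]

// Hypothetical helper: drop null entries and convert remaining values to strings.
def toContextValues = { Map raw ->
    def present = raw.findAll { key, value -> value != null }
    return present.collectEntries { key, value -> [(key.toString()): value.toString()] }
}

def context = toContextValues([
    orderId      : flowVars['orderId'],
    customerId   : flowVars['customerId'],
    invoiceAmount: flowVars['amount'],
    currency     : 'USD'
])

println context
```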
Repair and Resubmit Properties
Use context properties to enable message repair and resubmission:
sessionVars['nodiniteContextValues'] = [
orderId: flowVars['orderId'],
// Repair/resubmit metadata
originalEndpoint: "file:///drop/invoices",
retryCount: flowVars['retryCount'].toString(),
canResubmit: "true",
resubmitEndpoint: "vm://reprocess-invoice"
];
Build your Mule flows to handle resubmitted messages (check retryCount, implement idempotency, etc.).
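A minimal sketch of such handling is shown below. The retry limit of 3, the in-memory set of processed keys, and the `decide` helper are all assumptions made for illustration. A real flow would need a persistent store (for example a database table or an object store) so that duplicates are still detected after a restart.

```groovy
// Assumptions for illustration: a retry limit and an in-memory record of processed keys.
def MAX_RETRIES = 3
def processedKeys = [] as Set

// Returns 'process', 'skip-duplicate', or 'reject-too-many-retries'.
def decide = { String orderId, String retryCountText ->
    int retryCount = Integer.parseInt(retryCountText ?: '0')
    if (retryCount > MAX_RETRIES) return 'reject-too-many-retries'
    // Set.add returns false when the key was already present, i.e. already processed.
    if (!processedKeys.add(orderId)) return 'skip-duplicate'
    return 'process'
}

def first = decide('ORD-1', '0')
def resubmitted = decide('ORD-1', '1')
def exhausted = decide('ORD-2', '4')
println "${first}, ${resubmitted}, ${exhausted}"
```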
Exception Handling and Error Logging
Add the Custom Connector to exception handlers to log error conditions with appropriate Log Status Codes:
<flow name="invoice-processing-flow">
    <file:inbound-endpoint path="/drop/invoices" />
    <!-- Normal processing ... -->
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <scripting:component doc:name="Set Error Logging Variables">
            <scripting:script engine="Groovy"><![CDATA[
                sessionVars['nodiniteMessageTypeName'] = "Invoice#1.0";
                sessionVars['nodiniteLogText'] = "Processing failed: " + exception.getMessage();
                sessionVars['nodiniteLogStatus'] = -1; // Error status
                sessionVars['nodiniteEndPointDirection'] = 0;
                sessionVars['nodiniteContextValues'] = [
                    fileName: flowVars['originalFilename'],
                    errorType: exception.getClass().getSimpleName(),
                    errorMessage: exception.getMessage(),
                    stackTrace: exception.getStackTrace().toString()
                ];
                return payload;
            ]]></scripting:script>
        </scripting:component>
        <flow-ref name="nodinite-logging-subflow" doc:name="Log Error"/>
    </catch-exception-strategy>
</flow>
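Full stack traces can be long, so it may be worth bounding what goes into the context properties. The following sketch shows one way to do that outside Mule. `errorContext` is a hypothetical helper and the limit of five frames is an arbitrary choice for this example.

```groovy
// Hypothetical helper: build error context values with a bounded stack trace.
def errorContext = { Throwable error, int maxFrames ->
    def frames = error.stackTrace.toList().take(maxFrames)
    return [
        errorType   : error.getClass().simpleName,
        errorMessage: error.message ?: '',
        stackTrace  : frames.join('\n')   // one frame per line
    ]
}

// Provoke a real exception so the helper has something to describe.
def context
try {
    Integer.parseInt('not-a-number')
} catch (NumberFormatException e) {
    context = errorContext(e, 5)
}

println context.errorType
println context.errorMessage
```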
Log Status Code Guidelines
- -1 (Error) - Processing failure, message rejected, exception thrown
- 0 (Success) - Normal processing, message accepted
- 1 (Information) - Audit trail, milestone reached, no action required
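To keep status codes consistent across flows, the guidelines can be captured in one place. The sketch below is an assumption about how that might look: `LOG_STATUS` and `resolveLogStatus` are hypothetical names, and the rule that an exception always results in an error status is a design choice made for this example.

```groovy
// Status codes from the guidelines above.
def LOG_STATUS = [ERROR: -1, SUCCESS: 0, INFO: 1]

// Hypothetical resolver: an exception always wins; audit-only events are informational.
def resolveLogStatus = { Throwable error, boolean auditOnly ->
    if (error != null) return LOG_STATUS.ERROR
    return auditOnly ? LOG_STATUS.INFO : LOG_STATUS.SUCCESS
}

println resolveLogStatus(null, false)
println resolveLogStatus(null, true)
println resolveLogStatus(new IllegalStateException('rejected'), false)
```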
Correlation
Correlation links related log events across flows, systems, and time. See the Mule ESB Correlation guide for detailed correlation strategies.
Basic correlation example
// Set correlation ID at flow entry
sessionVars['correlationId'] = message.id; // or flowVars['orderId']
message.correlationId = sessionVars['correlationId'];
// Use same correlation ID for all log events in this flow
sessionVars['nodiniteContextValues'] = [
correlationId: sessionVars['correlationId']
];
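When a message arrives from another system it may already carry a correlation ID. A common approach, sketched below under that assumption, is to reuse the incoming value and only generate a new one when none is present. `resolveCorrelationId` is a hypothetical helper, not part of the connector.

```groovy
// Hypothetical helper: keep an upstream correlation ID when present, otherwise create one.
def resolveCorrelationId = { String incoming ->
    return incoming?.trim() ? incoming.trim() : UUID.randomUUID().toString()
}

def fromUpstream = resolveCorrelationId('ORD-12345')
def generated = resolveCorrelationId(null)

// Every log event in the same transaction carries the same ID.
def receiveContext = [correlationId: fromUpstream, step: 'receive']
def sendContext = [correlationId: fromUpstream, step: 'send']

println "${receiveContext.correlationId} / ${sendContext.correlationId}"
println "generated: ${generated}"
```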
Log Views and Self-Service
Create role-based Log Views to empower business users:
- Finance Team - search by invoiceNumber, customerId, invoiceAmount
- Operations Team - search by orderId, shipmentId, warehouse
- Support Team - search by errorType, customerComplaint, ticketNumber
Self-service troubleshooting lets these teams find answers themselves, which reduces the number of incidents raised with the integration team.
Best Practices Summary
- Start small - implement in 2-3 flows before rolling out organization-wide
- Use ActiveMQ for production (best scalability and fail-over)
- Plan Message Types during development - they cannot be changed after logging
- Create Search Field Expressions for each Message Type before go-live
- Log at key milestones - receive, transform, send, error
- Use meaningful Log Text - help users understand what happened
- Add context properties - enable business-level searching and filtering
- Implement correlation - link related events across flows and systems
- Test pickup service - verify events reach Nodinite Web Client
- Create Log Views - empower business users with self-service access
Next Steps
Add or manage Search Fields
Add or manage Log Views
Mule ESB Correlation
Pickup Log Events Service
Asynchronous Logging
Related Topics
Logging
Log Events
Message Types
Search Fields
Search Field Expressions
Log Views
Non-Events Monitoring
Business Process Modeling (BPM)
Log Status Codes
Context Options
Endpoints
Endpoint Directions
Endpoint Types
JSON Log Event
Log API
Log Agents
Example: Complete Mule Flow Configuration
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
      xmlns:nodinite="http://www.mulesoft.org/schema/mule/nodinite"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:spring="http://www.springframework.org/schema/beans"
      version="CE-3.8.1">
    <!-- Configuration -->
    <nodinite:config name="Nodinite__Configuration" logAgentValueId="33" doc:name="Nodinite: Configuration"/>
    <spring:beans>
        <spring:bean id="ActiveMQ" class="org.apache.activemq.ActiveMQConnectionFactory">
            <spring:property name="brokerURL" value="tcp://10.0.0.10:61616"/>
        </spring:bean>
    </spring:beans>
    <jms:activemq-connector name="ActiveMQ" specification="1.1" brokerURL="tcp://10.0.0.10:61616" validateConnections="true" doc:name="Active MQ"/>
    <!-- Logging Sub Flow -->
    <sub-flow name="nodinite-logging-subflow">
        <logger message="#[sessionVars.nodiniteLogText]" level="INFO" doc:name="Logger"/>
        <set-variable variableName="payloadBeforeLogging" value="#[payload]" doc:name="Store Payload"/>
        <nodinite:create-log-event config-ref="Nodinite__Configuration"
                endPointName="#[flowVars.endpointName]"
                endpointUri="#[flowVars.endpointUri]"
                endPointDirection="#[sessionVars.nodiniteEndPointDirection]"
                endPointTypeId="60"
                originalMessageTypeName="#[sessionVars.nodiniteMessageTypeName]"
                logStatus="#[sessionVars.nodiniteLogStatus]"
                logText="#[sessionVars.nodiniteLogText]"
                payload="#[payload]"
                doc:name="Create JSON Log Event">
            <nodinite:context-properties ref="#[sessionVars.nodiniteContextValues]"/>
        </nodinite:create-log-event>
        <jms:outbound-endpoint queue="nodinite.log.events" connector-ref="ActiveMQ" doc:name="Post to ActiveMQ"/>
        <set-payload value="#[flowVars.payloadBeforeLogging]" doc:name="Restore Payload"/>
    </sub-flow>
    <!-- Main Flow -->
    <flow name="nodinite-mule-flow" doc:name="INT001: Invoice Processing">
        <file:inbound-endpoint path="/drop/invoices" responseTimeout="10000" doc:name="File" moveToDirectory="/processed/invoices"/>
        <set-variable variableName="endpointName" value="INT001: Receive Invoices" doc:name="Set Endpoint Name"/>
        <set-variable variableName="endpointUri" value="file:///drop/invoices" doc:name="Set Endpoint URI"/>
        <!-- Log: File Received -->
        <scripting:component doc:name="Set Logging Variables">
            <scripting:script engine="Groovy"><![CDATA[
                sessionVars['correlationId'] = message.id;
                sessionVars['nodiniteMessageTypeName'] = "Invoice#1.0";
                sessionVars['nodiniteLogText'] = "File Received";
                sessionVars['nodiniteLogStatus'] = 0;
                sessionVars['nodiniteEndPointDirection'] = 0;
                sessionVars['nodiniteContextValues'] = [
                    fileName: flowVars['originalFilename'],
                    fileSize: payload.length.toString(),
                    correlationId: sessionVars['correlationId']
                ];
                return payload;
            ]]></scripting:script>
        </scripting:component>
        <flow-ref name="nodinite-logging-subflow" doc:name="Log Receive"/>
        <!-- Transform message (your business logic here) -->
        <set-payload value="#['Invoice processed: ' + payload]" doc:name="Transform"/>
        <!-- Log: Processing Complete -->
        <scripting:component doc:name="Update Logging Variables">
            <scripting:script engine="Groovy"><![CDATA[
                sessionVars['nodiniteLogText'] = "Processing Complete";
                sessionVars['nodiniteLogStatus'] = 0;
                sessionVars['nodiniteEndPointDirection'] = 1;
                sessionVars['nodiniteContextValues'] = [
                    fileName: flowVars['originalFilename'],
                    invoiceNumber: "INV-12345",
                    amount: "1250.00",
                    currency: "USD",
                    correlationId: sessionVars['correlationId']
                ];
                return payload;
            ]]></scripting:script>
        </scripting:component>
        <flow-ref name="nodinite-logging-subflow" doc:name="Log Success"/>
        <file:outbound-endpoint path="/processed/invoices" responseTimeout="10000" doc:name="File"/>
        <!-- Exception Handler -->
        <catch-exception-strategy doc:name="Catch Exception Strategy">
            <scripting:component doc:name="Set Error Logging Variables">
                <scripting:script engine="Groovy"><![CDATA[
                    sessionVars['nodiniteMessageTypeName'] = "Invoice#1.0";
                    sessionVars['nodiniteLogText'] = "Processing failed: " + exception.getMessage();
                    sessionVars['nodiniteLogStatus'] = -1;
                    sessionVars['nodiniteEndPointDirection'] = 0;
                    sessionVars['nodiniteContextValues'] = [
                        fileName: flowVars['originalFilename'],
                        errorType: exception.getClass().getSimpleName(),
                        errorMessage: exception.getMessage(),
                        correlationId: sessionVars['correlationId']
                    ];
                    return payload;
                ]]></scripting:script>
            </scripting:component>
            <flow-ref name="nodinite-logging-subflow" doc:name="Log Error"/>
        </catch-exception-strategy>
    </flow>
</mule>


