Integrations running in production without structured logging are flying blind. When something breaks at 2am, 'check the logs' needs to be a solution, not a prayer. Unstructured logs cannot be reliably parsed, indexed, or aggregated. Every investigation becomes a regex exercise. Every incident becomes a tab-switching contest between log files with different formats and no shared identifier to correlate them.
The JSON Logger connector
The JSON Logger is an open-source MuleSoft connector published to your organisation's Anypoint Exchange. Once published, you add it to any project from Anypoint Studio's Exchange panel and it appears in the palette. From that point it works like any other connector: drag it onto the canvas, configure the trace point, point it at the payload, and it produces structured JSON output via Log4j2.
Clone from https://github.com/mulesoft-consulting/json-logger, update the groupId in pom.xml with your Anypoint org ID, then deploy to Exchange:
./deploy-to-exchange.sh <your-org-id>Add it to your project's pom.xml. Better yet, manage the version in your parent POM so all teams inherit it centrally:
<dependency>
<groupId>${orgId}</groupId>
<artifactId>json-logger</artifactId>
<version>2.0.1</version>
<classifier>mule-plugin</classifier>
</dependency>Step 1: Configure the global element
The JSON Logger requires a global config element that sets application-level defaults. Set prettyPrint to false in all environments: pretty-printed JSON breaks Log4j2 pattern layouts and makes Elasticsearch parsing unreliable.
<json-logger:config
name="JSON_Logger_Config"
applicationName="${app.name}"
environment="${mule.env}"
prettyPrint="false"
logLocationInfo="true"/>Step 2: The four trace points
Trace points are a built-in concept in the JSON Logger. Each is a named configuration option in the connector's Trace Point dropdown. They give structure and meaning to where in a flow's lifecycle a log entry was produced.
When: First connector after the HTTP Listener, before any routing or business logic.
Captures: HTTP method, request path, inbound headers (sanitise: do not log Authorization values), query parameters, client IP, correlationId, timestamp.
When: Immediately before any HTTP Request connector or downstream call.
Captures: Target system name, endpoint path, HTTP method, outbound request payload (sanitised), correlationId forwarded as X-Correlation-ID.
When: Immediately after the HTTP Request returns, before any transformation.
Captures: HTTP status received, response payload (sanitised), duration in milliseconds since BEFORE_REQUEST, downstream system identifier, error if failed.
When: Last connector before the flow exits. Also in the error handler sub-flow for failed transactions.
Captures: Final HTTP status returned to caller, total flow duration since START, flow name, application name, success or error outcome.
Every log entry produced by the JSON Logger automatically includes the correlationId. This single field makes cross-layer tracing possible. A request that passes through an Experience API, a Process API, and two System APIs can be reconstructed in full from four correlated log streams.
Step 3: Connector placement in your flows
<flow name="orders-exp-api-main">
<http:listener config-ref="httpListenerConfig" path="${http.api.path}"/>
<!-- 1. START: first thing after the listener -->
<json-logger:logger config-ref="JSON_Logger_Config"
tracePoint="START" priority="INFO"
message="Incoming request" payload="#[attributes]"
category="${app.name}"/>
<set-variable variableName="startTime"
value="#[now() as Number {unit: 'milliseconds'}]"/>
<apikit:router config-ref="orders-exp-api-config"/>
<error-handler ref="global-error-handler"/>
</flow>
<flow name="get:\orders\(orderId):orders-exp-api-config">
<!-- 2. BEFORE_REQUEST: just before calling downstream -->
<set-variable variableName="downstreamStartTime"
value="#[now() as Number {unit: 'milliseconds'}]"/>
<json-logger:logger config-ref="JSON_Logger_Config"
tracePoint="BEFORE_REQUEST" priority="INFO"
message="Calling orders-prc-api" payload="#[payload]"
category="${app.name}"/>
<http:request config-ref="prcApiConfig" method="GET" path="/orders/#[vars.orderId]">
<http:headers>#[output application/java --- { 'X-Correlation-ID': correlationId }]</http:headers>
</http:request>
<!-- 3. AFTER_REQUEST: immediately after the downstream response -->
<json-logger:logger config-ref="JSON_Logger_Config"
tracePoint="AFTER_REQUEST" priority="INFO"
message="Response from orders-prc-api" payload="#[payload]"
elapsed="#[(now() as Number {unit: 'milliseconds'}) - vars.downstreamStartTime]"
category="${app.name}"/>
<!-- ... transform and business logic ... -->
<!-- 4. END: last step before the flow exits -->
<json-logger:logger config-ref="JSON_Logger_Config"
tracePoint="END" priority="INFO"
message="Request completed successfully" payload="#[payload]"
elapsed="#[(now() as Number {unit: 'milliseconds'}) - vars.startTime]"
category="${app.name}"/>
</flow>The elapsed field on AFTER_REQUEST and END is what powers latency dashboards in Kibana. Always pass the delta from the matching start time variable.
Every failed transaction should also emit an END trace point; place it inside the global error handler sub-flow with priority="ERROR" to ensure every transaction has a complete log trail from START to END.
Step 4: Configure the Log4j2 HTTP appender
The JSON Logger outputs through Log4j2. The HTTP Appender sends entries directly to Elasticsearch's bulk API with no Filebeat agent required. This works on both on-premises runtimes and CloudHub. The Async wrapper is not optional: without it, every log write blocks the flow thread until Elasticsearch acknowledges the HTTP call.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" monitorInterval="60">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%-5p %d [%t] %c: %m%n"/>
</Console>
<Http name="Elasticsearch"
url="${sys:elk.url}/_bulk"
method="POST"
connectTimeoutMillis="5000"
readTimeoutMillis="5000">
<Property name="Authorization" value="ApiKey ${sys:elk.apikey}"/>
<Property name="Content-Type" value="application/x-ndjson"/>
<PatternLayout>
<Pattern>{"index":{"_index":"mule-logs-${sys:mule.env}-{date:yyyy.MM.dd{'}'}"}} %m </Pattern>
</PatternLayout>
</Http>
<!-- Async wrapper: decouple log writes from flow execution -->
<Async name="AsyncElasticsearch" includeLocation="false" bufferSize="2048">
<AppenderRef ref="Elasticsearch"/>
</Async>
</Appenders>
<Loggers>
<AsyncLogger name="com.mulesoft.mule.runtime.core" level="WARN" additivity="false">
<AppenderRef ref="Console"/>
</AsyncLogger>
<AsyncLogger name="${sys:app.name}" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
<AppenderRef ref="AsyncElasticsearch"/>
</AsyncLogger>
<Root level="WARN"><AppenderRef ref="Console"/></Root>
</Loggers>
</Configuration>Pass credentials as JVM system properties. Never hardcode them in log4j2.xml. On CloudHub store them as secure properties in Runtime Manager. On-premises pass as JVM arguments: -Delk.url=https://my-cluster.es.io:9243 -Delk.apikey=base64EncodedKey==
Step 5: Create the Elasticsearch index template
Create the index template before logs start flowing. Without explicit field mappings, Elasticsearch infers types incorrectly for fields like tracePoint and correlationId, which need keyword type for exact-match filtering in Kibana. Set content (the logged payload) to enabled: false, which stores the payload without indexing every nested field, keeping the index lean.
PUT _index_template/mule-logs
{
"index_patterns": ["mule-logs-*"],
"template": {
"settings": { "number_of_shards": 1, "number_of_replicas": 1 },
"mappings": {
"properties": {
"tracePoint": { "type": "keyword" },
"timestamp": { "type": "date", "format": "strict_date_time" },
"priority": { "type": "keyword" },
"correlationId": { "type": "keyword" },
"applicationName": { "type": "keyword" },
"flowName": { "type": "keyword" },
"environment": { "type": "keyword" },
"message": { "type": "text" },
"elapsed": { "type": "long" },
"content": { "type": "object", "enabled": false }
}
}
}
}Step 6: Build your Kibana dashboards
Create the Kibana data view against the mule-logs-* pattern with timestamp as the time field. The four trace points enable dashboards that are impossible with unstructured logging:
- Full transaction replay: filter by
correlationId. Kibana returns four documents in timeline order: the full journey of a single request across every API layer. - Error rate by application:
tracePoint:END AND priority:ERROR, grouped byapplicationName. - Downstream latency by target:
tracePoint:AFTER_REQUEST, aggregate theelapsedfield by target. Identify your slowest dependencies in real time. - Error type distribution:
priority:ERROR, group by theerrorTypefield. See whether failures are connectivity issues, authentication failures, or business validation errors.
Set a Kibana alert when the ratio of tracePoint:END AND priority:ERROR to total tracePoint:END exceeds 5% over any 5-minute window in the prod environment. Low noise, high signal.
Step 7: Propagate the correlation ID across API layers
For cross-layer tracing to work, the correlationId must travel with every request from Experience API through every Process and System API. Every outbound call must include it as the X-Correlation-ID header. The receiving API reads that header and sets it before firing its START trace point.
<!-- Sending downstream -->
<http:request config-ref="prcApiConfig" method="GET" path="/orders/#[vars.orderId]">
<http:headers>#[output application/java --- { 'X-Correlation-ID': correlationId }]</http:headers>
</http:request>
<!-- Reading the incoming header (alternative: use listener's propagate strategy) -->
<set-variable variableName="traceId"
value="#[attributes.headers.'x-correlation-id' default correlationId]"/>Step 8: Sanitise payloads before logging
The JSON Logger's payload field accepts any DataWeave expression. Before passing a payload, sanitise it. GDPR, PCI-DSS, and basic security hygiene all require sensitive data never reaches a log store. Write a reusable DataWeave function in src/main/resources/dwl/sanitise.dwl:
// dwl/sanitise.dwl
%dw 2.0
fun sanitise(data: Object, redact: Array<String> = [
'password', 'token', 'authorization', 'clientSecret',
'creditCardNumber', 'cvv', 'ssn', 'accountNumber', 'iban']) =
data mapObject ((value, key) ->
if (redact contains lower(key as String))
{ (key): '***REDACTED***' }
else if (typeOf(value) ~= 'Object')
{ (key): sanitise(value, redact) }
else
{ (key): value }
)What you have built
A logging strategy is only as good as its consistency. One team following this approach gains observability. An entire organisation following it gains a unified view of every transaction, across every API, searchable in a single Kibana interface. When an incident is raised, your investigation starts with a correlationId search in Kibana, not a guessing game across disconnected log files. From START to END, across every API layer, the full story is there.
The Ampleshift delivery framework ships with the JSON Logger pre-configured as a versioned Exchange asset, the log4j2.xml appender ready to activate, and the Kibana index template already in place.
