Remote Work Tools

Fluentd is a CNCF-graduated log collector and aggregator. It collects logs from dozens of sources (Docker, syslog, application files, Kubernetes), transforms and filters them, and routes them to one or more destinations. For remote teams running multiple services across multiple hosts, centralized logging is the difference between having observable systems and debugging in the dark.

When an incident happens at 3 AM in a different time zone, the on-call engineer needs logs available in a single search interface — not scattered across individual container filesystems that require SSH access to read. Fluentd handles the collection layer so engineers can focus on investigation rather than log retrieval.


Install Fluentd

On Debian/Ubuntu (using td-agent, the stable distribution):

curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
sudo systemctl enable td-agent
sudo systemctl start td-agent

Docker (recommended for containerized infrastructure):

docker pull fluent/fluentd:v1.17-1

Basic Configuration Structure

Fluentd config lives at /etc/td-agent/td-agent.conf. Four directive types cover most setups: <source> (where logs come from), <filter> (transform or drop records in flight), <match> (where logs go), and <label> (group a pipeline for internal routing).

A minimal config that reads Docker logs and sends to Elasticsearch:

# /etc/td-agent/td-agent.conf

# =====================
# SOURCES
# =====================

# Collect Docker container logs
<source>
  @type tail
  @id docker_logs
  path /var/lib/docker/containers/*/*.log
  pos_file /var/log/td-agent/docker.log.pos
  tag docker.*
  read_from_head true
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

# Collect syslog
<source>
  @type syslog
  @id syslog
  port 5140
  bind 0.0.0.0
  tag system
  <transport udp>
  </transport>
</source>

# Collect application logs via HTTP API
<source>
  @type http
  @id http_input
  port 8888
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10
</source>

# =====================
# FILTERS
# =====================

# Parse Docker log metadata
<filter docker.**>
  @type record_transformer
  <record>
    # tag expands to docker.var.lib.docker.containers.<id>.<file>,
    # so the container id is tag part 5
    container_id ${tag_parts[5]}
    host "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT'] || 'production'}"
  </record>
</filter>

# Drop health check noise
<filter docker.**>
  @type grep
  <exclude>
    key log
    pattern /GET \/health/
  </exclude>
</filter>

# Parse JSON application logs
<filter docker.**>
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type json
    json_parser json
  </parse>
</filter>

# =====================
# OUTPUT
# =====================

<match docker.**>
  @type elasticsearch
  @id elasticsearch_out
  host elasticsearch.yourcompany.internal
  port 9200
  user elastic
  password "#{ENV['ES_PASSWORD']}"
  logstash_format true
  logstash_prefix docker
  logstash_dateformat %Y.%m.%d
  <buffer>
    @type file
    path /var/log/td-agent/buffer/elasticsearch
    flush_mode interval
    flush_interval 10s
    chunk_limit_size 8m
    queue_limit_length 128
    retry_max_times 10
    retry_wait 30s
    retry_exponential_backoff_base 2
    overflow_action block
  </buffer>
</match>
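As a rough sketch of what those retry settings imply (ignoring Fluentd's default retry randomization and the retry_max_interval cap, neither of which is set here), the waits grow geometrically and the total window before a chunk is abandoned can be estimated:

```python
# Upper-bound estimate of the retry window for the buffer above:
# retry_wait 30s, retry_exponential_backoff_base 2, retry_max_times 10
retry_wait = 30      # seconds
backoff_base = 2
max_retries = 10

waits = [retry_wait * backoff_base ** n for n in range(max_retries)]
print(waits[:4])                             # → [30, 60, 120, 240]

total = sum(waits)                           # 30 * (2**10 - 1)
print(f"{total} s ≈ {total / 3600:.1f} h")   # → 30690 s ≈ 8.5 h
```

Roughly eight and a half hours of Elasticsearch downtime fits inside this window before chunks start being discarded, assuming the file buffer has the disk space to hold them.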

Docker Compose Deployment

For a full logging stack with Docker:

# docker-compose.yml
version: "3.8"

services:
  fluentd:
    build:
      context: ./fluentd
      dockerfile: Dockerfile
    container_name: fluentd
    volumes:
      - ./fluentd/conf:/fluentd/etc
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - fluentd-pos:/fluentd/pos
      - fluentd-buffer:/fluentd/buffer
    ports:
      - "24224:24224"     # Fluentd forward protocol
      - "24224:24224/udp"
      - "8888:8888"       # HTTP input
    environment:
      - ES_PASSWORD=${ES_PASSWORD}
      - ENVIRONMENT=production
    restart: unless-stopped
    networks:
      - logging

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    volumes:
      - es-data:/usr/share/elasticsearch/data
    networks:
      - logging

  kibana:
    image: docker.elastic.co/kibana/kibana:8.13.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
    networks:
      - logging

networks:
  logging:

volumes:
  es-data:
  fluentd-pos:
  fluentd-buffer:

Fluentd Dockerfile with required plugins:

# fluentd/Dockerfile
FROM fluent/fluentd:v1.17-1

USER root

# Install plugins
RUN gem install \
    fluent-plugin-elasticsearch \
    fluent-plugin-s3 \
    fluent-plugin-rewrite-tag-filter \
    fluent-plugin-record-modifier \
    fluent-plugin-slack \
    fluent-plugin-sampling-filter

USER fluent

Configure Applications to Send Logs to Fluentd

Applications send logs to the Fluentd container via the forward protocol:

# In your app's docker-compose.yml
services:
  api:
    image: yourcompany/api:latest
    logging:
      driver: fluentd
      options:
        fluentd-address: "fluentd:24224"
        fluentd-async: "true"
        tag: "{{.Name}}"

From application code (Go):

package main

import (
    "github.com/fluent/fluent-logger-golang/fluent"
)

func main() {
    logger, err := fluent.New(fluent.Config{
        FluentHost: "fluentd",
        FluentPort: 24224,
        TagPrefix:  "app",
    })
    if err != nil {
        panic(err)
    }
    defer logger.Close()

    // These values would come from the request context in a real handler
    userID, requestID := 1234, "req-abc-123"
    amount, currency := 99.99, "USD"

    // Log structured event (tag becomes app.payments)
    logger.Post("payments", map[string]interface{}{
        "user_id":    userID,
        "amount":     amount,
        "currency":   currency,
        "request_id": requestID,
        "level":      "info",
        "message":    "payment processed",
    })
}

From Python using the structured logging approach:

import fluent.sender
import fluent.event

fluent.sender.setup('app', host='fluentd', port=24224)

# Emit a structured event
fluent.event.Event('payments', {
    'user_id': user_id,
    'amount': amount,
    'currency': currency,
    'request_id': request_id,
    'level': 'info',
    'message': 'payment processed',
})

The key discipline for remote teams: always emit structured JSON rather than plain text strings. Structured logs make Fluentd filters and Kibana queries far more powerful. A log line like "payment processed for user 1234 amount 99.99" is hard to aggregate; a structured record with discrete fields is trivially filterable by user, amount range, or currency.
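To make the contrast concrete, here is a minimal Python sketch (field names illustrative) of the same event emitted both ways:

```python
import json

# Unstructured: a search tool can only regex-match this string
plain = "payment processed for user 1234 amount 99.99"

# Structured: every field is individually queryable and aggregatable
event = {
    "level": "info",
    "message": "payment processed",
    "user_id": 1234,
    "amount": 99.99,
    "currency": "USD",
}
line = json.dumps(event)

# Downstream, Fluentd filters and Kibana queries operate on discrete
# fields instead of parsing prose back apart
parsed = json.loads(line)
assert parsed["user_id"] == 1234 and parsed["amount"] > 50
```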


Route Logs to S3 for Long-Term Storage

# Add to td-agent.conf. Fluentd routes each event to the first <match>
# that fits, so events already consumed by the Elasticsearch <match>
# never reach this one — to send the same events to both outputs, wrap
# both destinations as <store> blocks inside a single <match> with
# @type copy.

<match **>
  @type s3
  @id s3_archive
  aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
  aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
  s3_bucket your-log-archive-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  time_slice_format %Y%m%d_%H
  time_slice_wait 10m
  store_as gzip
  <format>
    @type json
  </format>
  <buffer time>
    @type file
    path /fluentd/buffer/s3
    timekey 3600          # 1 hour chunks
    timekey_wait 10m      # Wait 10 minutes before flushing to allow late records
    chunk_limit_size 256m
  </buffer>
</match>

For cost management, use S3 lifecycle rules alongside the Fluentd S3 output. Logs older than 30 days can transition to S3 Glacier for long-term compliance storage at a fraction of the cost:

{
  "Rules": [
    {
      "ID": "log-archive-lifecycle",
      "Status": "Enabled",
      "Filter": { "Prefix": "logs/" },
      "Transitions": [
        { "Days": 30, "StorageClass": "STANDARD_IA" },
        { "Days": 90, "StorageClass": "GLACIER" }
      ],
      "Expiration": { "Days": 365 }
    }
  ]
}

Apply this with the AWS CLI:

aws s3api put-bucket-lifecycle-configuration \
  --bucket your-log-archive-bucket \
  --lifecycle-configuration file://lifecycle.json

Multi-Output Routing with Labels

Send logs to different destinations based on content:

<match docker.**>
  @type relabel
  @label @routing
</match>

<label @routing>
  # Classify by content. Retagged events re-enter this label's match list,
  # so keep this matcher narrow (docker.**) — a bare ** here would rewrite
  # its own output and loop forever.
  <match docker.**>
    @type rewrite_tag_filter
    # Security events → dedicated security log store
    <rule>
      key level
      pattern /SECURITY|AUDIT/
      tag security.${tag}
    </rule>
    # 5xx responses → error pipeline
    <rule>
      key $.http_status
      pattern /^5/
      tag errors.${tag}
    </rule>
    # Fallback: rewrite_tag_filter drops records that match no rule, so
    # retag everything else via the always-present host field
    <rule>
      key host
      pattern /.+/
      tag app.${tag}
    </rule>
  </match>

  <match security.**>
    @type elasticsearch
    host security-elasticsearch.yourcompany.internal
    port 9200
    logstash_prefix security
    logstash_format true
  </match>

  <match errors.**>
    @type copy
    <store>
      @type elasticsearch
      host elasticsearch.yourcompany.internal
      port 9200
      logstash_prefix errors
    </store>
    <store>
      @type slack
      webhook_url "#{ENV['SLACK_WEBHOOK']}"
      channel "#alerts"
      message_keys level,message,container_id
      title "Application Error"
    </store>
  </match>

  <match **>
    @type elasticsearch
    host elasticsearch.yourcompany.internal
    port 9200
    logstash_prefix app
    logstash_format true
  </match>
</label>

The Slack store in the errors match is valuable for remote teams: 5xx errors get posted to #alerts immediately without anyone watching a dashboard. Engineers working in different time zones can respond to errors without polling Kibana continuously.


Sampling High-Volume Logs

In production, some services emit enormous log volumes. Sampling reduces costs without losing visibility into errors:

# Split errors out before sampling so they bypass it entirely
# (uses fluent-plugin-rewrite-tag-filter)
<match app.high_volume_service.**>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /error|warn|SECURITY/
    tag keep.${tag}
  </rule>
  <rule>
    key level
    pattern /.+/
    tag sampled.${tag}
  </rule>
</match>

# Keep 1 in 10 of the remaining logs (fluent-plugin-sampling-filter)
<filter sampled.app.high_volume_service.**>
  @type sampling_filter
  interval 10
  sample_unit all
</filter>

Ordering matters here: filters run in sequence, so a grep filter placed after a sampling filter cannot "rescue" records the sampler already dropped — it can only narrow further. Retagging errors before the sampler is what guarantees they bypass it; downstream <match> directives must then cover both the keep.** and sampled.** tags. This avoids dropping critical logs while still reducing index size by 80-90% on noisy services.
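As a sanity check on the volume arithmetic, here is a small Python simulation (hypothetical volumes) of keeping 1 in 10 routine records while passing every error through:

```python
# Simulate 10,000 records where every 100th is an error
logs = [{"level": "error" if i % 100 == 0 else "info"} for i in range(10_000)]

SAMPLE_INTERVAL = 10      # keep 1 in 10 routine records
kept, seen = [], 0
for rec in logs:
    if rec["level"] in ("error", "warn"):
        kept.append(rec)              # errors bypass sampling entirely
        continue
    seen += 1
    if seen % SAMPLE_INTERVAL == 0:   # deterministic 1-in-10 keep
        kept.append(rec)

assert sum(r["level"] == "error" for r in kept) == 100   # no error lost
print(f"kept {len(kept)} of {len(logs)} records")  # → kept 1090 of 10000 records
```

The index shrinks by roughly 89% in this run while every error survives — the property the configuration above is meant to guarantee.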


Adding Trace Correlation Fields

Remote debugging across microservices is much faster when every log record carries a trace ID that links related requests. Inject the trace ID at the Fluentd filter layer so it applies uniformly even to services that don’t emit it natively:

<filter app.**>
  @type record_transformer
  enable_ruby true
  <record>
    # Pass X-Request-ID through from the log record if present,
    # otherwise generate a placeholder for correlation queries
    trace_id ${record["request_id"] || record["x_request_id"] || "no-trace"}
    service_name ${tag_parts[1]}
    datacenter "#{ENV['DATACENTER'] || 'us-east-1'}"
    log_version "1"
  </record>
</filter>

With trace_id present on every record, a Kibana query like trace_id:"req-abc-123" retrieves the full request path across all services — API gateway, auth service, payments service — without manually correlating timestamps.
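On the application side this assumes each service attaches the incoming request ID to its log records, minting one at the edge when none exists. A minimal Python sketch (function and field names are illustrative, not part of any Fluentd API):

```python
import json
import uuid

def make_logger(service_name, request_id=None):
    """Build a log function that stamps every record with one trace id."""
    trace_id = request_id or f"req-{uuid.uuid4()}"
    def log(level, message, **fields):
        record = {"level": level, "message": message,
                  "service": service_name, "request_id": trace_id, **fields}
        print(json.dumps(record))  # in production: forward to Fluentd
        return record
    return log

# The edge service mints the id; downstream services reuse it
gateway = make_logger("api-gateway")
first = gateway("info", "request received", path="/pay")

payments = make_logger("payments", request_id=first["request_id"])
second = payments("info", "payment processed")

assert first["request_id"] == second["request_id"]  # correlatable end to end
```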


Monitor Fluentd Health

The HTTP monitoring API used below is not enabled by default — add the monitor_agent source to td-agent.conf first:

<source>
  @type monitor_agent
  @id monitor
  bind 0.0.0.0
  port 24220
</source>

# Check Fluentd status
curl http://localhost:24220/api/config.json | jq '.'

# Metrics endpoint
curl http://localhost:24220/api/plugins.json | jq '.'

# Check buffer queue depth (high = Elasticsearch can't keep up)
curl http://localhost:24220/api/plugins.json \
  | jq '.plugins[] | select(.plugin_id | contains("elasticsearch")) | {buffer_queue_length, retry_count}'

Alert on buffer queue depth exceeding a threshold — it means Fluentd is accumulating logs faster than the output can accept them. Left unaddressed, the buffer fills, the overflow_action block directive causes Fluentd to apply backpressure on input, and eventually logs are dropped or Docker log drivers start timing out. Catching this early prevents a silent data loss situation:

#!/bin/bash
# scripts/check-fluentd-buffer.sh
QUEUE=$(curl -s http://localhost:24220/api/plugins.json \
  | jq '.plugins[] | select(.plugin_id | contains("elasticsearch")) | .buffer_queue_length' \
  | head -1)
# Guard against an empty or non-numeric response (endpoint down, no match)
[[ "$QUEUE" =~ ^[0-9]+$ ]] || QUEUE=0

if [[ "$QUEUE" -gt 50 ]]; then
  curl -s -X POST "$SLACK_WEBHOOK_URL" \
    -H "Content-Type: application/json" \
    -d "{\"text\": \":warning: Fluentd buffer queue depth is $QUEUE on $(hostname) — check Elasticsearch health\"}"
fi
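To run the check continuously, one option is a cron entry on each host (the script path and webhook value here are placeholders):

```
# /etc/cron.d/fluentd-buffer-check — poll every 5 minutes
*/5 * * * * root SLACK_WEBHOOK_URL=https://hooks.slack.com/services/CHANGE_ME /opt/scripts/check-fluentd-buffer.sh
```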