Vector: A High-Performance Observability Pipeline

Vector is a high-performance observability pipeline written in Rust. It collects logs, metrics, and traces, transforms them, and routes them to any destination. Compared to Fluentd and Logstash, it uses significantly less memory and CPU — important when running on the same hosts as your application. A single Vector process replaces multiple agents.


Install Vector

# macOS
brew install vector

# Debian/Ubuntu
curl -1sLf 'https://repositories.timber.io/public/vector/cfg/setup/bash.deb.sh' \
  | sudo bash
sudo apt install vector

# RPM (RHEL/Rocky/Alma)
curl -1sLf 'https://repositories.timber.io/public/vector/cfg/setup/bash.rpm.sh' \
  | sudo bash
sudo dnf install vector

# Docker
docker pull timberio/vector:latest-distroless-libc

Architecture: Agent vs Aggregator

Vector runs in two modes:

Agent (on each host): Collects local logs and forwards to the aggregator or directly to storage.

Aggregator (central node): Receives from all agents, applies heavy transforms, routes to destinations.

For small teams (< 20 services), run Vector directly on each host sending to a destination. For larger setups, use the agent/aggregator pattern.


Basic Configuration

Vector’s config is TOML or YAML. A complete pipeline that collects Docker logs and sends to Elasticsearch:

# /etc/vector/vector.toml

# ========================
# SOURCES
# ========================

[sources.docker_logs]
type = "docker_logs"
# Collect all container logs
include_containers = ["*"]
# Or filter:
# include_containers = ["nginx", "api", "worker"]
exclude_containers = ["vector"]  # Don't collect Vector's own logs
include_labels = []  # array of label filters; empty = no label filtering

[sources.journald]
type = "journald"
include_units = ["nginx.service", "postgresql.service"]

[sources.http_input]
type = "http_server"
address = "0.0.0.0:8686"
decoding.codec = "json"  # `encoding` is the legacy name; current Vector uses `decoding.codec`

# ========================
# TRANSFORMS
# ========================

# Parse JSON logs from application containers
[transforms.parse_app_logs]
type = "remap"
inputs = ["docker_logs"]
source = '''
  # Parse nested JSON in log field
  structured, err = parse_json(.message)
  if err == null {
    . = merge(., structured)
    del(.message)
  }

  # Add metadata
  .host = get_env_var!("HOSTNAME")
  .environment = get_env_var("ENVIRONMENT") ?? "production"

  # Normalize log level
  .level = downcase(string(.level) ?? "info")

  # Parse timestamp if present
  if exists(.timestamp) {
    .timestamp = parse_timestamp!(.timestamp, format: "%+")
  }
'''

# Filter out health check noise
[transforms.filter_noise]
type = "filter"
inputs = ["parse_app_logs"]
condition = '''
  !match(string(.path) ?? "", r'/health|/ping|/metrics')
'''

# Route logs by severity
[transforms.route_by_level]
type = "route"
inputs = ["filter_noise"]

[transforms.route_by_level.route]
errors = '.level == "error" || .level == "critical" || .level == "fatal"'
warnings = '.level == "warn" || .level == "warning"'
info = '.level == "info" || .level == "debug"'

# Enrich error logs with additional context
[transforms.enrich_errors]
type = "remap"
inputs = ["route_by_level.errors"]
source = '''
  .alert = true
  .requires_attention = true
'''

# ========================
# SINKS (Destinations)
# ========================

# All logs to Elasticsearch
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["filter_noise", "journald"]
endpoints = ["https://elasticsearch.yourcompany.internal:9200"]
auth.strategy = "basic"
auth.user = "vector"
auth.password = "${ES_PASSWORD}"
# Vector templates use {{ field }} (no leading dot) and raw strftime specifiers
index = "logs-{{ environment }}-%Y.%m.%d"
bulk.action = "create"
compression = "gzip"

[sinks.elasticsearch.buffer]
type = "disk"
max_size = 268_435_456  # 256 MiB
when_full = "block"

# Error logs to S3 for long-term retention
[sinks.s3_errors]
type = "aws_s3"
inputs = ["route_by_level.errors"]
bucket = "your-log-archive"
region = "us-east-1"
key_prefix = "errors/%Y/%m/%d/"
compression = "gzip"
encoding.codec = "json"

[sinks.s3_errors.auth]
access_key_id = "${AWS_ACCESS_KEY_ID}"
secret_access_key = "${AWS_SECRET_ACCESS_KEY}"

# Alert on critical errors via HTTP to Slack/PagerDuty webhook
[sinks.alert_errors]
type = "http"
inputs = ["enrich_errors"]
uri = "${SLACK_WEBHOOK_URL}"
method = "post"
encoding.codec = "json"
request.rate_limit_num = 10   # Max 10 requests per second (throttling only — this does not de-duplicate alerts)
request.rate_limit_duration_secs = 1

# Internal metrics (Vector's own performance)
[sources.vector_metrics]
type = "internal_metrics"

[sinks.prometheus_metrics]
type = "prometheus_exporter"
inputs = ["vector_metrics"]
address = "0.0.0.0:9598"

Docker Compose Deployment

# docker-compose.yml
version: "3.8"  # note: the `version` key is obsolete in Compose v2 and may be omitted
services:
  vector:
    image: timberio/vector:latest-distroless-libc
    container_name: vector
    restart: unless-stopped
    volumes:
      - ./vector/vector.toml:/etc/vector/vector.toml:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - /var/log:/var/log:ro
      - vector-data:/var/lib/vector
    environment:
      - ES_PASSWORD=${ES_PASSWORD}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}
      - ENVIRONMENT=production
      - HOSTNAME=${HOSTNAME}
    ports:
      - "8686:8686"   # HTTP input
      - "9598:9598"   # Prometheus metrics
    # Needs Docker socket access
    group_add:
      - "999"

volumes:
  vector-data:

Aggregator Config (Centralized)

The aggregator receives from all agent nodes:

# /etc/vector/aggregator.toml (on your central log server)

[sources.from_agents]
type = "vector"
address = "0.0.0.0:6000"
# Enable TLS for production:
# tls.enabled = true
# tls.cert_file = "/etc/vector/certs/server.crt"
# tls.key_file = "/etc/vector/certs/server.key"

[transforms.deduplicate]
type = "dedupe"
inputs = ["from_agents"]
fields.match = ["message", "host", "container_name"]
cache.num_events = 5000

[sinks.loki]
type = "loki"
inputs = ["deduplicate"]
endpoint = "http://loki.yourcompany.internal:3100"
encoding.codec = "json"

[sinks.loki.labels]
# Template fields are referenced without a leading dot
level = "{{ level }}"
env = "{{ environment }}"
service = "{{ container_name }}"
host = "{{ host }}"

On each agent, forward to aggregator:

[sinks.aggregator]
type = "vector"
inputs = ["parse_app_logs", "journald"]
address = "aggregator.yourcompany.internal:6000"
compression = true

VRL (Vector Remap Language) Examples

VRL is Vector’s transformation DSL. Common patterns:

# Extract fields from a structured log line
[transforms.parse_nginx_access]
type = "remap"
inputs = ["nginx_logs"]
source = '''
  . = parse_nginx_log!(string!(.message), format: "combined")
  .bytes_sent = to_int!(.size)
  .response_time_ms = to_float!(.request_time) * 1000
'''

# Mask PII before sending to log store
[transforms.mask_pii]
type = "remap"
inputs = ["app_logs"]
source = '''
  if exists(.email) {
    # NOTE: redact() takes regex filters (plus a small set of named
    # filters such as us_social_security_number); "email" is not a
    # built-in named filter, so match the pattern explicitly.
    .email = redact(.email, filters: [r'[\w.+-]+@[\w-]+\.[\w.]+'])
  }
  if exists(.credit_card) {
    .credit_card = redact(.credit_card, filters: [r'\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}'])
  }
  if exists(.ssn) {
    del(.ssn)
  }
'''

# Add request latency percentile tagging
[transforms.tag_latency]
type = "remap"
inputs = ["api_logs"]
source = '''
  latency_ms = to_float!(.response_time_ms)
  .latency_tier = if latency_ms < 100 {
    "fast"
  } else if latency_ms < 500 {
    "normal"
  } else if latency_ms < 2000 {
    "slow"
  } else {
    "critical"
  }
'''

Monitoring Vector Itself

# Check Vector status
systemctl status vector

# Watch the API (set api.enabled = true in config; the API's default port
# 8686 collides with the http_input source above — bind one of them elsewhere)
curl http://localhost:8686/health
curl http://localhost:8686/components

# Scrape Prometheus metrics
curl http://localhost:9598/metrics \
  | grep -E "vector_(events|errors|processed)"

# Real-time component stats
vector top  # requires vector CLI