Original source: https://www.elastic.co/guide/en/elasticsearch/reference/7.7/transform-examples.html. Copyright of the original document belongs to www.elastic.co.

Transform examples

These examples demonstrate how to use transforms to derive useful insights from your data. All the examples use one of the Kibana sample datasets. For a more detailed, step-by-step example, see Tutorial: Transforming the eCommerce sample data.

Finding your best customers

In this example, we use the eCommerce orders sample dataset to find the customers who spent the most in our hypothetical webshop. Let’s transform the data such that, for each customer, the destination index contains the number of orders, the total price of the orders, the average price per order, the average number of unique products per order, and the total number of unique products ordered.

POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "dest" : { 
    "index" : "sample_ecommerce_orders_by_customer"
  },
  "pivot": {
    "group_by": { 
      "user": { "terms": { "field": "user" }},
      "customer_id": { "terms": { "field": "customer_id" }}
    },
    "aggregations": {
      "order_count": { "value_count": { "field": "order_id" }},
      "total_order_amt": { "sum": { "field": "taxful_total_price" }},
      "avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
      "avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
      "total_unique_products": { "cardinality": { "field": "products.product_id" }}
    }
  }
}

The dest object specifies the destination index for the transform. It is ignored by _preview.

Two group_by fields are selected, so the transform contains one row per unique combination of user and customer_id. In this dataset, each of these fields identifies a customer on its own; including both simply adds context to the final results.

In the example above, condensed JSON formatting has been used for easier readability of the pivot object.
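To make the pivot semantics concrete, here is a minimal Python sketch of what grouping by user and customer_id and applying the five aggregations amounts to. It is an illustration only, not how Elasticsearch executes a transform: the order documents are invented, product_ids is a hypothetical flattened stand-in for products.product_id, and pivot_by_customer is a name made up for this sketch.

```python
from collections import defaultdict

# Hypothetical order documents. Field names mirror the sample dataset,
# but the values are invented; "product_ids" is a flattened stand-in
# for the products.product_id field.
orders = [
    {"user": "ann", "customer_id": "1", "order_id": "a1",
     "taxful_total_price": 20.0, "total_unique_products": 2,
     "product_ids": [11, 12]},
    {"user": "ann", "customer_id": "1", "order_id": "a2",
     "taxful_total_price": 40.0, "total_unique_products": 2,
     "product_ids": [12, 13]},
    {"user": "bob", "customer_id": "2", "order_id": "b1",
     "taxful_total_price": 15.0, "total_unique_products": 1,
     "product_ids": [11]},
]


def pivot_by_customer(docs):
    """Group docs by (user, customer_id) and compute one row per group."""
    groups = defaultdict(list)
    for doc in docs:
        groups[(doc["user"], doc["customer_id"])].append(doc)

    rows = []
    for (user, customer_id), group in groups.items():
        prices = [d["taxful_total_price"] for d in group]
        unique_counts = [d["total_unique_products"] for d in group]
        rows.append({
            "user": user,
            "customer_id": customer_id,
            # value_count on order_id: every doc here has exactly one order_id
            "order_count": len(group),
            "total_order_amt": sum(prices),                    # sum
            "avg_amt_per_order": sum(prices) / len(prices),    # avg
            "avg_unique_products_per_order":
                sum(unique_counts) / len(unique_counts),       # avg
            # cardinality: exact in this sketch, approximate in Elasticsearch
            "total_unique_products":
                len({p for d in group for p in d["product_ids"]}),
        })
    return rows


rows = pivot_by_customer(orders)
for row in rows:
    print(row)
```

Each distinct (user, customer_id) pair yields exactly one output row, which is why the destination index is customer centric.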

The preview transforms API enables you to see the layout of the transform in advance, populated with some sample values. For example:

{
  "preview" : [
    {
      "total_order_amt" : 3946.9765625,
      "order_count" : 59.0,
      "total_unique_products" : 116.0,
      "avg_unique_products_per_order" : 2.0,
      "customer_id" : "10",
      "user" : "recip",
      "avg_amt_per_order" : 66.89790783898304
    },
    ...
  ]
}
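As a quick sanity check on the preview row, the average price per order should equal the total divided by the order count, assuming every order document carries both order_id and taxful_total_price. The short Python sketch below recomputes it from the values shown; the variable names are chosen for this illustration.

```python
import math

# Values copied from the preview row above.
row = {
    "total_order_amt": 3946.9765625,
    "order_count": 59.0,
    "avg_amt_per_order": 66.89790783898304,
}

# avg = sum / count, provided both aggregations saw the same documents.
recomputed_avg = row["total_order_amt"] / row["order_count"]
consistent = math.isclose(recomputed_avg, row["avg_amt_per_order"], rel_tol=1e-9)
print(recomputed_avg, consistent)
```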

This transform makes it easier to answer questions such as:

  • Which customers spend the most?
  • Which customers spend the most per order?
  • Which customers order most often?
  • Which customers ordered the fewest different products?

It’s possible to answer these questions using aggregations alone; however, transforms allow us to persist this data as a customer-centric index. This enables us to analyze data at scale and gives more flexibility to explore and navigate data from a customer-centric perspective. In some cases, it can even make creating visualizations much simpler.

Finding air carriers with the most delays

In this example, we use the Flights sample dataset to find out which air carrier had the most delays. First, we use a query filter to exclude all cancelled flights from the source data. Then we transform the data to contain the number of flights, the sum of delayed minutes, and the sum of the flight minutes by air carrier. Finally, we use a bucket_script to determine what percentage of the flight time was taken up by delays.

POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_flights",
    "query": { 
      "bool": {
        "filter": [
          { "term":  { "Cancelled": false } }
        ]
      }
    }
  },
  "dest" : { 
    "index" : "sample_flight_delays_by_carrier"
  },
  "pivot": {
    "group_by": { 
      "carrier": { "terms": { "field": "Carrier" }}
    },
    "aggregations": {
      "flights_count": { "value_count": { "field": "FlightNum" }},
      "delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
      "flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
      "delay_time_percentage": { 
        "bucket_script": {
          "buckets_path": {
            "delay_time": "delay_mins_total.value",
            "flight_time": "flight_mins_total.value"
          },
          "script": "(params.delay_time / params.flight_time) * 100"
        }
      }
    }
  }
}

Filter the source data to select only flights that were not cancelled.

The dest object specifies the destination index for the transform. It is ignored by _preview.

The data is grouped by the Carrier field which contains the airline name.

This bucket_script performs calculations on the results that are returned by the aggregation. In this particular example, it calculates what percentage of travel time was taken up by delays.

The preview shows you that the new index would contain data like this for each carrier:

{
  "preview" : [
    {
      "carrier" : "ES-Air",
      "flights_count" : 2802.0,
      "flight_mins_total" : 1436927.5130677223,
      "delay_time_percentage" : 9.335543983955839,
      "delay_mins_total" : 134145.0
    },
    ...
  ]
}
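The delay_time_percentage value in the preview can be reproduced by hand from the two sums in the same row. The Python sketch below mirrors the script expression; the function name is chosen for this illustration and is not part of any Elasticsearch API.

```python
import math


def delay_time_percentage(delay_time, flight_time):
    """Python equivalent of (params.delay_time / params.flight_time) * 100."""
    return (delay_time / flight_time) * 100


# delay_mins_total and flight_mins_total from the ES-Air preview row.
pct = delay_time_percentage(134145.0, 1436927.5130677223)
print(pct)
```

For ES-Air this gives roughly 9.34, meaning about 9.3% of the total flight minutes were delay minutes.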

This transform makes it easier to answer questions such as:

  • Which air carrier has the most delays as a percentage of flight time?

This data is fictional and does not reflect actual delays or flight stats for any of the featured destination or origin airports.

Finding suspicious client IPs

In this example, we use the web log sample dataset to identify suspicious client IPs. We transform the data such that the new index contains, for each client IP, the sum of bytes and the number of distinct URLs, agents, source locations, and destination locations. We also use filter aggregations to count the specific types of HTTP responses that each client IP receives. Ultimately, the example below transforms web log data into an entity-centric index where the entity is clientip.

PUT _transform/suspicious_client_ips
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest" : { 
    "index" : "sample_weblogs_by_clientip"
  },
  "sync" : { 
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {  
      "clientip": { "terms": { "field": "clientip" } }
    },
    "aggregations": {
      "url_dc": { "cardinality": { "field": "url.keyword" }},
      "bytes_sum": { "sum": { "field": "bytes" }},
      "geo.src_dc": { "cardinality": { "field": "geo.src" }},
      "agent_dc": { "cardinality": { "field": "agent.keyword" }},
      "geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
      "responses.total": { "value_count": { "field": "timestamp" }},
      "success" : { 
         "filter": {
            "term": { "response" : "200"}}
        },
      "error404" : {
         "filter": {
            "term": { "response" : "404"}}
        },
      "error503" : {
         "filter": {
            "term": { "response" : "503"}}
        },
      "timestamp.min": { "min": { "field": "timestamp" }},
      "timestamp.max": { "max": { "field": "timestamp" }},
      "timestamp.duration_ms": { 
        "bucket_script": {
          "buckets_path": {
            "min_time": "timestamp.min.value",
            "max_time": "timestamp.max.value"
          },
          "script": "(params.max_time - params.min_time)"
        }
      }
    }
  }
}

This is the destination index for the transform.

The sync object configures the transform to run continuously. It uses the timestamp field to synchronize the source and destination indices. The worst-case ingestion delay is 60 seconds.

The data is grouped by the clientip field.

Filter aggregation that counts the occurrences of successful (200) responses in the response field. The following two aggregations (error404 and error503) count the error responses by error codes.

This bucket_script calculates the duration of the clientip access based on the results of the aggregation.

After you create the transform, you must start it:

POST _transform/suspicious_client_ips/_start

Shortly thereafter, the first results should be available in the destination index:

GET sample_weblogs_by_clientip/_search

The search result shows you data like this for each client IP:

    "hits" : [
      {
        "_index" : "sample_weblogs_by_clientip",
        "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "geo" : {
            "src_dc" : 2.0,
            "dest_dc" : 2.0
          },
          "success" : 2,
          "error404" : 0,
          "error503" : 0,
          "clientip" : "0.72.176.46",
          "agent_dc" : 2.0,
          "bytes_sum" : 4422.0,
          "responses" : {
            "total" : 2.0
          },
          "url_dc" : 2.0,
          "timestamp" : {
            "duration_ms" : 5.2191698E8,
            "min" : "2020-03-16T07:51:57.333Z",
            "max" : "2020-03-22T08:50:34.313Z"
          }
        }
      }
    ]
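The timestamp.duration_ms value in this hit is simply the gap between timestamp.max and timestamp.min in milliseconds, since min and max aggregations on a date field return epoch milliseconds. The Python sketch below recomputes it from the two timestamps shown; duration_ms and TIMESTAMP_FORMAT are names chosen for this illustration.

```python
from datetime import datetime, timedelta

# Format of the timestamps as they appear in the search result.
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def duration_ms(min_time, max_time):
    """Return the whole number of milliseconds between two UTC timestamps."""
    start = datetime.strptime(min_time, TIMESTAMP_FORMAT)
    end = datetime.strptime(max_time, TIMESTAMP_FORMAT)
    # Integer division by a 1 ms timedelta avoids float rounding.
    return (end - start) // timedelta(milliseconds=1)


ms = duration_ms("2020-03-16T07:51:57.333Z", "2020-03-22T08:50:34.313Z")
print(ms)
```

The result, 521916980 ms, is the 5.2191698E8 shown in the hit, or a little over six days between the first and last request from this client IP.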

Like other Kibana sample datasets, the web log sample dataset contains timestamps relative to when you installed it, including timestamps in the future. The continuous transform will pick up the data points once they are in the past. If you installed the web log sample dataset some time ago, you can uninstall and reinstall it and the timestamps will change.

This transform makes it easier to answer questions such as:

  • Which client IPs are transferring the most data?
  • Which client IPs are interacting with a high number of different URLs?
  • Which client IPs have high error rates?
  • Which client IPs are interacting with a high number of destination countries?
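As one way to approach the error-rate question, the fields of a destination document can be combined into a single ratio. The Python sketch below is an assumption about what "error rate" means here: it counts only the 404 and 503 responses that the transform tracks and divides by responses.total. The error_rate function is hypothetical and not part of the transform.

```python
def error_rate(doc):
    """Fraction of a client's responses that were 404 or 503 errors."""
    total = doc["responses"]["total"]
    if total == 0:
        return 0.0  # no responses recorded, so no errors to report
    return (doc["error404"] + doc["error503"]) / total


# Relevant fields from the _source of the example hit shown earlier.
sample = {
    "success": 2,
    "error404": 0,
    "error503": 0,
    "responses": {"total": 2.0},
}

rate = error_rate(sample)
print(rate)
```

Other response codes present in the source data are not counted by this transform, so a fuller error rate would need additional filter aggregations.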