Aggregate document fields with transform and scripted metric

I need to merge/aggregate multiple docs with the same key into a single one.

The solutions I explored for doing this outside of OpenSearch, before publishing to it, don't fit my use case very well.

My plan is to use an index transform with a scripted_metric aggregation to do that merging.
I would like some feedback on this approach.

I'm mostly worried about scaling: how will this approach perform in a high-throughput scenario? I'm also open to alternative ideas.

Here are the details:

Suppose I have 3 input documents

{
  "keyAttrribute": "aKey",
  "attributeA": "valueA"
}
{
  "keyAttrribute": "aKey",
  "attributeB": "valueB"
}
{
  "keyAttrribute": "aKey",
  "attributeC": "valueC"
}

The output I want is

{
  "keyAttrribute": "aKey",
  "attributeA": "valueA",
  "attributeB": "valueB",
  "attributeC": "valueC"
}

So my idea is to send the 3 input documents to an input_index and create a transform job that runs frequently. The transform will group by keyAttrribute and aggregate each group with a scripted_metric.
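Just for context, here is a sketch of how the input documents would land in input_index (a hand-written _bulk request for illustration; in reality they are added/updated in batches of a few hundred by our producers):

POST input_index/_bulk
{ "index": {} }
{ "keyAttrribute": "aKey", "attributeA": "valueA" }
{ "index": {} }
{ "keyAttrribute": "aKey", "attributeB": "valueB" }
{ "index": {} }
{ "keyAttrribute": "aKey", "attributeC": "valueC" }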

The transform job would be like this

{
  "transform": {
    "enabled": true,
    "schedule": {
      "interval": {
        "period": 1,
        "unit": "Minutes",
        "start_time": 1602100553
      }
    },
    "description": "Sample transform job",
    "source_index": "input_index",
    "target_index": "output_index",
    "data_selection_query": {
      "match_all": {}
    },
    "page_size": 500,
    "groups": [
      {
        "terms": {
          "source_field": "keyAttrribute.keyword",
          "target_field": "keyAttrribute"
        }
      }
    ],
    "aggregations": {
      "output": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": """
            def ret = new HashMap();
            for (s in states) {
              for (d in s) {
                for (entry in d.entrySet()) {
                  def key = entry.getKey();
                  def value = entry.getValue();
                  ret[key] = value;
                }
              }
            }
            return ret
          """
        }
      }
    }
  }
}
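For reference, I create the job like this (paths per the OpenSearch transform API; on older Open Distro releases the prefix is _opendistro instead of _plugins, and sample_transform_job is just an example id). With enabled set to true it should then run on the one-minute schedule:

PUT _plugins/_transform/sample_transform_job
{
  "transform": { ...the body shown above... }
}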

I did a few tests with a small number of records and it seems to work. But since I don't know much about the internals of transforms and scripted_metric, I wonder how this will scale.
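For completeness, this is how I verified the merged result in those small tests, just a simple search against output_index (illustrative only; adjust to however your output fields end up mapped):

GET output_index/_search
{
  "query": {
    "match": { "keyAttrribute": "aKey" }
  }
}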

Each group will have ~3 to 5 documents with the same key to aggregate. I'll add/update documents in the input_index at ~1k updates/sec (in batches of hundreds), and I expect to have ~500 million distinct keys (i.e. between 1.5B and 2.5B documents) in the input_index, which will transform into ~500 million documents in the output_index.
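To get a sense of whether each scheduled run keeps up with that rate, my plan is to check the job's stats between runs via the explain endpoint (same caveat about the _plugins prefix; I'm going by the transform API docs here):

GET _plugins/_transform/sample_transform_job/_explain

If the stats reported there start falling behind the ingest rate, that would be the first sign that page_size or the schedule interval needs tuning.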

Would it require nodes that are too powerful, or too many of them, to handle that traffic? If the traffic increases, which resources would need to grow? Any ideas on how to optimize the job?

Hi @leandrodvd, I am facing a very similar use case for transforms and scripted metrics, and have the same questions about scalability and resource requirements. Have you, by any chance, found any new info on that?