ElasticSearch to OpenSearch Migration: Remote Reindex to Move Data

Since we cannot do an in-place upgrade of our ElasticSearch environment, I need to move everything to the new servers. The biggest component is moving the data — which can easily be done using a remote reindex. The script below uses the ElasticSearch API to get a list of all indices, then tells the OpenSearch API to reindex each index from the ElasticSearch remote. This operates on deltas — it will add new documents to an index — so my plan is to spend a few days seeding the initial data, then perform delta updates leading up to the scheduled change.

import requests
from requests.auth import HTTPBasicAuth

# Remote-reindex every non-system index from ElasticSearch into OpenSearch.
# One tab-separated line (index name, HTTP status, response body) is appended
# to results.txt for each index processed.

strElasticSearchUrl = 'https://elasticsearch.example.com:9200'
strElasticSearchUser = 'something'
strElasticSearchPassword = 'something'
strOpenSearchUrl = 'https://opensearch.example.com:9200'
authElasticSearch = HTTPBasicAuth(strElasticSearchUser, strElasticSearchPassword)
authOpenSearch = HTTPBasicAuth('something', 'something')


def parseResponseBody(objResponse):
    """Return the decoded JSON body of a response, or its raw text if it is not JSON."""
    try:
        return objResponse.json()
    except ValueError:
        # A proxy or gateway error page is not JSON; keep the text so it is still logged.
        return objResponse.text


def getStrictMappingReason(jsonBody):
    """Return the first failure's reason if it mentions a strict mapping, else None.

    A 400 response does not always carry a 'failures' list, e.g.
    {'error': {'root_cause': [{'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}], 'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}, 'status': 400}
    so every level is looked up defensively.
    """
    if not isinstance(jsonBody, dict):
        return None
    listFailures = jsonBody.get('failures') or []
    if not listFailures or not isinstance(listFailures[0], dict):
        return None
    strReason = (listFailures[0].get('cause') or {}).get('reason') or ''
    return strReason if "mapping set to strict" in strReason else None


# NOTE(review): verify=False disables TLS certificate verification on every
# call below. Kept from the original script; confirm this is acceptable.
reqGetIndexes = requests.get(f'{strElasticSearchUrl}/_cat/indices?format=json', auth=authElasticSearch, verify=False)
# Fail loudly here instead of trying to iterate an error payload as if it were the index list.
reqGetIndexes.raise_for_status()

# Skip indices whose name starts with '.' (system/hidden indices).
listIndexNames = []
for jsonIndex in reqGetIndexes.json():
    strName = jsonIndex.get('index')
    if strName and not strName.startswith('.'):
        listIndexNames.append(strName)

# The context manager guarantees results.txt is closed even if a request raises.
with open("results.txt", "a") as f:
    for strIndexName in listIndexNames:
        jsonReindexItem = {
            "source": {
                "remote": {
                    "host": strElasticSearchUrl,
                    "username": strElasticSearchUser,
                    "password": strElasticSearchPassword,
                },
                "index": strIndexName,
            },
            "dest": {
                "index": strIndexName,
            },
        }

        r = requests.post(f'{strOpenSearchUrl}/_reindex', json=jsonReindexItem, auth=authOpenSearch, verify=False)
        jsonResponse = parseResponseBody(r)
        print(jsonResponse)

        strReason = getStrictMappingReason(jsonResponse)
        if r.status_code == 400 and strReason is not None:
            # The destination mapping rejected unknown fields: switch it to
            # dynamic mapping and retry the reindex once.
            print(strReason)
            print("I need to set dynamic mapping")
            r2 = requests.put(f'{strOpenSearchUrl}/{strIndexName}/_mapping', json={"dynamic": "true"}, auth=authOpenSearch, verify=False)
            print(parseResponseBody(r2))
            # From here on r / jsonResponse describe the retry, so the result
            # line below records the retry's status together with its own body.
            r = requests.post(f'{strOpenSearchUrl}/_reindex', json=jsonReindexItem, auth=authOpenSearch, verify=False)
            jsonResponse = parseResponseBody(r)
            print(jsonResponse)
        elif r.status_code != 200:
            print(f"HTTP Error: {r.status_code} on web call")

        strResultLine = f"{strIndexName}\t{r.status_code}\t{jsonResponse}\n"
        print(strResultLine)
        f.write(strResultLine)

Leave a Reply

Your email address will not be published. Required fields are marked *