Tag: ElasticSearch

ElasticSearch to OpenSearch Migration: Creating Tenants

Finally, create the tenants … we’re using OAUTH for Kibana authentication, so I wasn’t able to use the API to export “saved objects”. Fortunately, we don’t have many tenants … and exporting/importing those saved objects manually isn’t an onerous task.

import requests
from requests.auth import HTTPBasicAuth

def createTenant(strTenantName, strDescription):
        jsonAddTenant = {  "description": strDescription }
        r2 = requests.put(f"https://opensearch.example.com:9200/_opendistro/_security/api/tenants/{strTenantName}", json=jsonAddTenant, auth = HTTPBasicAuth('something', 'something'), verify=False)
        print(r2.text)
        print(r2.status_code)

#  Get all tenants from ES
r = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/tenants", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllTenants = r.json()

for item in dictAllTenants.items():
        if item[1].get('reserved') == False:
                createTenant(item[0], item[1].get('description'))

ElasticSearch to OpenSearch Migration: Lifecycle Management Policies

Since there are a lot of changes in how lifecycle policies work between ElasticSearch and OpenSearch, the recommendation I’ve seen is to manually create them … but it’s a lot of repetitive typing, so I used a script to create a base policy — a name with a a hot allocation — and manually added all of the remaining stages, transitions, and index patterns to which the policy should be applied.

import requests
from requests.auth import HTTPBasicAuth
import json
from time import sleep
from datetime import timedelta

f = open("data-LifecyclePolicies.txt", "w")

listIgnoredILMPolicies = ["watch-history-ilm-policy"]

# Get all roles from prod & list users in those roles
r = requests.get(f"https://elasticsearch.example.com:9200/_ilm/policy", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllILMPolicies= r.json()

for item in dictAllILMPolicies.items():
        if item[0] not in listIgnoredILMPolicies:
                strILMPolicyName = item[0]
                dictILMPolicySettings = item[1]
                iHotDays = None
                iWarmDays = None
                iColdDays = None
                iDeleteDays = None
                if item[1].get('policy').get('phases').get('hot'):
                        iHotDays = (item[1].get('policy').get('phases').get('hot').get('min_age'))
                if item[1].get('policy').get('phases').get('warm'):
                        iWarmDays = (item[1].get('policy').get('phases').get('warm').get('min_age'))
                if item[1].get('policy').get('phases').get('cold'):
                        iColdDays = (item[1].get('policy').get('phases').get('cold').get('min_age'))
                if item[1].get('policy').get('phases').get('delete'):
                        iDeleteDays = (item[1].get('policy').get('phases').get('delete').get('min_age'))
                print(f"Policy named {strILMPolicyName} has phases:")
                print(f"\tHot {iHotDays}")
                print(f"\tWarm {iWarmDays}")
                print(f"\tCold {iColdDays}")
                print(f"\tDelete {iDeleteDays}")
                print("\n")

                f.write(f"Policy named {strILMPolicyName} has phases:\n")
                f.write(f"\tHot {iHotDays}\n")
                f.write(f"\tWarm {iWarmDays}\n")
                f.write(f"\tCold {iColdDays}\n")
                f.write(f"\tDelete {iDeleteDays}\n")
                f.write("\n")
                jsonILMPolicyCreation = {
                                  "policy": {
                                    "description": "Ported from ES7",
                                    "default_state": "hot",
                                    "states": [
                                      {
                                        "name": "hot",
                                        "actions": [
                                          {
                                            "retry": {
                                              "count": 3,
                                              "backoff": "exponential",
                                              "delay": "1m"
                                            },
                                            "allocation": {
                                              "require": {
                                                "temp": "hot"
                                              },
                                              "include": {},
                                              "exclude": {},
                                              "wait_for": "false"
                                            }
                                          }
                                        ],
                                        "transitions": []
                                      }
                                    ],
                                    "ism_template": []
                                  }
                                }

                r2 = requests.put(f"https://opensearch:9200/_plugins/_ism/policies/{item[0]}", json=jsonILMPolicyCreation, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r2.text)
                print(r2.status_code)
f.close()

ElasticSearch to OpenSearch Migration: Map Users to Roles

After the roles are created, I need to map users into the roles — using the ElasticSearch API to list all roles and add each user to the corresponding OpenSearch role.

import requests
from requests.auth import HTTPBasicAuth

def addUserToRole(strRole, strUID):
        jsonAddUser = [
        {               "op": "add",            "path": f"/{strRole}",          "value": {"users": strUID} }]
        print(f"{strRole}\t{jsonAddUser}")
        r2 = requests.patch(f"https://opensearch.example.com:9200/_plugins/_security/api/rolesmapping", json=jsonAddUser, auth = HTTPBasicAuth('something', 'something'), verify=False)
        print(r2.text)
        print(r2.status_code)

listIgnoredGroups = ['security_rest_api_access', 'logstash_role', 'elastalert_role', 'kibana_server', 'wsadmin_role', 'mgmt_role', 'logstash', 'manage_snapshots', 'readall', 'all_access', 'own_index', 'kibana_user', ]

# Get all roles from prod & list users in those roles
#GET _opendistro/_security/api/rolesmapping/
r = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/rolesmapping/", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllRoles = r.json()

# For each role, list out each user and add that user to that role in OS
for item in dictAllRoles.items():
        if item[0] not in listIgnoredGroups:
                for strUID in item[1].get('users'):
                        addUserToRole(item[0], item[1].get('users'))

ElasticSearch to OpenSearch Migration: Creating Roles

To create the roles, use the ElasticSearch API to get the existing role definitions, remove a few attributes I don’t want to set (reserved, static, hidden), and create the corresponding role in OpenSearch. I skip all of the reserved roles.

import requests
from requests.auth import HTTPBasicAuth

f = open("results-roles.txt", "a")

objGetRoleRequest = requests.get(f"https://elasticsearch.example.com:9200/_opendistro/_security/api/roles", auth = HTTPBasicAuth('something', 'something'), verify=False)
dictRoleInfo = objGetRoleRequest.json()
for item in dictRoleInfo.items():
        if item[1].get('reserved') is False:
                print(item)
                print("\n")
                dictRoleDefinition = dict(item[1])
                dictRoleDefinition.pop('reserved')
                dictRoleDefinition.pop('static')
                dictRoleDefinition.pop('hidden')
                r = requests.put(f"https://opensearch.example.com:9200/_plugins/_security/api/roles/{item[0]}", json=dictRoleDefinition, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r.json())

                if r.status_code == 200:
                        print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                        f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                else:
                        print(f"HTTP Error: {r.status_code} on web call")
                        print(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
                        f.write(f"{item[0]}\t{r.status_code}\t{r.json()}\n")
f.close()

ElasticSearch to OpenSearch: Local User Migration

One of the trickier bits of migrating from ElasticSearch to OpenSearch has been the local users — most of our users are authenticated via OAUTH, but programmatic access is done with local user accounts. Fortunately, you appear to be able to get the user password hash from the .opendistro_security API if you authenticate using an SSL cert.

This means the CN of the certificate being used must be registered in the elasticsearch.yml as an admin DN:

plugins.security.authcz.admin_dn:
  - 'CN=admin,O=LJRTest,ST=Ohio,C=US'
  - 'CN=ljradmin,O=LJRTest,ST=Ohio,C=US'

Provided the certificate is an admin_dn, the account can be used to search the .opendistro_security index and return local user info — including hashes. Information within the document is base 64 encoded, so the value needs to be decoded before you’ve got legible user information. One the user record has been obtained, the information can be used to POST details to the OpenSearch API and create a matching user.

import json
import requests
import base64
from requests.auth import HTTPBasicAuth

clientCrt = "./certs/ljr-mgr.pem"
clientKey = "./certs/ljr-mgr.key"
strOSAdminUser = 'something'
strOSAdminPass = 'something'

r = requests.get("https://elasticsearch.example.com:9200/.opendistro_security/_search?pretty", verify=False, cert=(clientCrt, clientKey))
if r.status_code == 200:
        dictResult = r.json()

        for item in dictResult.get('hits').get('hits'):
                if item.get('_id') == "internalusers":
                        strInternalUsersXML = item.get('_source').get('internalusers')
                        strUserJSON = base64.b64decode(strInternalUsersXML).decode("utf-8")
                        dictUserInfo = json.loads(strUserJSON)
                        for tupleUserRecord in dictUserInfo.items():
                                strUserName = tupleUserRecord[0]
                                dictUserRecord = tupleUserRecord[1]
                                if dictUserRecord.get('reserved') == False:
                                        dictUserDetails = {
                                                "hash": dictUserRecord.get('hash'),
                                                "opendistro_security_roles": dictUserRecord.get('opendistro_security_roles'),
                                                "backend_roles": dictUserRecord.get('backend_roles'),
                                                "attributes": dictUserRecord.get('attributes')
                                                }

                                        if dictUserRecord.get('description') is not None:
                                                dictUserDetails["description"] = dictUserRecord.get('description')

                                        reqCreateUser = requests.put(f'https://opensearch.example.com:9200/_plugins/_security/api/internalusers/{strUserName}', json=dictUserDetails, auth = HTTPBasicAuth(strOSAdminUser, strOSAdminPass), verify=False)
                                        print(reqCreateUser.text)
else:
        print(r.status_code)

ElasticSearch to OpenSearch Migration: Remote Reindex to Move Data

Since we cannot do an in-place upgrade of our ElasticSearch environment, I need to move everything to the new servers. The biggest component is moving the data — which can easily be done using the remote reindex. Use the ElasticSearch API to get a list of all indices, and tell the OpenSearch API to reindex that index from the ElasticSearch remote. This operates on deltas — it will add new documents to an index — so my plan is to spend a few days seeding the initial data, then perform delta updates leading up to the scheduled change.

import requests
from requests.auth import HTTPBasicAuth

f = open("results.txt", "a")

listIndexNames = []

reqGetIndexes = requests.get('https://elasticsearch.example.com:9200/_cat/indices?format=json', auth=HTTPBasicAuth('something','something'), verify=False)
for jsonIndex in reqGetIndexes.json():
        if jsonIndex.get('index')[0] != '.':
                listIndexNames.append(jsonIndex.get('index'))

for strIndexName in listIndexNames:
  jsonReindexItem = {
    "source": {
      "remote": {
        "host": "https://elasticsearch.example.com:9200",
        "username": "something",
        "password": "something"
      },
  "index": strIndexName
    },
    "dest": {
  "index": strIndexName
    }
  }

  r = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth = HTTPBasicAuth('something', 'something'), verify=False)
  print(r.json())
  jsonResponse = r.json()

  if r.status_code == 400 and "mapping set to strict" in jsonResponse.get('failures')[0].get('cause').get("reason"):
    # {'error': {'root_cause': [{'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}], 'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}, 'status': 400}
    if jsonResponse.get('failures'):
      print(jsonResponse.get('failures')[0].get('cause').get("reason"))
      print("I need to set dynamic mapping")
      r2 = requests.put(f'https://opensearch.example.com:9200/{strIndexName}/_mapping', json={"dynamic":"true"}, auth = HTTPBasicAuth('something', 'something'), verify=False)
      print(r2.json)
      r3 = requests.post('https://opensearch.example.com:9200/_reindex', json=jsonReindexItem, auth = HTTPBasicAuth('something', 'something), verify=False)
      print(r.json())
      print(f"{strIndexName}\t{r3.status_code}\t{r.json()}\n")
      f.write(f"{strIndexName}\t{r3.status_code}\t{r.json()}\n")

  elif r.status_code == 200:
    print(jsonResponse)
    print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
    f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
  else:
    print(f"HTTP Error: {r.status_code} on web call")
    print(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")
    f.write(f"{strIndexName}\t{r.status_code}\t{r.json()}\n")

f.close()

ElasticSearch to OpenSearch Migration: Creating Index Templates

Prior to creating the indices, I need to create the index templates.

import requests
from requests.auth import HTTPBasicAuth
import json
from time import sleep

def serialize_sets(obj):
        if isinstance(obj, set):
                return list(obj)
        return obj

listIgnoredTemplates = ['.watch-history', '.watch-history-1', '.watch-history-2', '.watch-history-3', '.watch-history-4', '.watch-history-5', '.watch-history-6', '.watch-history-7', '.watch-history-8', '.watch-history-9', '.watch-history-10', '.watch-history-11', 'ilm-history', 'ilm-history_2', 'tenant_template', '.monitoring-logstash']

# Get all roles from prod & list users in those roles
r = requests.get(f"https://elasticsearch.example.com:9200/_template", auth = HTTPBasicAuth('something', 'something'), verify=False)

dictAllTemplates= r.json()

for item in dictAllTemplates.items():
        if item[0] not in listIgnoredTemplates:
                if item[1].get('settings').get('index'):
                        iShards = (item[1].get('settings').get('index').get('number_of_shards'))
                        iReplicas = (item[1].get('settings').get('index').get('number_of_replicas'))
                else:
                        iShards = 3
                        iReplicas = 1
                if iShards is None:
                        iShards = 3
                if iReplicas is None:
                        iReplicas = 1
                if item[1].get('settings').get('index') and item[1].get('settings').get('index').get('lifecycle'):
                        jsonAddTemplate = {
                                 "index_patterns": item[1].get('index_patterns'),
                                  "template": {
                                    "aliases": {
                                      item[1].get('settings').get('index').get('lifecycle').get('rollover_alias'): {}
                                    },
                                    "settings": {
                                      "number_of_shards": iShards,
                                      "number_of_replicas": iReplicas
                                    },
                                    "mappings":        item[1].get('mappings')
                                    }
                                  }
                else:
                        jsonAddTemplate = {
                                  "index_patterns": item[1].get('index_patterns'),
                                  "template": {
                                    "settings": {
                                      "number_of_shards": iShards,
                                      "number_of_replicas": iReplicas
                                    },
                                    "mappings":         item[1].get('mappings')
                                    }
                                  }
                r2 = requests.put(f"https://opensearch.example.com:9200/_index_template/{item[0]}", json=jsonAddTemplate, auth = HTTPBasicAuth('something', 'something'), verify=False)
                print(r2.text)
                print(r2.status_code)
                sleep(2)

OpenSearch Proof of Concept In-Place Upgrade from ElasticSearch 7.7.0 to OpenSearch 2.12.0

I need to migrate my ElasticSearch installation over to OpenSearch. From reading the documentation, it isn’t really clear if that is even possible as an in-place upgrade or if I’d need to use a remote reindex or snapshot backup/restore. So I tested the process with a minimal data set. TL;DR: Yes, it works.

Create a docker instance of ElasticSearch 7.7.0

mkdir /docker/es/esdata
chmod -R g+dwx /docker/es/esdata
chgrp -R 0 /docker/es/esdata

mkdir /docker/es/esconfig

Populate configuration info into ./esconfig and ./esdata is an empty directory

docker run –name es770 -dit -v /docker/es/esdata:/usr/share/elasticsearch/data -v /docker/es/esconfig:/usr/share/elasticsearch/config -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” docker.elastic.co/elasticsearch/elasticsearch:7.7.0

Populate Data into ElasticSearch Sandbox

Use curl to populate an index with some records – you can create lifecycle policies, customize the fields, etc … this is the bare minimum to validate that data in ES7.7 can be ingested by OS2.12curl -X POST “localhost:9200/ljrtest/_bulk” -H “Content-Type: application/x-ndjson” -d’
{“index”: {“_id”: “1”}}
{“id”: “1”, “message”: “Record one”}
{“index”: {“_id”: “2”}}
{“id”: “2”, “message”: “Record two”}
{“index”: {“_id”: “3”}}
{“id”: “3”, “message”: “Record three”}
{“index”: {“_id”: “4”}}
{“id”: “4”, “message”: “Record four”}
{“index”: {“_id”: “5”}}
{“id”: “5”, “message”: “Record five”}
{“index”: {“_id”: “6”}}
{“id”: “6”, “message”: “Record six”}
{“index”: {“_id”: “7”}}
{“id”: “7”, “message”: “Record seven”}
{“index”: {“_id”: “8”}}
{“id”: “8”, “message”: “Record eight”}
{“index”: {“_id”: “9”}}
{“id”: “9”, “message”: “Record nine”}
{“index”: {“_id”: “10”}}
{“id”: “10”, “message”: “Record ten”}

Shut Down ElasticSearch

docker stop es770

Bring Up an OpenSearch 2.12 Host

mkdir /docker/es/osconfig

Populate the configuration data for OpenSearch in ./osconfig

docker run –name os212 -dit -v /docker/es/esdata:/usr/share/opensearch/data -v /docker/es/osconfig:/usr/share/opensearch/config -p 9200:9200 -p 9600:9600 -e “discovery.type=single-node” -e “OPENSEARCH_INITIAL_ADMIN_PASSWORD=P@s5w0rd-123” opensearchproject/opensearch:2.12.0

Verify Data is Still Available in OpenSearch

[root@docker es]# curl -k -u “admin:P@s5w0rd-123” https://localhost:9200/ljrtest
{“ljrtest”:{“aliases”:{},”mappings”:{“properties”:{“id”:{“type”:”text”,”fields”:{“keyword”:{“type”:”keyword”,”ignore_above”:256}}},”message”:{“type”:”text”,”fields”:{“keyword”:{“type”:”keyword”,”ignore_above”:256}}}}},”settings”:{“index”:{“creation_date”:”1710969477402″,”number_of_shards”:”1″,”number_of_replicas”:”1″,”uuid”:”AO5JBoyzSJiKZA9xeA2imQ”,”version”:{“created”:”7070099″,”upgraded”:”136337827″},”provided_name”:”ljrtest”}}}}

Conclusion

Yes, a very basic data set in ElasticSearch 7.7.0 can be upgraded in-place to OpenSearch 2.12.0 — in the “real world” compatibility issues will crop up (flatten!!), but the idea is fundamentally sound.

Problem, though, is compatibility issues. We don’t have exotic data types in our instance but Kibana uses “flatten” … so those rare people use use Kibana to access and visualize their data really cannot just move to OpenSearch. That’s a huge caveat. I can recreate everything manually after deleting all of the Kibana indices (and possibly some more, haven’t gone this route to see). But if I’m going to recreate everything, why wouldn’t I recreate everything and use remote reindex to move data? I can do this incrementally — take a week to move all the data slowly, do a catch-up reindex t-2 days, another t-1 days, another the day of the change, heck even one a few hours before the change. Then the change is a quick delta reindex, stop ElasticSearch, and swap over to OpenSearch. The backout is to just swing back to the fully functional, unchanged ElasticSearch instance.

ElasticSearch — Too Many Shards

Our ElasticSearch environment melted down in a fairly spectacular fashion — evidently (at least in older iterations), it’s an unhandled Java exception when a server is trying to send data over to another server that is refusing it because that would put the receiver over the shard limit. So we didn’t just have a server or three go into read only mode — we had cascading failure where java would except out and the process was dead. Restarting the ElasticSearch service temporarily restored functionality — so I quickly increased the max shards per node limit to keep the system up whilst I cleaned up whatever I could clean up

curl -X PUT http://uid:pass@`hostname`:9200/_cluster/settings -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "5000" } }'

There were two requests against the ES API that were helpful in cleaning ‘stuff’ up — GET /_cat/allocation?v returns a list of each node in the ES cluster with a count of shards (plus disk space) being used. This was useful in confirming that load across ‘hot’, ‘warm’, and ‘cold’ nodes was reasonable. If it was not, we would want to investigate why some nodes were under-allocated. We were, however, fine.

The second request: GET /_cat/shards?v=true which dumps out all of the shards that comprise the stored data. In my case, a lot of clients create a new index daily — MyApp-20231215 — and then proceeded to add absolutely nothing to that index. Literally 10% of our shards were devoted to storing zero documents! Well, that’s silly. I created a quick script to remove any zero-document index that is older than a week. A new document coming in will create the index again, and we don’t need to waste shards not storing data.

Once you’ve cleaned up the shards, it’s a good idea to drop your shard-per-node configuration down again. I’m also putting together a script to run through the allocated shards per node data to alert us when allocation is unbalanced or when total shards approach our limit. Hopefully this will allow us to proactively reduce shards instead of having the entire cluster fall over one night.

Increasing Kibana CSV Report Max Size

The default size limit for CSV reports in Kibana is 10 meg. Since that’s not enough for some of our users, I’ve been testing increases to the xpack.reporting.csv.maxSizeBytes value.

We’re still limited by the ES http.max_content_length value — which the documentation seems pretty confident shouldn’t be increased because the system can become unstable. Increasing the max Kibana report size to 100mb just yields a different error because ES doesn’t like it. 75 exhausted the JavaScript heap (?) – which I could get around by setting  NODE_OPTIONS=–max_old_space_size=4096 … but that just led to the server abending whenever a report was run (in fact, I had to remove the reports I tried to run from the server to get everything back into a working state). Increasing the limit to 50 meg, though, didn’t do anything unreasonable in dev. So somewhere between 50 and 75 meg is our upper limit, and 50 seemed like a nice round number to me.

Notes on resource usage – Data is held in memory as a report is created. We’d see an increase in memory/CPU usage while the report is being generated (or, I guess more accurately, a longer time during which the memory/CPU usage is increased because if a 10 meg report takes 30 seconds to run then a 50 meg report is going to take 2.5 minutes to run … and the memory/CPU usage is pretty much the same during the “a report is running” period).

Then, though, the report is stashed in ElasticSearch for user(s) to retrieve within .reporting* indicies. And that’s where things get a little silly — architecturally, this is just another index; it ages off with a lifecycle policy if one exists. But it looks like they never created a lifecycle management policy. So you can still retrieve reports run a little over two years ago! We will certainly want to set up a policy to clean up old reports … just have to decide how long is reasonable.