Since we cannot do an in-place upgrade of our ElasticSearch environment, I need to move everything to the new servers. The biggest component is moving the data — which can easily be done using the remote reindex. Use the ElasticSearch API to get a list of all indices, and tell the OpenSearch API to reindex that index from the ElasticSearch remote. This operates on deltas — it will add new documents to an index — so my plan is to spend a few days seeding the initial data, then perform delta updates leading up to the scheduled change.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | import requests from requests.auth import HTTPBasicAuth f = open ( "results.txt" , "a" ) listIndexNames = [] reqGetIndexes = requests.get( 'https://elasticsearch.example.com:9200/_cat/indices?format=json' , auth = HTTPBasicAuth( 'something' , 'something' ), verify = False ) for jsonIndex in reqGetIndexes.json(): if jsonIndex.get( 'index' )[ 0 ] ! = '.' : listIndexNames.append(jsonIndex.get( 'index' )) for strIndexName in listIndexNames: jsonReindexItem = { "source" : { "remote" : { "username" : "something" , "password" : "something" }, "index" : strIndexName }, "dest" : { "index" : strIndexName } } r = requests.post( 'https://opensearch.example.com:9200/_reindex' , json = jsonReindexItem, auth = HTTPBasicAuth( 'something' , 'something' ), verify = False ) print (r.json()) jsonResponse = r.json() if r.status_code = = 400 and "mapping set to strict" in jsonResponse.get( 'failures' )[ 0 ].get( 'cause' ).get( "reason" ): # {'error': {'root_cause': [{'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}], 'type': 'x_content_parse_exception', 'reason': '[1:2] [reindex] unknown field [key]'}, 'status': 400} if jsonResponse.get( 'failures' ): print (jsonResponse.get( 'failures' )[ 0 ].get( 'cause' ).get( "reason" )) print ( "I need to set dynamic mapping" ) r2 = requests.put(f 'https://opensearch.example.com:9200/{strIndexName}/_mapping' , json = { "dynamic" : "true" }, auth = HTTPBasicAuth( 'something' , 'something' ), verify = False ) print (r2.json) r3 = requests.post( 'https://opensearch.example.com:9200/_reindex' , json = jsonReindexItem, auth = HTTPBasicAuth( 'something' , 'something), verify = False ) print (r.json()) print (f "{strIndexName}\t{r3.status_code}\t{r.json()}\n" ) f.write(f "{strIndexName}\t{r3.status_code}\t{r.json()}\n" ) elif r.status_code = = 200 : print (jsonResponse) print (f "{strIndexName}\t{r.status_code}\t{r.json()}\n" ) f.write(f "{strIndexName}\t{r.status_code}\t{r.json()}\n" ) else : print (f "HTTP Error: {r.status_code} on web call" ) print (f "{strIndexName}\t{r.status_code}\t{r.json()}\n" ) f.write(f "{strIndexName}\t{r.status_code}\t{r.json()}\n" ) f.close() |