
Setting up Real-Time MySQL Data Synchronization with OpenSearch

This guide explains how to set up real-time data synchronization between MySQL databases and OpenSearch for enhanced search performance in your applications.

STEP-BY-STEP: AWS CONSOLE SETUP

🔹 Step 1: Enable CDC on Aurora MySQL

- Go to RDS > Databases > Your Aurora Instance
- Open Configuration and locate the Cluster parameter group
- If it is default.aurora-mysql5.x, go to Parameter groups → create a new cluster parameter group
- Edit parameters:
  - binlog_format = ROW
  - binlog_row_image = FULL
- Attach this new parameter group to your DB cluster
- Reboot the DB cluster to apply the changes
- Set the binlog retention:

```sql
CALL mysql.rds_set_configuration('binlog retention hours', 24);
```

🔹 Step 2: Create IAM Role for Lambda

- Go to IAM > Roles > Create Role
- Select Lambda
- Attach policies:
  - AmazonDMSRedshiftS3Role (or a custom policy for DMS access)
  - AmazonOpenSearchServiceFullAccess (or a scoped-down policy)
  - CloudWatchLogsFullAccess
- Name it: lambda-opensearch-transform-role

🔹 Step 3: Create the Lambda Function

- Go to Lambda > Create function
- Name: transform-and-index-opensearch
- Runtime: Python 3.12 (or Node.js)
- Choose existing role → lambda-opensearch-transform-role
- Deploy your transformation logic

Example handler:

```python
import base64
import json
import os

import requests  # not in the Lambda runtime by default; bundle it or use a layer

OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL")  # e.g. https://your-opensearch-host:9200
AUTH_USER = os.environ.get("OPENSEARCH_USER")      # basic auth user
AUTH_PASS = os.environ.get("OPENSEARCH_PASS")      # basic auth pass


def lambda_handler(event, context):
    headers = {"Content-Type": "application/json"}
    for record in event['Records']:
        try:
            # Decode the base64-encoded Kinesis data
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            message = json.loads(payload)

            table = message['metadata']['table-name'].lower()
            operation = message['metadata']['operation']
            doc = message.get('data', {})

            if table == 'invoice':
                doc_id = doc.get('INVOICE_ID')

                # Skip records without an ID
                if not doc_id:
                    print("doc_id is empty")
                    continue

                print(f"Table name: {table}")
                print(f"Operation: {operation}")
                index_name = f"app_{table}"

                # Handle insert/update/full load
                if operation in ['insert', 'update', 'load']:
                    url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                    response = requests.put(url, headers=headers,
                                            auth=(AUTH_USER, AUTH_PASS),
                                            data=json.dumps(doc))
                    print(f"[{operation.upper()}] {url} => {response.status_code} {response.text}")

                # Handle delete
                elif operation == 'delete':
                    url = f"{OPENSEARCH_URL}/{index_name}/_doc/{doc_id}"
                    response = requests.delete(url, headers=headers,
                                               auth=(AUTH_USER, AUTH_PASS))
                    print(f"[DELETE] {url} => {response.status_code} {response.text}")
        except Exception as e:
            print(f"Error processing record: {e}")
            continue
```

🔹 Step 4: Create DMS Subnet Group

- Go to DMS > Subnet Groups
- Click Create
- Name: dms-subnet-group
- Select your VPC
- Choose private subnets with access to Aurora and Lambda

🔹 Step 5: Create DMS Replication Instance

- Go to DMS > Replication instances > Create
- Name: dms-replication-instance
- Instance class: dms.t3.medium or higher
- Choose the VPC and dms-subnet-group
- Attach a security group that allows MySQL (3306) and Lambda access

🔹 Step 6: Create DMS Source Endpoint (MySQL)

- Go to DMS > Endpoints > Create
- Endpoint type: Source
- Engine: MySQL
- Provide: host, port, database name
- Username: dms_user
- Password
- Test the connection (verify connectivity and permissions)

🔹 Step 7: Create DMS Target Endpoint (Lambda)

💡 Lambda is not a native DMS target. Use Kinesis or S3 with a Lambda trigger, or create a custom endpoint.

Options:

- Create a Kinesis or S3 endpoint as the target and set up a Lambda trigger on that service
- Or use a custom DMS target plugin (advanced)

🔹 Step 8: Create DMS Replication Task

- Go to DMS > Tasks > Create
- Name: mysql-to-opensearch-sync
- Source: your MySQL/Aurora endpoint
- Target: your S3/Kinesis/Lambda proxy
- Migration type: "CDC only" or "Full load + CDC"
- Enable CloudWatch logs

🔹 Step 9: OpenSearch Setup (Duplo or Self-Managed)

- Create 7 indexes: app_<table_name>
- Define index templates for each: mappings for ID fields, timestamps, status, etc.
- Create ISM policies for data retention (OpenSearch's equivalent of ILM)
- Create an OpenSearch user/role with access restricted to the app_* indexes

🔹 Step 10: Monitoring

- Go to CloudWatch > Log groups
- Check the logs for the Lambda function and the DMS task
- Create a CloudWatch dashboard with:
  - Metrics for Lambda errors and invocation count
  - DMS replication latency
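Before wiring DMS, Kinesis, and the Lambda trigger together end to end, the handler's decode-and-parse path can be exercised locally with a hand-built event. The sketch below uses hypothetical sample data; the record shape mirrors what the example handler above expects from a DMS-style CDC message delivered via Kinesis:

```python
import base64
import json

# Hypothetical DMS CDC message for an INVOICE row
# (field names follow the handler's expectations, not a captured DMS payload)
message = {
    "metadata": {"table-name": "INVOICE", "operation": "insert"},
    "data": {"INVOICE_ID": "INV-1001", "STATUS": "PAID"},
}

# Kinesis delivers the payload base64-encoded, under Records[n]['kinesis']['data']
encoded = base64.b64encode(json.dumps(message).encode("utf-8")).decode("utf-8")
fake_event = {"Records": [{"kinesis": {"data": encoded}}]}

# Reproduce the handler's decode/parse steps
payload = base64.b64decode(fake_event["Records"][0]["kinesis"]["data"]).decode("utf-8")
decoded = json.loads(payload)

table = decoded["metadata"]["table-name"].lower()
operation = decoded["metadata"]["operation"]
doc = decoded.get("data", {})

print(table, operation, doc.get("INVOICE_ID"))  # invoice insert INV-1001
```

If the printed table, operation, and document ID match what you encoded, the parsing logic is behaving as expected and any remaining issues are likely in connectivity or permissions rather than in the transformation code.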
