On Nov 22, 2024, Amazon OpenSearch Ingestion launched support for AWS Lambda processors. With this launch, you now have more flexibility enriching and transforming your logs, metrics, and trace data in an OpenSearch Ingestion pipeline. Some examples include using foundation models (FMs) to generate vector embeddings for your data and looking up external data sources like Amazon DynamoDB to enrich your data.
Amazon OpenSearch Ingestion is a fully managed, serverless data pipeline that delivers real-time log, metric, and trace data to Amazon OpenSearch Service domains and Amazon OpenSearch Serverless collections.
Processors are components within an OpenSearch Ingestion pipeline that enable you to filter, transform, and enrich events using your desired format before publishing records to a destination of your choice. If no processor is defined in the pipeline configuration, then the events are published in the format specified by the source component. You can incorporate multiple processors within a single pipeline, and they are run sequentially as defined in the pipeline configuration.
OpenSearch Ingestion gives you the option of using Lambda functions as processors along with built-in native processors when transforming data. You can batch events into a single payload based on event count or size before invoking Lambda to optimize the pipeline for performance and cost. Lambda enables you to run code without provisioning or managing servers, eliminating the need to create workload-aware cluster scaling logic, maintain event integrations, or manage runtimes.
In this post, we demonstrate how to use the OpenSearch Ingestion Lambda processor to generate embeddings for your source data and ingest them into an OpenSearch Serverless vector collection. The Lambda function invokes the Amazon Titan Text Embeddings model hosted in Amazon Bedrock, so embeddings are generated dynamically as events flow through the pipeline. This architecture supports use cases such as recommendation engines, personalized chatbots, and fraud detection systems.
Integrating OpenSearch Ingestion, Lambda, and OpenSearch Serverless creates a fully serverless pipeline for embedding generation and search. This combination offers automatic scaling to match workload demands and a usage-driven model. Operations are simplified because AWS manages infrastructure, updates, and maintenance. This serverless approach allows you to focus on developing search and analytics solutions rather than managing infrastructure.
Note that Amazon OpenSearch Service also provides neural search, which transforms text into vectors and facilitates vector search both at ingestion time and at search time. During ingestion, neural search transforms document text into vector embeddings and indexes both the text and its vector embeddings in a vector index. Neural search is available for managed clusters running version 2.9 and above.
Solution overview
This solution builds embeddings on a dataset stored in Amazon Simple Storage Service (Amazon S3). We use the Lambda function to invoke the Amazon Titan model on the payload delivered by OpenSearch Ingestion.
Prerequisites
You should have an appropriate role with permissions to invoke your Lambda function and Amazon Bedrock model and also write to the OpenSearch Serverless collection.
To provide access to the collection, you must configure an AWS Identity and Access Management (IAM) pipeline role with a permissions policy that grants access to the collection. For more details, see Granting Amazon OpenSearch Ingestion pipelines access to collections. The pipeline role also needs permission to invoke your Lambda function. The following example policy grants that permission:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowinvokeFunction",
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": "arn:aws:lambda:{{region}}:{{account-id}}:function:{{function-name}}"
    }
  ]
}
The role must have the following trust relationship, which allows OpenSearch Ingestion to assume it:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
Create an ingestion pipeline
You can create a pipeline using a blueprint. For this post, we select the AWS Lambda custom enrichment blueprint.
We use the IMDB title basics dataset, which contains movie information, including originalTitle, runtimeMinutes, and genres.
The OpenSearch Ingestion pipeline uses a Lambda processor to create embeddings for the originalTitle field and store them as originalTitle_embeddings along with the other data.
See the following pipeline code:
Let’s take a closer look at the Lambda processor in the ingestion pipeline. Pay attention to the key_name parameter. You can choose any value for key_name, but your Lambda function must reference the same key when it processes the payload from OpenSearch Ingestion. The payload size is determined by the batch setting. When batching is enabled in the Lambda processor, OpenSearch Ingestion groups multiple events into a single payload before invoking the Lambda function. A batch is sent to Lambda when any of the following thresholds is met:
- event_count – The number of events reaches the specified limit
- maximum_size – The total size of the batch reaches the specified size (for example, 5 MB). This value is configurable up to 6 MB, the invocation payload limit for AWS Lambda
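The two thresholds can be illustrated with a small, self-contained Python sketch. This is not the OpenSearch Ingestion implementation; it only simulates the flush logic described above. The function name batch_events and its defaults are hypothetical, the size is approximated as the sum of each event's serialized JSON length, and the payload is assumed to wrap the events under the key_name key (here "documents").

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def batch_events(
    events: Iterable[Dict[str, Any]],
    event_count: int = 100,
    maximum_size_bytes: int = 5 * 1024 * 1024,
    key_name: str = "documents",
) -> Iterator[Dict[str, List[Dict[str, Any]]]]:
    """Yield payloads shaped like {key_name: [event, ...]}.

    A batch is flushed when it holds event_count events, or when adding
    the next event would push its approximate size past maximum_size_bytes.
    """
    batch: List[Dict[str, Any]] = []
    batch_bytes = 0
    for event in events:
        event_bytes = len(json.dumps(event).encode("utf-8"))
        # Size threshold: flush before adding an event that would not fit.
        if batch and batch_bytes + event_bytes > maximum_size_bytes:
            yield {key_name: batch}
            batch, batch_bytes = [], 0
        batch.append(event)
        batch_bytes += event_bytes
        # Count threshold: flush as soon as the batch is full.
        if len(batch) >= event_count:
            yield {key_name: batch}
            batch, batch_bytes = [], 0
    # Emit whatever is left over at the end of the stream.
    if batch:
        yield {key_name: batch}
```

With five small events and event_count=2, this sketch produces three payloads holding two, two, and one events. Each payload corresponds to one Lambda invocation, so larger batches mean fewer invocations at the cost of larger requests.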
Lambda function
The Lambda function receives the data from OpenSearch Ingestion, invokes Amazon Bedrock to generate the embedding, and adds it to the source record. The "documents" key references the events coming in from OpenSearch Ingestion and matches the key_name declared in the pipeline. We add the embedding from Amazon Bedrock back to the original record. OpenSearch Ingestion then sends this new record, with the embedding appended, to the OpenSearch Serverless sink. See the following code:
import json
import boto3

# Initialize the Bedrock runtime client once, outside the handler,
# so it is reused across invocations
bedrock = boto3.client('bedrock-runtime')


def generate_embedding(text):
    """Generate embedding for the given text using Bedrock."""
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": text})
    )
    embedding = json.loads(response['body'].read())['embedding']
    return embedding


def lambda_handler(event, context):
    # The input is a list of JSON documents under the key that matches
    # key_name in the pipeline configuration
    documents = event['documents']
    processed_documents = []
    for doc in documents:
        if 'originalTitle' in doc:
            # Generate embedding for the 'originalTitle' field
            embedding = generate_embedding(doc['originalTitle'])
            # Add the embedding to the document
            doc['originalTitle_embeddings'] = embedding
        processed_documents.append(doc)
    # Return the processed documents
    return processed_documents
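To see the payload shape without calling Amazon Bedrock, the handler's loop can be exercised locally with a stand-in embedding function. The following is a minimal sketch under stated assumptions: enrich_documents and fake_embed are hypothetical helpers, not part of the post's function, the sample records are illustrative, and the vectors returned by fake_embed are not real embeddings.

```python
from typing import Any, Callable, Dict, List


def enrich_documents(
    event: Dict[str, Any],
    embed: Callable[[str], List[float]],
    key_name: str = "documents",
) -> List[Dict[str, Any]]:
    """Mirror the handler's loop, with the embedding call passed in."""
    processed = []
    for doc in event[key_name]:
        if "originalTitle" in doc:
            doc["originalTitle_embeddings"] = embed(doc["originalTitle"])
        # Documents without a title are passed through unchanged.
        processed.append(doc)
    return processed


def fake_embed(text: str) -> List[float]:
    """Stand-in for Bedrock: a tiny deterministic vector, not a real embedding."""
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


# Illustrative payload: events wrapped under the key_name ("documents").
sample_event = {
    "documents": [
        {"originalTitle": "Carmencita", "runtimeMinutes": "1", "genres": "Documentary,Short"},
        {"runtimeMinutes": "5", "genres": "Animation,Short"},
    ]
}

result = enrich_documents(sample_event, fake_embed)
for doc in result:
    print(sorted(doc.keys()))
```

The function returns one document per incoming event, in the same order, and only the documents that carry originalTitle gain the originalTitle_embeddings field.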
If an exception occurs in the Lambda processor, all the documents in the batch are considered failed events. They are forwarded to the next processor in the chain, if any, or to the sink with a failure tag. You can configure the tag in the pipeline with the tags_on_failure parameter. The errors are also sent to Amazon CloudWatch Logs for further action.
After the pipeline runs, you can see that the embeddings were created and stored as originalTitle_embeddings within the document in a k-NN index, imdb-data-embeddings. The following screenshot shows an example.
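Once the embeddings are indexed, they can be searched with a k-NN query. The sketch below only builds the request body using the standard OpenSearch k-NN query shape. The helper name build_knn_query is hypothetical, the query vector must come from the same embedding model used at ingestion, and sending the request to the imdb-data-embeddings index (for example, with an OpenSearch client) is left out.

```python
import json
from typing import Any, Dict, List


def build_knn_query(
    vector: List[float],
    k: int = 5,
    field: str = "originalTitle_embeddings",
) -> Dict[str, Any]:
    """Build a k-NN search body that returns the k nearest documents."""
    return {
        "size": k,
        # Leave the large vector field out of the returned documents.
        "_source": {"excludes": [field]},
        "query": {"knn": {field: {"vector": vector, "k": k}}},
    }


# Placeholder vector for illustration; a real query vector would be the
# embedding of the search text.
query_body = build_knn_query([0.1, 0.2, 0.3], k=3)
print(json.dumps(query_body, indent=2))
```

Excluding the vector field from _source is optional. It keeps responses small when only the original fields, such as originalTitle and genres, are needed.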
Summary
In this post, we showed how you can use Lambda as part of your OpenSearch Ingestion pipeline to enable complex transformation and enrichment of your data. For more details on the feature, refer to Using an OpenSearch Ingestion pipeline with AWS Lambda.
About the Authors
Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.
Sam Selvan is a Principal Specialist Solution Architect with Amazon OpenSearch Service.
Srikanth Govindarajan is a Software Development Engineer at Amazon OpenSearch Service. Srikanth is passionate about architecting infrastructure and building scalable solutions for search, analytics, security, AI, and machine learning based use cases.