Building Robust AI Systems with DSPy and Amazon Bedrock

From Prompt Magic to Prompt Engineering

Davide Gallitelli
8 min read · Jun 18, 2024

As artificial intelligence continues to reshape industries, the need for robust and scalable tools to build and deploy AI applications keeps growing. Two notable tools in this space are DSPy, Stanford NLP’s open-source framework for programming (rather than hand-prompting) language models, and Amazon Bedrock, AWS’s fully managed service that exposes foundation models through an API. This blog post covers the core features of DSPy and shows how it works together with Amazon Bedrock to help developers and data scientists build more reliable LM applications.

What is DSPy?

%pip install dspy-ai

DSPy, developed by Stanford NLP, is an open-source framework for building applications on top of language models by writing code instead of hand-crafting prompts. It is built around three core components: Signatures, Modules, and Optimizers.

Signatures

Signatures in DSPy define the input/output behavior of language model (LM) tasks in a modular and adaptive manner. Rather than relying on lengthy, brittle prompts, signatures allow for clean and reproducible code. Examples of signatures include "question -> answer" for question answering or "document -> summary" for summarization. Signatures can be simple or complex, depending on the task requirements.
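
To make the shorthand concrete, here is a minimal, dependency-free sketch of how a string such as "document -> summary" splits into input and output field names. The helper parse_signature is hypothetical and written only for this illustration; it is not DSPy's own parser, which also derives prompt prefixes and instructions from the fields.

```python
def parse_signature(signature: str) -> tuple[list[str], list[str]]:
    """Split 'a, b -> c' into (['a', 'b'], ['c']). Illustrative only."""
    if signature.count("->") != 1:
        raise ValueError(f"expected exactly one '->' in {signature!r}")
    inputs_part, outputs_part = signature.split("->")
    # Field names are comma-separated on each side of the arrow.
    inputs = [name.strip() for name in inputs_part.split(",") if name.strip()]
    outputs = [name.strip() for name in outputs_part.split(",") if name.strip()]
    if not inputs or not outputs:
        raise ValueError("a signature needs at least one input and one output")
    return inputs, outputs


print(parse_signature("question -> answer"))
print(parse_signature("document -> summary"))
```

The point of the notation is that the task is declared by field names alone; the prompt text is generated from them rather than written by hand.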

Modules

Modules in DSPy are the building blocks of LM programs. Each module abstracts a specific prompting technique, such as chain-of-thought or retrieval-augmented generation. Modules can handle various signatures and can be composed into larger programs, much like neural network layers in frameworks like PyTorch. This allows for flexible and scalable program construction.

Optimizers

For the sake of brevity, I will not get into the details of DSPy Optimizers in this blog post. Please comment if you want me to deep dive on this very interesting topic!

Optimizers in DSPy tune the parameters of a DSPy program, for example the prompts and/or the LM weights, to improve the output of your program. They use a combination of gradient descent and discrete optimization techniques to maximize a metric, that is, a function that evaluates the output of your program and assigns it a score. Different types of optimizers are available, each tailored to different data scenarios and optimization needs. For an optimizer to work well, you need to provide it with some training inputs.
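
As a rough illustration of what "a function that evaluates the output of your program" can look like, the sketch below defines an exact-match metric and averages it over a tiny hand-made dev set. The (example, pred, trace=None) argument order follows the convention used for metrics in the DSPy documentation; the SimpleNamespace objects are stand-ins for DSPy examples and predictions, and the data is made up for the demo.

```python
from types import SimpleNamespace


def exact_match(example, pred, trace=None) -> bool:
    """Score a prediction: True when the answers match, ignoring case and outer spaces."""
    return example.answer.strip().lower() == pred.answer.strip().lower()


# Stand-ins for labelled examples and program outputs (made-up data).
devset = [
    SimpleNamespace(question="2 + 2?", answer="4"),
    SimpleNamespace(question="Capital of Italy?", answer="Rome"),
]
predictions = [
    SimpleNamespace(answer="4"),
    SimpleNamespace(answer="Milan"),
]

scores = [exact_match(ex, pred) for ex, pred in zip(devset, predictions)]
average = sum(scores) / len(scores)
print(f"average score: {average:.2f}")
```

An optimizer would repeatedly run the program, score it with a function like this, and keep the prompt or weight changes that raise the average.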

How to use DSPy with Amazon Bedrock

Configuration

The first step is to configure DSPy so that it will use Amazon Bedrock by default:

import dspy

bedrock_haiku = dspy.AWSAnthropic(
    aws_provider=dspy.Bedrock(region_name="us-west-2"),
    model="anthropic.claude-3-haiku-20240307-v1:0",
)
dspy.configure(lm=bedrock_haiku)

With the LLM configuration out of the way, we can get started solving problems.

Signatures and Modules

As suggested by the official DSPy documentation, “Using DSPy in 8 Steps”, let’s start by defining a task. To keep things simple at the beginning, let’s just create a simple question & answer program. Therefore, our input will be a question, and our output will be an answer. With that, we can define our signature and our module:

qa = dspy.Predict("question -> answer")

In the example above, Predict is our module, whose goal is to generate a prediction, and our signature is question -> answer, the shorthand notation that tells DSPy we want an answer from a question. If we print qa, we can see the following output:

Predict(StringSignature(question -> answer
instructions='Given the fields `question`, produce the fields `answer`.'
question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
answer = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'output', 'prefix': 'Answer:', 'desc': '${answer}'})
))

DSPy infers that question and answer are strings, and uses the prompt highlighted in instructions as input to the Language Model. You can also control the type yourself:

dspy.TypedPredictor("question:str -> answer:int")

# Output
TypedPredictor(StringSignature(question -> answer
instructions='Given the fields `question`, produce the fields `answer`.'
question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
answer = Field(annotation=int required=True json_schema_extra={'__dspy_field_type': 'output', 'prefix': 'Answer:', 'desc': '${answer}'})
))

By having the prompt defined in the Module Predict, DSPy introduces a concept of repeatability and control over the process of prompt engineering. Let’s use this class to generate an answer to a question:

qa(question="Who is Sergio Mattarella?").answer

# Output
Here is the answer for the given question:
Question: Who is Sergio Mattarella?
Answer: Sergio Mattarella is the current President of Italy. He has been serving as President since 2015.

Advanced Configuration for Signature and Module

Now, to modify the behavior of the program, we can customize the Signature and/or the Module.

For modules, we can pick one of the built-in ones that ship with the DSPy library, or create a custom one:

  1. dspy.Predict: Basic predictor. Does not modify the signature. Handles the key forms of learning (i.e., storing the instructions and demonstrations and updates to the LM).
  2. dspy.ChainOfThought: Teaches the LM to think step-by-step before committing to the signature's response.
  3. dspy.ProgramOfThought: Teaches the LM to output code, whose execution results will dictate the response.
  4. dspy.ReAct: An agent that can use tools to implement the given signature.
  5. dspy.MultiChainComparison: Can compare multiple outputs from ChainOfThought to produce a final prediction.

Let’s compare the previous output from dspy.Predict with the dspy.ChainOfThought one. However, let’s change the question:

question = "True of False: The numbers in this group add up to an even number: 17,  9, 10, 12, 13, 4, 2."

predictor = dspy.Predict("question -> answer")
predictor(question=question)

# Output
Prediction(
answer='Question: True of False: The numbers in this group add up to an even number: 17, 9, 10, 12, 13, 4, 2.\nAnswer: True. The numbers 17, 9, 10, 12, 13, 4, and 2 add up to 67, which is an even number.'
)

------

cot = dspy.ChainOfThought("question -> answer")
cot(question=question)

# Output
Prediction(
rationale="Question: True of False: The numbers in this group add up to an even number: 17, 9, 10, 12, 13, 4, 2.\nReasoning: Let's think step by step in order to determine if the numbers in this group add up to an even number.\n1. We need to add up all the numbers in the group: 17 + 9 + 10 + 12 + 13 + 4 + 2 = 67.\n2. 67 is an odd number, not an even number.",
answer='False, the numbers in this group do not add up to an even number.'
)
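
A quick sanity check of the arithmetic, independent of any LM, settles which of the two predictions is right:

```python
numbers = [17, 9, 10, 12, 13, 4, 2]
total = sum(numbers)
is_even = total % 2 == 0
print(total, is_even)  # 67 False
```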

As we can see from the two outputs, the answers are different, and the second one is correct: the numbers add up to 67, which is odd. This is because DSPy extends the prompt we have given it through Chain of Thought (CoT). With CoT, we force the LM to reason “step by step” before providing the answer. The reasoning is returned in the rationale field of the prediction, and the more detailed instruction can be seen in cot.extended_signature.

cot.extended_signature

# Output
StringSignature(question -> rationale, answer
instructions='Given the fields `question`, produce the fields `answer`.'
question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
rationale = Field(annotation=str required=True json_schema_extra={'prefix': "Reasoning: Let's think step by step in order to", 'desc': '${produce the answer}. We ...', '__dspy_field_type': 'output'})
answer = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'output', 'prefix': 'Answer:', 'desc': '${answer}'})
)

For the dspy.Signature, we can extend the shorthand notation to, for example, introduce a context (very useful for RAG):

dspy.Predict("context, question -> answer")

# Output
Predict(StringSignature(context, question -> answer
instructions='Given the fields `context`, `question`, produce the fields `answer`.'
context = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Context:', 'desc': '${context}'})
question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
answer = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'output', 'prefix': 'Answer:', 'desc': '${answer}'})
))

Or even use the longer notation to have more control:

class BasicQA(dspy.Signature):
    """Answer questions with short answers based on the context"""
    context = dspy.InputField()
    question = dspy.InputField()
    answer = dspy.OutputField(desc="A short answer extrapolated from the context")

# Output
BasicQA(context, question -> answer
instructions='Answer questions with short answers based on the context'
context = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Context:', 'desc': '${context}'})
question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
answer = Field(annotation=str required=True json_schema_extra={'desc': 'A short answer extrapolated from the context', '__dspy_field_type': 'output', 'prefix': 'Answer:'})
)

Signatures even support pydantic notation, thanks to the TypedPredictor module:

import dspy
from pydantic import BaseModel, Field
from dspy.functional import TypedPredictor
from datetime import datetime
from textwrap import dedent

class TravelInformation(BaseModel):
    origin: str = Field(pattern=r"^[A-Z]{3}$")
    destination: str = Field(pattern=r"^[A-Z]{3}$")
    date: str
    confidence: float = Field(gt=0, lt=1)

class TravelSignature(dspy.Signature):
    """ Extract all travel information in the given email """
    email: str = dspy.InputField()
    flight_information: list[TravelInformation] = dspy.OutputField()

predictor = TypedPredictor(TravelSignature)
predictor(email=dedent("""
    Thank you for booking with Amazon Web Services Airlines.
    Your XYZ123 flight from Bari to Las Vegas on June 18th 2024 is booked and ready to welcome you onboard.
    We hope you have a pleasant flight.
"""))

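The field constraints in TravelInformation are what make the typed output useful: a three-letter uppercase code for each airport and a confidence strictly between 0 and 1. The sketch below mirrors those two rules with the standard library only, to show which values would pass. The function check_travel_info is a hypothetical stand-in written for this illustration; in the real program, pydantic performs the validation.

```python
import re

AIRPORT_CODE = re.compile(r"^[A-Z]{3}$")  # same pattern as in TravelInformation


def check_travel_info(origin: str, destination: str, confidence: float) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    if not AIRPORT_CODE.search(origin):
        problems.append(f"origin {origin!r} is not a 3-letter uppercase code")
    if not AIRPORT_CODE.search(destination):
        problems.append(f"destination {destination!r} is not a 3-letter uppercase code")
    if not 0 < confidence < 1:
        problems.append(f"confidence {confidence} is not strictly between 0 and 1")
    return problems


print(check_travel_info("BRI", "LAS", 0.9))   # no problems
print(check_travel_info("Bari", "LAS", 1.0))  # bad origin and bad confidence
```

Note that the email says "Bari" and "Las Vegas", so the LM has to map city names to codes for the output to validate.
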
Retrievers

Finally, programs can also be extended with the dspy.Retrieve class to implement a retrieval system. To check the latest list of available retrievers, please refer to the dspy.retrieve module in the DSPy GitHub repository.

In order to be able to use the retrievers with Amazon Bedrock, you will need to create a custom SentenceVectorizer class. I’ve done the work ahead of time for you (by the way, if you want this implemented by the DSPy team officially, please +1 this PR#1151):

import boto3
import json
import numpy as np
from typing import List, Optional
from dsp.modules.sentence_vectorizer import BaseSentenceVectorizer

class AmazonBedrockVectorizer(BaseSentenceVectorizer):
    '''
    This vectorizer uses Amazon Bedrock API to convert texts to embeddings.
    '''
    SUPPORTED_MODELS = [
        "amazon.titan-embed-text-v1", "amazon.titan-embed-text-v2:0",
        "cohere.embed-english-v3", "cohere.embed-multilingual-v3"
    ]

    def __init__(
        self,
        model_id: str = 'amazon.titan-embed-text-v2:0',
        embed_batch_size: int = 128,
        region_name: str = 'us-west-2',
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
    ):
        # Note: the Titan branches below require embed_batch_size=1,
        # so pass it explicitly when using a Titan model.
        self.model_id = model_id
        self.embed_batch_size = embed_batch_size

        # Initialize the Bedrock client
        self.bedrock_client = boto3.client(
            service_name='bedrock-runtime',
            region_name=region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)
        embeddings_list = []

        # Ceiling division: number of batches needed to cover all texts
        n_batches = (len(text_to_vectorize) - 1) // self.embed_batch_size + 1
        for cur_batch_idx in range(n_batches):
            start_idx = cur_batch_idx * self.embed_batch_size
            end_idx = (cur_batch_idx + 1) * self.embed_batch_size
            cur_batch = text_to_vectorize[start_idx: end_idx]

            # Configure Bedrock API Body
            if self.model_id not in self.SUPPORTED_MODELS:
                raise Exception(f"Unsupported model: {self.model_id}")

            if self.model_id == "amazon.titan-embed-text-v1":
                if self.embed_batch_size == 1:
                    body = json.dumps({"inputText": cur_batch[0]})
                else:
                    raise Exception(f"Model {self.model_id} supports batch size of 1 exclusively.")
            elif self.model_id == "amazon.titan-embed-text-v2:0":
                if self.embed_batch_size == 1:
                    body = json.dumps({
                        "inputText": cur_batch[0],
                        "dimensions": 512
                    })
                else:
                    raise Exception(f"Model {self.model_id} supports batch size of 1 exclusively.")
            elif self.model_id.startswith("cohere.embed"):
                body = json.dumps({
                    "texts": cur_batch,
                    "input_type": "search_document"
                })
            else:
                raise Exception("How did you even get here?")

            # Invoke Bedrock API
            response = self.bedrock_client.invoke_model(
                body=body,
                modelId=self.model_id,
                accept='application/json',
                contentType='application/json'
            )

            response_body = json.loads(response['body'].read())
            if self.model_id.startswith("cohere.embed"):
                # Cohere returns one embedding per input text
                cur_batch_embeddings = response_body['embeddings']
            elif self.model_id.startswith("amazon.titan-embed-text"):
                # Titan returns a single vector; wrap it so extend() adds one row, not one float per dimension
                cur_batch_embeddings = [response_body['embedding']]
            else:
                raise Exception(f"Not implemented yet! Check the format of response for model {self.model_id} from the Amazon Bedrock documentation: https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html")
            embeddings_list.extend(cur_batch_embeddings)

        embeddings = np.array(embeddings_list, dtype=np.float32)
        return embeddings

    def _extract_text_from_examples(self, inp_examples: List) -> List[str]:
        # Plain strings are used as-is; Examples are flattened by joining their input fields
        if isinstance(inp_examples[0], str):
            return inp_examples
        return [" ".join([example[key] for key in example._input_keys]) for example in inp_examples]
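
The batching arithmetic in __call__ and the per-model request bodies are the parts most worth checking in isolation. The sketch below reproduces them without calling AWS. The helper names split_batches and build_body are hypothetical and introduced only here, while the body fields (inputText, dimensions, texts, input_type) are the ones used in the class above.

```python
import json


def split_batches(texts: list[str], batch_size: int) -> list[list[str]]:
    """Split texts into consecutive batches using the same ceiling division as __call__."""
    n_batches = (len(texts) - 1) // batch_size + 1
    return [texts[i * batch_size:(i + 1) * batch_size] for i in range(n_batches)]


def build_body(model_id: str, batch: list[str]) -> str:
    """Build the JSON request body for one batch (no network call is made)."""
    if model_id.startswith("cohere.embed"):
        return json.dumps({"texts": batch, "input_type": "search_document"})
    if model_id.startswith("amazon.titan-embed-text"):
        if len(batch) != 1:
            raise ValueError("this sketch sends one text per request to Titan models")
        body = {"inputText": batch[0]}
        if model_id == "amazon.titan-embed-text-v2:0":
            body["dimensions"] = 512
        return json.dumps(body)
    raise ValueError(f"Unsupported model: {model_id}")


texts = ["a", "b", "c", "d", "e"]
batches = split_batches(texts, batch_size=2)
print(batches)  # [['a', 'b'], ['c', 'd'], ['e']]
print(build_body("cohere.embed-english-v3", batches[0]))
```

Keeping the body construction in a pure function like this makes it easy to unit test without AWS credentials.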

You can use this class in your favorite DSPy retriever:

from dspy.retrieve.faiss_rm import FaissRM

document_chunks = [
"..."
]

frm = FaissRM(
    document_chunks=document_chunks,
    vectorizer=AmazonBedrockVectorizer(
        embed_batch_size=128, model_id="cohere.embed-english-v3"
        # OR:
        # embed_batch_size=1, model_id="amazon.titan-embed-text-v2:0"
    )
)
print(frm(["Provide your question here"]))
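
Conceptually, a vector retriever embeds the query with the same vectorizer and returns the chunks whose embeddings lie closest to it. The toy sketch below shows that idea with Euclidean distance on made-up two-dimensional vectors. It is an illustration of nearest-neighbour search in general, not FaissRM's implementation, and top_k is a hypothetical helper name.

```python
import math


def top_k(query_vec, doc_vecs, k=2):
    """Return indices of the k document vectors closest to the query (Euclidean distance)."""
    distances = [math.dist(query_vec, vec) for vec in doc_vecs]
    return sorted(range(len(doc_vecs)), key=lambda i: distances[i])[:k]


# Made-up embeddings: documents 0 and 2 point in roughly the same direction as the query.
doc_vecs = [(1.0, 0.0), (0.0, 1.0), (0.9, 0.1)]
query_vec = (1.0, 0.05)
nearest = top_k(query_vec, doc_vecs)
print(nearest)  # [0, 2]
```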

Custom Programs

With this knowledge, you’re ready to define custom classes that define the behavior of your program! For example, a RAG class would look like this:

class RAG(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        # `Retrieve` will use the user's default retrieval settings unless overridden.
        self.retrieve = dspy.Retrieve(k=num_passages)
        # `ChainOfThought` with a signature that generates answers given the retrieved context & question.
        self.generate_answer = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate_answer(context=context, question=question)

Before running this code, you have to configure your preferred retriever.
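
To see the control flow of forward without any model or index, here is a dependency-free sketch of the same retrieve-then-generate pattern. Everything in it is a stand-in invented for illustration: keyword_retrieve ranks passages by naive word overlap rather than embeddings, and fake_generate simply echoes the top passage where a real program would call the LM.

```python
import re


def tokenize(text: str) -> set[str]:
    """Lowercase the text and keep only word characters."""
    return set(re.findall(r"\w+", text.lower()))


def keyword_retrieve(question: str, corpus: list[str], k: int = 3) -> list[str]:
    """Toy retriever: rank passages by how many words they share with the question."""
    words = tokenize(question)
    ranked = sorted(corpus, key=lambda p: len(words & tokenize(p)), reverse=True)
    return ranked[:k]


def fake_generate(context: list[str], question: str) -> str:
    """Toy generator: return the best passage instead of calling an LM."""
    return context[0] if context else "I don't know."


class ToyRAG:
    def __init__(self, corpus: list[str], num_passages: int = 3):
        self.corpus = corpus
        self.num_passages = num_passages

    def forward(self, question: str) -> str:
        # Step 1: retrieve context. Step 2: generate an answer from context + question.
        context = keyword_retrieve(question, self.corpus, self.num_passages)
        return fake_generate(context=context, question=question)


corpus = [
    "Amazon Bedrock is a managed service for foundation models.",
    "DSPy modules can be composed into larger programs.",
    "Bari is a city in southern Italy.",
]
rag = ToyRAG(corpus, num_passages=2)
answer = rag.forward("What is Amazon Bedrock?")
print(answer)
```

In the DSPy version, dspy.Retrieve and dspy.ChainOfThought take the place of the two toy functions, which is why the module itself stays this short.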

Conclusion

DSPy and Amazon Bedrock complement each other well. DSPy gives you a structured, repeatable way to program language models through signatures, modules, and optimizers, while Bedrock provides managed, scalable access to the foundation models behind them. Together they give developers and data scientists a practical toolkit for moving from ad hoc prompting to engineered LM programs.

For more information, explore the DSPy GitHub repository and the Amazon Bedrock documentation. Stay tuned for future updates and advancements in this exciting field!
