Implementation
Packages and Frameworks
LangChain, a framework for building applications powered by language models, which we used to create the RAG pipeline and manage document retrieval.
Gradio, a Python library for building user-friendly web interfaces, which we used to create the interactive chat system for querying documents.
Chroma DB, an open-source vector database, which we used to store and retrieve document embeddings for similarity search.
SQLite, a lightweight relational database, which we used to create an SQL agent for querying and analysing structured data.
Pytest, a testing framework for Python, which we used to write and execute unit tests for different components of the system.
PDFPlumber, a Python library for extracting text from PDFs, which we used to process uploaded documents and prepare them for vectorisation.
Pandas, a data analysis library, which we used to manipulate tabular data and handle structured document information.
Documentation
Text Extraction
def extract_text_from_pdf(self, pdf_path: str) -> str:
try:
extracted_text = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
text = page.extract_text() # Extract text from the page
extracted_text.append(text if text else "")
# Extract tables from the page
tables = page.extract_tables()
for table_idx, table in enumerate(tables):
df = pd.DataFrame(table) # Convert table data into a DataFrame
table_text = df.to_string(index=False, header=False)
extracted_text.append(f"\n--- Table {table_idx+1} ---\n{table_text}\n")
return "\n".join(extracted_text) # Combine extracted text and tables into a single string
except Exception as e:
print(f"Error processing {pdf_path}: {str(e)}")
return ""
def load_documents(self) -> Dict[str, List[Tuple[str, str]]]:
# Load and store multiple versions of each document.
pdf_files = sorted(Path(self.docs_dir).glob("*.pdf")) # Sort by name
doc_versions = {}
for pdf_path in pdf_files:
doc_name, version = self.extract_doc_name_and_version(pdf_path.stem)
text = self.extract_text_from_pdf(str(pdf_path))
if text.strip():
if doc_name not in doc_versions:
doc_versions[doc_name] = [("v0", "")] # Add a blank v0 if not already present
doc_versions[doc_name].append((version, text)) # Store versioned text
return doc_versions
def extract_doc_name_and_version(self, filename: str) -> Tuple[str, str]:
# Extracts the base name and version from a filename (e.g., doc_v1.pdf -> doc, v1).
parts = filename.rsplit("_", 1)
return (parts[0], parts[1] if len(parts) > 1 else "v1")
The text extraction process begins with the extract_text_from_pdf() method, which uses pdfplumber to read the content of each page within a PDF file. It first attempts to extract the main body of text, ensuring that even pages with minimal or no text are accounted for. Then, it detects tables and converts them into structured text using pandas, preserving their readability. The extracted text and table data are then combined into a single formatted string for further processing. If an error occurs, the function catches the exception and logs an error message, preventing failures from disrupting the workflow.
To determine document names and versions, the extract_doc_name_and_version() method processes filenames by splitting them at the last underscore. This helps isolate the version number from the document’s base name, ensuring that different versions of the same document are correctly grouped. If no version is found, the system defaults to "v1" to maintain consistency across all processed documents. This structured naming approach ensures that the system can efficiently track changes and support accurate version comparisons.
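As a quick illustration of this convention, the standalone snippet below mirrors the splitting logic on two hypothetical filenames (the helper name is illustrative and not part of the system):
def parse_name_and_version(stem: str) -> tuple:
    # Mirrors extract_doc_name_and_version(): split at the last underscore,
    # defaulting to "v1" when no version suffix is present.
    parts = stem.rsplit("_", 1)
    return (parts[0], parts[1] if len(parts) > 1 else "v1")

print(parse_name_and_version("UserGuide_v2"))  # ('UserGuide', 'v2')
print(parse_name_and_version("UserGuide"))     # ('UserGuide', 'v1')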
Once the text is extracted, the system needs to organise multiple versions of each document for comparison. The load_documents() method scans the designated documentation directory for PDF files, sorting them by name to maintain a logical order. For each file, it extracts the document’s name and version, then processes the text using extract_text_from_pdf(). If a document is encountered for the first time, the system assigns it an initial placeholder version to ensure version consistency. The extracted text is stored alongside its respective document and version, allowing for structured version tracking.
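For reference, the dictionary returned by load_documents() has roughly the following shape (document names and contents are purely illustrative):
doc_versions = {
    "UserGuide": [
        ("v0", ""),  # blank baseline inserted when the document is first seen
        ("v1", "...full text of UserGuide_v1.pdf..."),
        ("v2", "...full text of UserGuide_v2.pdf..."),
    ]
}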
Analysing Differences
def normalise_text(self, text: str) -> str:
# Normalise text by removing trailing spaces, converting tabs, and standardising line endings.
return "\n".join(line.rstrip().replace("\t", " ") for line in text.splitlines())
def compare_versions(self, old_text: str, new_text: str) -> str:
# Compare two versions of a document and return the differences.
diff = difflib.ndiff(self.normalise_text(old_text).splitlines(), self.normalise_text(new_text).splitlines())
# Filter out lines that start with "?"
filtered_diff = [line for line in diff if not line.startswith("?")]
# Keep the full new text, but mark additions and deletions
marked_text = "\n".join(filtered_diff)
return marked_text
def process_versions(self, doc_versions: Dict[str, List[Tuple[str, str]]]) -> List[Document]:
# Process and compare different document versions.
documents = []
for doc_name, versions in doc_versions.items():
versions.sort() # Ensure versions are in order
for i in range(len(versions) - 1):
# Compute differences between versions
old_version, old_text = versions[i]
new_version, new_text = versions[i + 1]
differences = self.compare_versions(old_text, new_text)
# Split the differences into smaller chunks
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=self.chunking_size, chunk_overlap=self.overlap_size # Configure chunking size and overlap size
)
split_chunks = text_splitter.split_text(differences)
# Annotate chunks with relevant information and metadata
for idx, chunk in enumerate(split_chunks):
enhanced_chunk = (
f"Document: {doc_name}\nVersion Change: {old_version} → {new_version}\nChunk {idx+1}\n{chunk}\n\
Version-related keywords: {old_version}, {new_version}, {doc_name}, {doc_name} {old_version}, {doc_name} {new_version}"
)
# Store chunks as Document objects for compatibility with the database
doc = Document(
page_content=enhanced_chunk,
metadata={"source": doc_name, "old_version": old_version, "new_version": new_version}
)
documents.append(doc)
return documents
Before comparing different versions of a document, the system first ensures text consistency using the normalise_text() method. This function standardises formatting by removing trailing spaces, replacing tabs with spaces, and ensuring consistent line endings. By applying these normalisation steps, the system prevents minor formatting inconsistencies from being misinterpreted as actual content changes, improving the accuracy of the version comparison process.
Once the text is normalised, the compare_versions() method analyses the differences between two versions of a document. It leverages Python's difflib.ndiff to generate a line-by-line comparison, marking added lines with a "+" symbol and deleted lines with a "-" symbol. To improve readability, lines that start with a "?" (hint lines that only point at which characters changed within a nearby line) are filtered out. The final output provides a structured representation of the modifications between versions, making it easy to identify what content has been altered.
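The short, self-contained example below shows the kind of output difflib.ndiff produces and which rows compare_versions() filters out (the sample sentences are invented):
import difflib

old_text = "The service runs daily.\nBackups are kept for 7 days."
new_text = "The service runs hourly.\nBackups are kept for 7 days.\nLogs are archived monthly."

for line in difflib.ndiff(old_text.splitlines(), new_text.splitlines()):
    print(line)

# ndiff prefixes removed lines with "- ", added lines with "+ " and unchanged lines
# with two spaces; the "? " hint rows, which only highlight where characters changed
# within a nearby line, are the ones compare_versions() discards.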
The process_versions() method orchestrates the comparison of multiple document versions and prepares the extracted differences for storage and retrieval. It first ensures that document versions are sorted in the correct order before comparing each version to its immediate predecessor. The detected differences are then split into smaller, manageable chunks using a RecursiveCharacterTextSplitter, which segments text based on a predefined chunk size and overlap. Each chunk is then enhanced with metadata, including the document name, version numbers, and relevant keywords. These structured chunks are stored as Document objects, allowing them to be efficiently retrieved and analysed in the system.
Setting up RAG Pipeline
def create_vectorstore(self, doc_splits: List[Document]):
# Create Chroma vector store from document chunks
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_documents(
documents=doc_splits,
embedding=embedding_model,
persist_directory=None,
collection_name="documentation_collection"
)
return vectorstore
def setup_rag_chain(self):
# Set up the RAG chain with prompt template and LLM
prompt = PromptTemplate(
template="""You are an assistant that helps analyze changes between different versions of documents.
Use the following detected differences to answer questions.
Lines beginning with a "+" symbol denote lines that were added.
Lines beginning with a "-" symbol denote lines that were deleted.
Lines beginning with symbols other than "+" and "-" were unchanged.
Question: {question}
Differences: {documents}
Answer:
""",
input_variables=["question", "documents"],
)
llm = ChatOllama(model="llama3.1:8b", temperature=0)
return prompt | llm | StrOutputParser()
def setup_rag_system(self):
# Initialise the RAG system
try:
# Load document versions from storage
doc_versions = self.load_documents()
# Process the loaded documents to extract version differences
processed_docs = self.process_versions(doc_versions)
# Create a vector store for document retrieval
vectorstore = self.create_vectorstore(processed_docs)
# Configure the retriever using similarity search with top 4 matches
self.retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 4})
# Set up the RAG chain
self.rag_chain = self.setup_rag_chain()
# Mark setup as complete
self.setup_completed = True
except Exception as e:
print(f"Error setting up RAG system: {str(e)}")
raise
To efficiently retrieve and analyse document changes, the system first constructs a vector database using the create_vectorstore() method. This method uses the HuggingFaceEmbeddings model, specifically all-MiniLM-L6-v2, to generate numerical embeddings for each document chunk. These embeddings capture semantic meaning, allowing similar pieces of text to be retrieved based on their contextual relevance. The processed document chunks are then stored in a Chroma vector database, ensuring that document differences can be quickly and effectively queried during user interactions.
The setup_rag_chain() method establishes the core of the Retrieval-Augmented Generation pipeline, combining document retrieval with an AI-powered language model. It defines a structured prompt template that informs the AI how to interpret the detected differences. This prompt ensures that the language model understands the change history when generating responses. The system then integrates this prompt with the Llama 3.1 8B Large Language Model (LLM) configured with a temperature of 0 to prioritise deterministic and fact-based responses. Finally, the pipeline is linked with a StrOutputParser, which formats the model’s output into a clean and readable response.
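As a minimal, self-contained sketch of how such an LCEL pipeline is composed and invoked (a shortened prompt is used for brevity, and a local Ollama server with the llama3.1:8b model pulled is assumed):
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_ollama import ChatOllama

prompt = PromptTemplate(
    template="Question: {question}\nDifferences: {documents}\nAnswer:",
    input_variables=["question", "documents"],
)
# The "|" operator chains prompt -> model -> parser into a single runnable.
chain = prompt | ChatOllama(model="llama3.1:8b", temperature=0) | StrOutputParser()

answer = chain.invoke({
    "question": "What changed between v1 and v2?",
    "documents": "+ Added an SLA section\n- Removed the legacy pricing table",
})
print(answer)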
Bringing all components together, the setup_rag_system() method initialises the entire pipeline. It begins by loading and processing document versions, extracting and structuring the detected changes. The processed data is then stored in the Chroma vector database, allowing for similarity-based retrieval. To facilitate querying, the method configures a retriever that fetches the top four most relevant document chunks based on user input. Lastly, it connects the retriever to the RAG pipeline, finalising the setup and marking it as complete. If any errors occur during this process, they are caught and logged to ensure smooth debugging and maintenance.
Returning a Response
def get_answer(self, question: str) -> str:
# Retrieve an answer about changes in document versions.
if self.setup_completed == False:
return "You have not provided any documents. Please upload your documents before asking any questions."
else:
try:
# Retrieve relevant documents based on the question
documents = self.retriever.invoke(question)
# Format retrieved document changes for context
doc_texts = "\n".join([
f"[Changes from {doc.metadata.get('old_version')} to {doc.metadata.get('new_version')} in \
{doc.metadata.get('source')}]:\n{doc.page_content}"
for doc in documents
])
# Generate an answer using the RAG pipeline
answer = self.rag_chain.invoke({"question": question, "documents": doc_texts})
# Store question-answer pair in chat history
self.chat_history.append({"question": question, "answer": answer})
return answer
except Exception as e:
return f"Error processing question: {str(e)}"
def get_chat_metrics(self) -> List[List]:
# Get metrics about the chat history
metrics = []
for idx, interaction in enumerate(self.chat_history, 1):
metrics.append([
idx,
# Truncate long questions for readability
interaction["question"][:50] + "..." if len(interaction["question"]) > 50 else interaction["question"],
len(interaction["answer"].split()), # Word count of answer
len(interaction["answer"]) # Character count of answer
])
# Return the compiled metrics
return metrics
To provide insights into user interactions, the get_chat_metrics() method generates structured metrics on past queries. It compiles a list of interactions, assigning each one an index for easy reference; if no chat history exists, it simply returns an empty list. For readability, long questions are truncated after 50 characters. The method also computes key statistics for each response, including the word count and total character length. These rows are later rendered in the frontend's metrics table, which supplies the header row, giving users an overview of their chat activity.
The get_answer() method serves as the primary interface for querying document changes and retrieving AI-generated responses. It first ensures that the system setup is complete, preventing queries from being processed if no documents have been provided. If the setup is complete, the method uses the retriever to fetch relevant document snippets based on the user’s question. These retrieved snippets are formatted to clearly indicate the versions being compared, ensuring that the model has sufficient context. The formatted differences are then passed to the RAG pipeline, which generates a detailed answer using the structured prompt and language model. Finally, the system logs each question-answer pair in the chat history for reference, letting users track their past interactions. If any errors arise during this process, they are gracefully caught and returned as an error message to maintain system stability.
Resetting the RAG Pipeline
def reset(self):
# Reset the object's attributes
self.setup_completed = False
self.retriever = None
self.rag_chain = None
try:
# Delete all contents in the Chroma vector database
vectorstore = Chroma(persist_directory=None, collection_name="documentation_collection")
vectorstore.delete_collection()
except Exception as e:
print(f"Error clearing Chroma database: {str(e)}")
The reset() method clears the RAG pipeline and removes any previously loaded documents. It begins by resetting key attributes of the object, setting setup_completed to False and clearing the retriever and rag_chain objects, so the pipeline no longer relies on any prior data. The method then attempts to delete the contents of the Chroma vector database, which stores the document embeddings: calling vectorstore.delete_collection() removes the previously indexed documents, effectively resetting the system's state. If any error occurs during this process, such as issues with accessing or clearing the Chroma database, an error message is printed for debugging. This reset functionality is useful when a fresh set of documents needs to be loaded or when the pipeline needs to be reinitialised.
Telemetry
Data Ingestion and Database Population
def init_database():
    """Process all Excel files in a specified directory."""
    directory = 'docs/telemetry'
    # Delete the existing database file if it exists
    if os.path.exists('telemetry.db'):
        os.remove('telemetry.db')
        print("Removed existing telemetry.db file")
    # Find all Excel files in the directory
    excel_files = glob.glob(os.path.join(directory, "*.xlsx"))
    if not excel_files:
        print(f"No Excel files found in '{directory}'")
        return
    print(f"Found {len(excel_files)} Excel files to process")
    for excel_path in excel_files:
        process_excel_file(excel_path)
    print("All files processed")
This feature handles the automated processing of telemetry data stored in Excel files and populates an SQLite database.
File Discovery: The system uses the glob and os standard Python libraries to automatically locate all Excel files (.xlsx) within a specified directory (docs/telemetry). Before processing, it checks for and removes any pre-existing database file (telemetry.db) using os.remove to ensure a fresh start.
def extract_date_suffix(metadata_df, file_name):
"""Extract date range suffix from metadata."""
metadata_text = str(metadata_df.iloc[0, 0]) if not metadata_df.empty else ""
date_match = re.search(r'Data Pulled: (.*?) - (.*?)(?:\t|$)', metadata_text)
if date_match:
start_date = re.sub(r'[^0-9A-Za-z]', '_', date_match.group(1))
end_date = re.sub(r'[^0-9A-Za-z]', '_', date_match.group(2))
date_suffix = f"{start_date}_to_{end_date}"
else:
# Use filename without extension as fallback
date_suffix = os.path.splitext(file_name)[0]
return date_suffix
Excel Data Reading: The pandas library is central to reading the data. pd.ExcelFile is used to open each Excel file efficiently. Metadata, specifically the date range of the data pull, is extracted from a designated sheet ('WS-Data') using pd.read_excel with specific skiprows and nrows arguments. Regular expressions (re library) are employed within the extract_date_suffix function to parse this date range from the metadata text. If metadata extraction fails, the original filename (without extension) is used as a fallback identifier. For the main data sheets, pd.read_excel is used again, critically specifying a multi-level header (header=[3, 4]) to correctly capture the structured column names present in the source files.
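The per-file helper process_excel_file(), called from init_database(), is not reproduced above. A hedged sketch based on this description is given below; it reuses extract_date_suffix(), clean_dataframe() and save_to_database() from this section, and the exact skiprows/nrows values and the choice to process every sheet are assumptions rather than the project's actual settings.
import os
import pandas as pd

def process_excel_file(excel_path):
    # Sketch only: ties together the helpers shown in this section.
    file_name = os.path.basename(excel_path)
    xls = pd.ExcelFile(excel_path)

    # Pull the "Data Pulled: <start> - <end>" metadata row from the WS-Data sheet.
    metadata_df = pd.read_excel(xls, sheet_name="WS-Data", skiprows=1, nrows=1, header=None)
    date_suffix = extract_date_suffix(metadata_df, file_name)

    # Read each data sheet with the two-row header used by the source files,
    # clean it, and persist it under a sheet+date-suffix table name.
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(xls, sheet_name=sheet_name, header=[3, 4])
        df = clean_dataframe(df)
        save_to_database(df, sheet_name, date_suffix)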
def clean_column_names(columns):
"""Clean column names by removing special characters and spaces."""
return (columns
.str.replace('\n', ' ', regex=True)
.str.replace('Unnamed: \d+.*', '', regex=True)
.str.strip()
.str.replace(' ', '_')
.str.replace('%', 'pct')
.str.replace('>', 'gt')
.str.replace('<', 'lt')
.str.replace('#', 'num')
.str.replace('[^a-zA-Z0-9_]', '_', regex=True)
.str.replace('_{2,}', '_', regex=True))
def clean_dataframe(df):
    # Flatten the multi-level header into single column names
    df.columns = [' '.join(filter(None, col)).strip() for col in df.columns]
    df = df.replace('', pd.NA)
    # Clean column names
    df.columns = clean_column_names(df.columns)
    # Convert special columns if they exist
    if 'CPU_%Utilization_Core__Total_Time_gt_80%' in df.columns:
        try:
            # Convert the duration column to a number of minutes
            df['CPU_%Utilization_Core__Total_Time_gt_80%'] = (
                pd.to_timedelta(df['CPU_%Utilization_Core__Total_Time_gt_80%']).dt.total_seconds() / 60
            )
        except Exception:
            pass
    return df
Data Cleaning and Transformation: Column names often contain special characters, spaces, and line breaks unsuitable for database use. The clean_column_names function uses pandas string methods (.str.replace(), .str.strip()) and regular expressions to sanitise these names, replacing problematic characters (e.g., %, >, <, #, spaces, multiple underscores) with standardised alternatives (e.g., pct, gt, lt, num, _). Empty strings are replaced with pd.NA (pandas' representation for missing data). Specific data type conversions are performed where necessary; for example, time-related columns (such as CPU_%Utilization_Core__Total_Time_gt_80%) are converted into a numerical representation (minutes) via pd.to_timedelta. Error handling (try-except) is included for robustness.
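A small usage example of this sanitisation, with invented column names and assuming clean_column_names() from the listing above is in scope:
import pandas as pd

raw_columns = pd.Index(["CPU %Utilization\n(All Cores)", "# of Events > 80%"])
print(list(clean_column_names(raw_columns)))
# ['CPU_pctUtilization_All_Cores_', 'num_of_Events_gt_80pct']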
from contextlib import contextmanager  # needed so get_connection() can be used in a with-statement

@contextmanager
def get_connection():
    """Create and manage a database connection context."""
    conn = sqlite3.connect('telemetry.db')
    try:
        yield conn
    finally:
        conn.close()
def save_to_database(df, sheet_name, date_suffix):
"""Save the processed data to a SQLite database."""
# Create table name
clean_sheet_name = sheet_name.replace('-', '_')
table_name = f"{clean_sheet_name}_{date_suffix}"
# Save to database
with get_connection() as conn:
df.to_sql(table_name, conn, if_exists='replace', index_label="row_index")
print(f" - Processed sheet: {table_name}")
Database Connection and Storage: The standard sqlite3 library is used for database interaction. A context manager (get_connection) ensures that database connections are properly opened and closed, even if errors occur. The sqlite_utils library is used later for schema inspection, but the core data saving relies on pandas: processed data for each sheet is written to the SQLite database (telemetry.db) with df.to_sql(). Crucially, table names are generated dynamically by combining the cleaned sheet name and the extracted date suffix (e.g., SheetName_StartDate_to_EndDate), so data from different time periods or categories is stored separately but predictably. The if_exists='replace' argument ensures that reloading the same file updates the corresponding table.
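For example, a sheet named WS-Data covering January 2025 would be written to a table such as WS_Data_01_01_2025_to_31_01_2025 (dates illustrative); after ingestion, the generated tables can be listed with sqlite_utils:
import sqlite_utils

db = sqlite_utils.Database("telemetry.db")
print(db.table_names())
# e.g. ['WS_Data_01_01_2025_to_31_01_2025', ...] - one table per sheet and date range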
Natural Language Query Interface
This feature allows users to ask questions in natural language (English) and retrieve relevant data from the populated SQLite database. It leverages Large Language Models (LLMs) and structured tool usage.
# Open the database
db = sqlite_utils.Database("telemetry.db")
# Extract schema as a dictionary
schema = {table: db[table].columns_dict for table in db.table_names()}
schema = "\n".join([f"Table: {table}\nColumns: {', '.join(columns.keys())}" for table, columns in schema.items()])
Database Schema Extraction: Before the LLM can generate queries, it needs to know the database structure. The sqlite_utils library is used to connect to the existing telemetry.db and extract the schema (table names and their corresponding column names and types). This schema information is formatted into a string.
LLM Integration: The langchain_ollama library provides integration with the Ollama framework to run local LLMs (specifically llama3.1:8b in this case). ChatOllama is instantiated to represent the language model.
system_prompt = f"""
You are an expert SQL analyst. You must generate SQL queries **strictly based** on the provided database schema.
### Database Schema:
{schema}
### Rules:
1. **Use only existing tables and columns.** Never invent names.
2. **Use the correct table based on the question**. The table names include date ranges, so choose the appropriate one.
3. **Refer to column names exactly as they appear, and wrap them in double quotation marks (") because they contain special characters, especially inside functions like avg, max, etc.** Example:
- Instead of `cpu_utilization`, the correct column is `"CPU_%Utilization_All_CPU_Cores_Highest_avg_%"`.
4. **If unsure, do NOT guess—ask for clarification.** Now, generate an appropriate SQL query.
"""
SQL Generation Prompt: A detailed system prompt (system_prompt) is provided to the LLM. It instructs the LLM to act as an expert SQL analyst, supplies the extracted database schema, and lays out strict rules: use only tables and columns present in the schema; select the correct table based on the time period implied in the user's question (as encoded in the table names); refer to column names exactly, quoting them with double quotation marks (") when they contain special characters; and avoid guessing, asking for clarification instead when unsure.
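Under these rules, a question about average CPU utilisation should steer the model towards a tool call whose query looks roughly like the one below, which would then be executed by the sql_query tool defined next (the table name and date range are hypothetical; the column name is the one cited in the prompt above):
# Hypothetical SQL the model might generate for "What was the highest average CPU
# utilisation in January 2025?", following the quoting and table-selection rules:
example_query = (
    'SELECT AVG("CPU_%Utilization_All_CPU_Cores_Highest_avg_%") AS avg_cpu '
    'FROM WS_Data_01_01_2025_to_31_01_2025'
)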
@tool  # registered as a LangChain tool via langchain_core.tools.tool
def sql_query(query: str):
    """Execute a SQL SELECT query on a SQLite database and return results.
    Args:
        query (str): A valid SQL SELECT query to execute on the database
    Returns:
        list: A list of dictionaries containing query results,
        where each dictionary represents a row"""
    with sqlite3.connect('telemetry.db') as connection:
        return pd.read_sql_query(query, connection).to_dict(orient='records')
SQL Execution Tool: A custom tool (sql_query) is defined using langchain_core.tools.tool. This tool accepts a SQL query string as input. It connects to the telemetry.db using sqlite3. It executes the query using pd.read_sql_query, which conveniently returns the results as a pandas DataFrame. The results are converted to a list of dictionaries (.to_dict(orient='records')) for compatibility with the LangChain framework.
def sql_assistant(question):
    llm = ChatOllama(model='llama3.1:8b', temperature=0.1).bind_tools([sql_query])
    llm_without_tools = ChatOllama(model='llama3.1:8b', temperature=0.1)
    messages = [SystemMessage(content=system_prompt), HumanMessage(content=question)]
    print(messages)
    response = llm.invoke(messages)
    # If the model answered directly without requesting the sql_query tool, return its reply
    if not response.tool_calls:
        return response.content
    for tool_call in response.tool_calls:
        selected_tool = {"sql_query": sql_query}[tool_call["name"].lower()]
        tool_msg = selected_tool.invoke(tool_call)
    result_prompt = f"""
    Here are the results to answer the question {question}:
    {tool_msg}
    Please analyze these results and provide actionable insights. (DO NOT WRITE AN SQL QUERY)
    Format your response in a clear, concise manner without repeating all the raw data.
    """
    messages = [HumanMessage(content=result_prompt)]
    return llm_without_tools.invoke(messages).content
Query Process Flow: The user provides a question (e.g., "What was the average CPU utilization last week?"). The question, along with the system prompt containing the schema and rules, is sent to the first ChatOllama instance (llm), which is configured via bind_tools to use the sql_query tool. The LLM processes the input and generates a corresponding SQL query, formatted as a tool call for the sql_query tool. The application identifies the tool call and invokes the sql_query tool with the generated SQL, which executes it against the database and returns the results. These results are then passed to a second ChatOllama instance (llm_without_tools), which is prompted to analyse the raw SQL results and provide actionable insights in a user-friendly format, and is explicitly instructed not to output the SQL query again. The final, summarised answer is returned to the user.
This implementation combines pandas for data manipulation, sqlite3 for database storage, and LangChain with a locally hosted LLM (via Ollama) for the natural language interface, creating an effective system for analysing telemetry data.
Requests for Quotation (RFQs)
PDF Content and Table Extraction
def extract_content_from_pdf(self, pdf_path: str) -> str:
"""
Extract text and tables from a PDF file using pdfplumber for text and camelot for tables.
Args:
pdf_path (str): Path to PDF file
Returns:
str: Combined extracted text and table content from PDF
"""
try:
text_content = ""
tables_data = []
# Extract text outside tables with pdfplumber
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
# Get table boundaries
table_bboxes = [table.bbox for table in page.find_tables()]
# Extract text outside tables
text = page.extract_text(x_tolerance=2, y_tolerance=2, tables=table_bboxes)
if text:
text_content += text + "\n"
# Find the cutoff point in the text content
cutoff_marker = "Services are comprised of the following:"
if cutoff_marker in text_content:
# Split at the marker and keep only the content before it (plus the marker itself)
parts = text_content.split(cutoff_marker, 1)
text_content = parts[0] + cutoff_marker
# Get total page count for table extraction
with pdfplumber.open(pdf_path) as pdf:
total_pages = len(pdf.pages)
# Process each page for tables
for page_num in range(1, total_pages + 1):
try:
# Read tables from the current page
tables = camelot.read_pdf(pdf_path, pages=str(page_num))
if tables and tables.n > 0:
for table_num, table in enumerate(tables):
# Convert the table to DataFrame
df = table.df
# Handle header row if present
if not df.empty:
# Use first row as header and drop it
df = (
df.rename(columns=df.iloc[0])
.drop(df.index[0])
.reset_index(drop=True)
)
# Clean up newlines in data
df = df.apply(lambda x: x.str.replace('\n', ' ') if x.dtype == 'object' else x)
# Clean up column names to be valid
df.columns = [
re.sub(r'[^a-zA-Z0-9_]', '', col.replace('\n', ' ').replace(' ', '_'))
for col in df.columns
]
# Remove empty columns
df = df.dropna(axis=1, how='all')
# Identify table type
table_type = self._identify_table_type(df)
# Convert DataFrame to text representation for vector storage
table_text = f"--- TABLE NUMBER {table_num+1} ({table_type}) ---\n"
# Add headers
headers = " | ".join([str(col) for col in df.columns])
table_text += headers + "\n"
# Add separator
table_text += "-" * len(headers) + "\n"
# Add rows
for _, row in df.iterrows():
row_text = " | ".join([str(val) if pd.notna(val) else "" for val in row])
table_text += row_text + "\n"
table_text += "--- END TABLE ---\n"
# Store table data as text
tables_data.append(table_text)
except Exception as table_err:
print(f"Error extracting tables from page {page_num} of {pdf_path}: {str(table_err)}")
# Combine text content and table data
combined_content = text_content + "\n" + "\n".join(tables_data)
return combined_content
except Exception as e:
print(f"Error processing {pdf_path}: {str(e)}")
return ""
Purpose: Extract unstructured text and structured tables from RFQ PDFs, ensuring accurate retrieval of specifications and pricing details.
Libraries Used: pdfplumber (text extraction), camelot-py (table extraction), pandas (data manipulation), and re (text cleaning).
- extract_content_from_pdf() extracts text using pdfplumber, ensuring tables are excluded to prevent duplication.
- A rule truncates text at "Services are comprised of the following:" to filter irrelevant sections.
- camelot.read_pdf() extracts tables, processing them into cleaned DataFrames.
- _identify_table_type() classifies tables based on keywords (a sketch of this helper follows the list).
- Tables are serialized into a structured text format for indexing.
- Narrative text and table text are combined into a single string for document representation.
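The _identify_table_type() helper is not reproduced above. A hedged sketch of a keyword-based classifier in its spirit is shown below; the category names and keyword lists are assumptions rather than the project's actual implementation.
import pandas as pd

def identify_table_type(df: pd.DataFrame) -> str:
    # Sketch: scan headers and cell values for indicative keywords.
    text = " ".join(str(col) for col in df.columns).lower()
    text += " " + " ".join(df.astype(str).values.flatten()).lower()
    if any(keyword in text for keyword in ("price", "cost", "fee", "gbp")):
        return "pricing"
    if any(keyword in text for keyword in ("quantity", "qty", "units")):
        return "quantities"
    if any(keyword in text for keyword in ("service", "sla", "support")):
        return "services"
    return "general"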
Document Loading and Metadata Extraction
def _extract_key_metadata(self, text: str) -> Dict[str, str]:
"""
Extract key metadata from RFQ text.
Args:
text (str): Text content
Returns:
Dict[str, str]: Extracted metadata
"""
metadata = {}
# Extract customer name
customer_match = re.search(r"(?i)Customer Name[:\s]+([^\n]+)", text)
if customer_match:
metadata["customer_name"] = customer_match.group(1).strip()
# Extract contract duration
duration_match = re.search(r"Contract duration:\s*(\d+)\s*\(Months\)", text)
if duration_match:
metadata["contract_duration"] = duration_match.group(1)
# Extract monthly fee
fee_match = re.search(r"pay HP per month in GBP at £([\d,]+\.\d+)", text)
if fee_match:
metadata["monthly_fee"] = fee_match.group(1)
return metadata
def load_documents(self) -> List[Document]:
"""
Load documents from local PDF files.
Returns:
List[Document]: Text documents
"""
text_documents = []
pdf_files = list(Path(self.docs_dir).glob("*.pdf"))
if not pdf_files:
raise ValueError(f"No PDF files found in {self.docs_dir}")
for pdf_path in pdf_files:
try:
text_content = self.extract_content_from_pdf(str(pdf_path))
# Extract metadata from text
metadata = self._extract_key_metadata(text_content)
metadata["source"] = str(pdf_path.name)
if text_content.strip(): # Only add if text was successfully extracted
doc = Document(
page_content=text_content,
metadata=metadata
)
text_documents.append(doc)
print(f"Successfully loaded: {pdf_path.name}")
except Exception as e:
print(f"Error loading {pdf_path.name}: {str(e)}")
if not text_documents:
raise ValueError("No documents were successfully loaded")
return text_documents
Purpose: Convert processed content into a standardized format with relevant metadata.
Libraries Used: pathlib (file handling), langchain.docstore.document.Document (text representation), re (regex for metadata extraction).
- load_documents() finds PDFs, extracts content, and calls _extract_key_metadata() to extract customer names, contract duration, and fees.
- Metadata, including the source filename, is attached to each document.
- Documents are stored as Document objects for further processing.
Text Splitting (Chunking)
def split_documents(self, docs_list: List[Document]) -> List[Document]:
"""Split text documents into chunks"""
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=500,
chunk_overlap=20
)
return text_splitter.split_documents(docs_list)
Purpose: Split large documents into smaller, coherent chunks for effective retrieval and embedding.
Libraries Used: langchain.text_splitter.RecursiveCharacterTextSplitter, tiktoken (token-aware splitting).
- split_documents() applies recursive splitting using a hierarchy of separators.
- chunk_size=500 and chunk_overlap=20 ensure context retention across splits.
- Metadata is preserved in the chunked documents.
Vector Embeddings and Storage
def create_vectorstore(self, doc_splits: List[Document],
collectionName: str = "rfq_text") -> Chroma:
"""Create Chroma vector store from document chunks"""
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_documents(
documents=doc_splits,
embedding=embedding_model,
persist_directory=None,
collection_name=collectionName
)
return vectorstore
Purpose: Generate semantic embeddings for document chunks and store them for similarity search.
Libraries Used: langchain_huggingface.HuggingFaceEmbeddings, langchain_chroma.Chroma (vector store).
- create_vectorstore() initializes the HuggingFaceEmbeddings model (all-MiniLM-L6-v2).
- Chroma.from_documents() stores chunk embeddings for retrieval.
- Operates in-memory by default, but can be configured for on-disk persistence, as sketched below.
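A minimal sketch of the persistent variant, reusing the names from the listing above with a hypothetical on-disk path:
persistent_store = Chroma.from_documents(
    documents=doc_splits,
    embedding=HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2"),
    persist_directory="./chroma_rfq",  # hypothetical path; embeddings survive restarts
    collection_name="rfq_text"
)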
Retrieval Mechanism
def setup_rag_system(self):
"""Initialize the complete RAG system"""
try:
# Load documents with both text and table content
docs = self.load_documents()
# Split documents
doc_splits = self.split_documents(docs)
# Create vector store
vectorstore = self.create_vectorstore(doc_splits, "rfq_content")
# Create retriever
self.retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# Setup RAG chain
self.rag_chain = self.setup_rag_chain()
except Exception as e:
print(f"Error setting up RAG system: {str(e)}")
raise
Purpose: Identify relevant document chunks based on a user query.
Libraries Used: langchain_chroma.Chroma (vector search).
- setup_rag_system() configures the retriever using vectorstore.as_retriever().
- Uses semantic similarity search with search_type="similarity" and retrieves the top k=4 results.
Generation with LLM (RAG Chain)
def setup_rag_chain(self):
"""Set up the RAG chain with prompt template and LLM"""
prompt = PromptTemplate(
template="""You are an expert assistant for analyzing RFQ (Request for Quote) documents.
Use the following information from RFQ documents to answer the question accurately.
Pay special attention to pricing information, contract terms, and service specifications.
If tabular data is provided, use it to extract precise figures and details.
If you don't know the answer, just say that you don't know.
Include the source document name in your answer when possible.
Question: {question}
Content:
{text_documents}
Answer:
""",
input_variables=["question", "text_documents"],
)
llm = ChatOllama(
model="llama3.1:8b",
temperature=0.1,
num_predict=512,
)
return prompt | llm | StrOutputParser()
Purpose: Generate answers using retrieved context and a structured prompt.
Libraries Used: langchain.prompts.PromptTemplate, langchain_ollama.ChatOllama, langchain_core.output_parsers.StrOutputParser.
- setup_rag_chain() defines a structured prompt template, guiding the LLM to focus on pricing, contract terms, and service details.
- ChatOllama (model llama3.1:8b) is initialised with temperature=0.1 for focused, near-deterministic responses and num_predict=512 to cap response length.
- The pipeline integrates the prompt, LLM, and output parser using LangChain Expression Language (LCEL).
Query Processing and Answer Generation
def get_ans(self, question: str) -> str:
"""Get answer for a given question by retrieving relevant content"""
try:
# Retrieve relevant documents
documents = self.retriever.invoke(question)
doc_texts = "\n".join([
f"[From {doc.metadata.get('source', 'Unknown')}]: {doc.page_content}"
for doc in documents
])
# Generate answer using text and table information
answer = self.rag_chain.invoke({
"question": question,
"text_documents": doc_texts
})
# Save to chat history
self.chat_history.append({
"question": question,
"answer": answer,
"sources": [doc.metadata.get('source') for doc in documents]
})
return answer
except Exception as e:
return f"Error processing question: {str(e)}"
Purpose: Retrieve context, generate responses, and maintain interaction history.
- get_ans() retrieves relevant document chunks using self.retriever.invoke(question).
- Formats retrieved text with source information.
- Calls self.rag_chain.invoke() to generate an answer based on the retrieved context.
- Stores the query and response in self.chat_history.
This structured pipeline ensures efficient RFQ information retrieval, leveraging advanced document processing and retrieval-augmented generation techniques.
Frontend
Returning Responses
# Initialize the RAG system
rfq_rag = RFQRAG()
documentation = Documentation()
telemetry_metrics = []
def get_telemetry_response(message, history):
# Handle chat messages and return Telemetry responses
init_database()
response = sql_assistant(message)
telemetry_metrics.append({"question": message, "answer": response})
return response
def get_document_response(message, history):
# Handle chat messages and return Documentation responses
response = documentation.get_answer(message)
return response
def get_rfq_response(message, history):
# Handle chat messages and return RFQ responses
response = rfq_rag.get_ans(message)
return response
def get_metrics():
# Get all chat metrics for the metrics tab
full_telemetry_metrics = []
# Calculate metrics for telemetry use case
if telemetry_metrics:
for idx, interaction in enumerate(telemetry_metrics, 1):
full_telemetry_metrics.append([
idx,
# Truncate long questions for readability
interaction["question"][:50] + "..." if len(interaction["question"]) > 50 else interaction["question"],
len(interaction["answer"].split()), # Word count of answer
len(interaction["answer"]) # Character count of answer
])
# Collect metrics for all use cases
all_metrics = (documentation.get_chat_metrics() or []) + (rfq_rag.get_chat_metrics() or []) + (full_telemetry_metrics or [])
return all_metrics
This code is part of the Gradio frontend, where we initialise the key objects representing the different use cases in the system. The rfq_rag object is created from the RFQRAG class, responsible for handling RFQ-related queries. Similarly, the documentation object is an instance of the Documentation class, used to retrieve answers about documentation incoherencies. A global list, telemetry_metrics, is also initialised to store interactions related to the telemetry use case.
Several functions are defined to process user queries for each use case. The get_telemetry_response() function handles messages related to telemetry data, initialising the database and querying the SQL assistant. Each interaction is stored in the telemetry_metrics list for later analysis. The get_document_response() function processes queries related to documentation incoherencies using the documentation.get_answer() method, while get_rfq_response() fetches RFQ-related answers using the rfq_rag.get_ans() method.
Additionally, the get_metrics() function compiles response metrics across all use cases. It first processes telemetry interactions, extracting relevant details such as truncated question text, word count, and character count. The function then aggregates these telemetry metrics with those retrieved from the documentation and rfq_rag objects, ensuring all interactions are included in the final output. This data can be used in the frontend's metrics tab to analyse system performance.
Handling Files
def upload_files(files, folder_name):
# Handle file uploads and move them to the documentation folder
if not os.path.exists(folder_name):
os.makedirs(folder_name) # Ensure the folder exists
for file in files:
destination_path = os.path.join(folder_name, os.path.basename(file)) # Get filename
shutil.move(file, destination_path) # Move file to the target folder
if folder_name == UPLOAD_FOLDER_DOC:
documentation.setup_rag_system() # Reload documents after upload
return f"Uploaded {len(files)} files successfully!"
def delete_uploaded_files(folder_name):
# Delete all files in the upload folder and reset documentation setup status
protected_files = ["RFQ1.pdf","RFQ2.pdf", "RFQ3.pdf", "SampleTelemetryData.xlsx"]
if not os.path.exists(folder_name):
return "The upload folder is already empty."
# If the folder matches the protected folder, preserve certain files
for file in os.listdir(folder_name):
file_path = os.path.join(folder_name, file)
if os.path.isfile(file_path) and file not in protected_files:
os.remove(file_path) # Delete only non-protected files
if folder_name == UPLOAD_FOLDER_DOC:
documentation.reset()
return "All uploaded files have been deleted successfully! Please upload files before asking any questions."
# Run this on app startup to clear the upload folders
delete_uploaded_files(UPLOAD_FOLDER_DOC)
delete_uploaded_files(UPLOAD_FOLDER_RFQ)
delete_uploaded_files(UPLOAD_FOLDER_TEL)
This code is responsible for handling file uploads and deletions within the system. The upload_files() function takes a list of files and a target folder name, creating the folder if it does not already exist. Each uploaded file is moved to the designated folder, and if the files are being uploaded to the documentation folder, the setup_rag_system method is called to reload the document processing system. This ensures that newly uploaded documents are immediately available for queries. The function then returns a success message indicating how many files were uploaded.
The delete_uploaded_files() function is designed to remove all uploaded files from a specified folder, while ensuring that critical reference files remain protected. A predefined list of protected files, such as RFQ documents and telemetry data, is checked before deletion to prevent accidental removal of essential resources. If the specified folder exists, the function iterates through its contents and deletes any files that are not in the protected list. If the folder being cleared is the documentation folder, the reset method of the documentation object is called to reset the system state. A success message is then returned, instructing users to upload new files before submitting queries.
We also use delete_uploaded_files() on each of the main upload folders when the application starts. This ensures that any temporary files from previous sessions are cleared, preventing outdated or irrelevant documents from affecting the system’s responses. By resetting the upload directories at startup, the system maintains a clean and organised workspace for new user interactions.
Creating the Interface
# RFQ Tab
with gr.Blocks() as rfq:
gr.Markdown("### RFQ Chat")
document_chat = gr.ChatInterface(
fn=get_rfq_response,
type="messages",
examples=["What is the price of table 1?",
"What are the quantities for table 1?"],
description="Ask questions about RFQ data.",
save_history=True,
editable=True
)
# Area to upload new files
gr.Markdown("### Upload New RFQ File")
upload_button = gr.Files(file_types=[".pdf"], type="filepath", label="Upload RFQ pdfs")
upload_output = gr.Textbox(label="Upload Status")
upload_button.upload(fn=upload_files, inputs=[upload_button, gr.State(UPLOAD_FOLDER_RFQ)], outputs=upload_output)
# Button to delete uploaded files
delete_button = gr.Button("Clear Upload Folder", variant="stop")
delete_output = gr.Textbox(label="Delete Status")
delete_button.click(fn=delete_uploaded_files, inputs=gr.State(UPLOAD_FOLDER_RFQ), outputs=delete_output)
# Chat Metrics Tab
metrics = gr.Interface(
fn=get_metrics,
inputs=None,
outputs=gr.Dataframe(headers=["Interaction #", "Question", "Answer Word Count", "Answer Character Count"]),
title="Chat Metrics",
)
# Create tabbed interface
app = gr.TabbedInterface(
[telemetry, document, rfq, metrics],
["Telemetry" ,"Documentation","RFQ", "Metrics"],
title="HP - Business Edge",
theme = theme,
css="body { color-scheme: dark; }"
)
app.launch(share=True, server_name="0.0.0.0", server_port=7861)
This section of the code defines the user interface for the use cases. The interface consists of a chat area where users can input queries about the uploaded files and receive responses using the function corresponding to the required use case. Example queries, such as asking for trends or insights, are provided to guide users on how to interact with the system. The chat interface also includes features like saving chat history, making it a flexible and user-friendly experience.
In addition to the chat function, the interfaces also include file management tools for uploading and deleting files. Users can upload files through an upload button, which moves the files to the designated folder and provides a status update on the upload process. There is also a delete button that allows users to clear the upload folder, ensuring that only the most relevant files are retained. These features are implemented for all use cases, meaning the documentation and telemetry tabs have nearly identical functionalities.
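For completeness, a hedged sketch of the Documentation tab is shown below, mirroring the RFQ tab above; the labels and descriptions are illustrative, and the Telemetry tab follows the same pattern with get_telemetry_response and UPLOAD_FOLDER_TEL.
with gr.Blocks() as document:
    gr.Markdown("### Documentation Chat")
    doc_chat = gr.ChatInterface(
        fn=get_document_response,
        type="messages",
        description="Ask questions about changes between document versions.",
        save_history=True,
        editable=True
    )
    gr.Markdown("### Upload New Documentation Files")
    doc_upload = gr.Files(file_types=[".pdf"], type="filepath", label="Upload documentation pdfs")
    doc_upload_status = gr.Textbox(label="Upload Status")
    doc_upload.upload(fn=upload_files, inputs=[doc_upload, gr.State(UPLOAD_FOLDER_DOC)], outputs=doc_upload_status)
    doc_delete = gr.Button("Clear Upload Folder", variant="stop")
    doc_delete_status = gr.Textbox(label="Delete Status")
    doc_delete.click(fn=delete_uploaded_files, inputs=gr.State(UPLOAD_FOLDER_DOC), outputs=doc_delete_status)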
The system also includes a metrics tab, which provides insights into chat interactions. The get_metrics() function generates a table displaying the interaction number, user question, and the length of the system’s responses in both word and character counts. This tab allows users to analyse system performance and response efficiency across different queries.
To structure the interface, each use case and the metrics section is placed in its own tab using gr.TabbedInterface(). This ensures that users can easily navigate between the different functionalities without confusion. The application is then launched with the specified settings, such as enabling sharing and defining the server configuration.