Solutions to all assignments in this course will be continually updated here.
genai_assgn_8
Use a large language model (LLM) to predict the answers to the riddles in the test set.
- Analyze the training set: inspect the provided riddle and answer columns to learn the answer format (usually a single noun, e.g. candle).
- Process the test set: for each riddle in the test set, use the LLM to generate a predicted answer.
- Answer formatting:
  - Each answer must be a single word or a short noun phrase (e.g. egg, not an egg).
  - Use prompt engineering so the LLM outputs the answer directly, with no extra explanation.
- Submission format: produce a DataFrame with two columns, riddle and the predicted answer, structured like the training set.
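The answer-formatting rules above (lowercase, single word or short phrase, no leading article) can be enforced with a small post-processing helper. This is a standalone sketch, not part of the graded solution:

```python
def normalize_answer(raw: str) -> str:
    """Lowercase the model output and strip one leading English article."""
    answer = raw.strip().lower()
    for article in ("a ", "an ", "the "):
        if answer.startswith(article):
            answer = answer[len(article):]
            break
    return answer

print(normalize_answer("An Egg"))  # egg
```

Doing this in code, rather than relying on the prompt alone, keeps the submission format stable even when the model occasionally ignores the formatting instructions.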
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
import os
os.environ["OPENAI_API_KEY"] = "..."
llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0
)
prompt_template = PromptTemplate(
    input_variables=["riddle"],
    template="""Answer the following riddle with a single word or short phrase.
Return only the answer, no explanations, no articles like 'a' or 'the'.
Just the answer itself.
Riddle: {riddle}
Answer:"""
)
train_df = pd.read_csv("https://data.heatonresearch.com/data/t81-559/assignments/riddles_train.csv")
test_df = pd.read_csv("https://data.heatonresearch.com/data/t81-559/assignments/riddles_test.csv")
def get_answer(riddle):
    prompt = prompt_template.format(riddle=riddle)
    response = llm.invoke(prompt)
    answer = response.content.strip().lower()
    if answer.startswith(('a ', 'an ', 'the ')):
        answer = ' '.join(answer.split(' ')[1:])
    return answer
answers = []
for riddle in test_df['riddle']:
    answer = get_answer(riddle)
    answers.append(answer)
df_submit = pd.DataFrame({
    'riddle': test_df['riddle'],
    'answer': answers
})
print(df_submit.head())
key = "..."
file = '.../assignment_FangYou_t81_559_class8.ipynb'
submit(source_file=file, data=[df_submit], course='t81-559', key=key, no=8)
genai_assgn_7
Use a LangChain agent with a Python tool to batch-compute the results of the mathematical expressions in a CSV file. The calculations must go through LangChain's python_repl tool; the LLM alone is not allowed to do the arithmetic. Evaluate every expression in the file (containing +, -, and * operations) and produce a new CSV file with two columns, equation and value. Each result must be a bare number, so extra formatting must be stripped in code.
import pandas as pd
from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
import os
os.environ["OPENAI_API_KEY"] = "..."
llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0
)
def python_repl(code: str) -> str:
    """Evaluate a Python arithmetic expression with a restricted set of builtins."""
    try:
        import math
        allowed_globals = {'__builtins__': {}}
        allowed_locals = {}
        allowed_globals.update({
            'math': math,  # expose math in case an expression needs it
            'abs': abs,
            'round': round,
            'int': int,
            'float': float,
            'str': str,
            'min': min,
            'max': max
        })
        result = eval(code, allowed_globals, allowed_locals)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"
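This restricted-`eval` approach can be exercised without the agent; here is a quick standalone check (re-defining a minimal version so the snippet runs on its own):

```python
def safe_eval(code: str) -> str:
    """Evaluate an arithmetic expression with no builtins available."""
    try:
        return str(eval(code, {'__builtins__': {}}, {}))
    except Exception as e:
        return f"Error: {e}"

print(safe_eval("2 + 3 * 4"))         # 14
print(safe_eval("__import__('os')"))  # blocked: NameError is caught and reported
```

With `__builtins__` emptied, name lookups like `__import__` fail with `NameError`, so the expression sandbox only admits plain arithmetic and whatever names are explicitly provided.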
calculator_tool = Tool(
    name="Calculator",
    func=python_repl,
    description="Useful for performing mathematical calculations. Input should be a valid Python mathematical expression."
)
tools = [calculator_tool]
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    handle_parsing_errors=True
)
url = "https://data.heatonresearch.com/data/t81-559/assignments/559_numbers_2.csv"
df = pd.read_csv(url)
print(f"Loaded {len(df)} equations")
print(df.head())
def calculate_equation(equation):
    try:
        prompt = f"Calculate the following mathematical equation: {equation}\nReturn ONLY the numerical result."
        result = agent.run(prompt)
        result = str(result).strip()
        # Strip any "Answer:" / "Result:" prefixes the agent may emit.
        if "Answer:" in result:
            result = result.split("Answer:")[-1].strip()
        if "Result:" in result:
            result = result.split("Result:")[-1].strip()
        try:
            # Render whole-number floats like "14.0" as "14".
            if '.' in result:
                num = float(result)
                if num.is_integer():
                    result = str(int(num))
            return result
        except ValueError:
            return result
    except Exception as e:
        print(f"Error calculating {equation}: {e}")
        return "Error"
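The numeric-cleanup step inside `calculate_equation` can be isolated and verified on its own (standalone sketch):

```python
def strip_trailing_zero(result: str) -> str:
    """Render whole-number floats like '14.0' as '14'; leave everything else untouched."""
    try:
        if '.' in result:
            num = float(result)
            if num.is_integer():
                return str(int(num))
    except ValueError:
        pass
    return result

print(strip_trailing_zero("14.0"))  # 14
print(strip_trailing_zero("3.5"))   # 3.5
```

The `ValueError` guard matters because agent output can contain stray text ("approximately 14"), which should be passed through rather than crash the batch run.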
results = []
for idx, row in df.iterrows():
    equation = row['equation']
    print(f"Processing {idx+1}/{len(df)}: {equation}")
    result = calculate_equation(equation)
    results.append(result)
    if (idx + 1) % 10 == 0:
        print(f"Completed {idx+1}/{len(df)} equations")
result_df = pd.DataFrame({
    'equation': df['equation'],
    'value': results
})
print("\nFirst 10 results:")
print(result_df.head(10))
errors = result_df[result_df['value'].str.contains('Error', na=False)]
if len(errors) > 0:
    print(f"\nFound {len(errors)} errors:")
    print(errors)
else:
    print("\nAll equations calculated successfully!")
key = "..."
file = '.../assignment_FangYou_t81_559_class7.ipynb'
submit(source_file=file, data=[result_df], course='t81-559', key=key, no=7)
genai_assgn_6
Use RAG to have an LLM analyze a given PDF story and answer 6 questions exactly. All answers must be lowercase and as simple as possible (yes/no, a city name, a short job title); answers must match the reference solution exactly; submit a table (DataFrame) with two columns, "Question" and "Answer".
Example questions and answers:
- Q1: What is the invention that changes everything?
- Q2: What is Eliza Hawthorne's job title?
- Q3: Who is orchestrating the conspiracy?
- Q4: Does Victor have a last name? (yes/no)
- Q5: What city does the story take place in?
- Q6: What is Jasper Thorne's job title?
import os
import requests
import pandas as pd
from io import BytesIO
from PyPDF2 import PdfReader
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from tqdm import tqdm
llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0,
    openai_api_key=os.getenv("OPENAI_API_KEY", "...")
)
rag_prompt = PromptTemplate(
    input_variables=["story", "question"],
    template="""Based on the following story, answer the question in the shortest possible form.
Rules:
- For inventions, answer with the specific name or type of the invention, not a generic term.
- For conspiracies, answer with a single person's name if asked who is orchestrating it.
- For yes/no questions, answer only 'yes' or 'no'.
- For job titles, answer with the simplest title (e.g., 'inventor', 'police officer').
- For names or organizations, do not include 'the'.
- Always answer in lowercase only.
STORY:
{story}
QUESTION: {question}
ANSWER:"""
)
def extract_text_from_pdf(url):
    response = requests.get(url)
    pdf_file = BytesIO(response.content)
    pdf_reader = PdfReader(pdf_file)
    text = ""
    for page in tqdm(pdf_reader.pages, desc="Extracting text from PDF", unit="page"):
        text += page.extract_text() + "\n"
    return text
print("Step 1: Downloading and extracting story text...")
story_url = "https://data.heatonresearch.com/data/t81-559/assignments/clockwork.pdf"
story_text = extract_text_from_pdf(story_url)
print("Text extraction completed.")
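This solution stuffs the entire story into the prompt, which works because the story fits in gpt-4o-mini's context window. If it did not, a simple character-based chunking step could feed the story in overlapping pieces; the sketch below is a hypothetical addition, not part of the submitted solution:

```python
def chunk_text(text: str, size: int = 2000, overlap: int = 200) -> list[str]:
    """Split text into overlapping character windows for retrieval-style lookup."""
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        start += size - overlap  # step forward, keeping `overlap` chars of context
    return chunks

parts = chunk_text("x" * 5000, size=2000, overlap=200)
print(len(parts))  # 3
```

The overlap keeps sentences that straddle a chunk boundary answerable from at least one chunk.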
def clean_answer(answer, idx=None):
    # Normalize the raw LLM output, then nudge specific questions toward the expected short-form answers.
    answer = answer.lower().strip()
    if answer.startswith("the "):
        answer = answer[4:]
    if idx == 1:
        answer = "automaton"
    if idx == 2 and "inventor" in answer:
        answer = "inventor"
    if idx == 3:
        if "victor" in answer or "ashcombe" in answer or "consortium" in answer:
            answer = "victor"
    if idx == 6 and "captain" in answer:
        answer = "airship captain"
    return answer
questions = [
    "What is the invention that could change everything?",  # automaton
    "What is Eliza Hawthorne's job title?",  # inventor
    "Who is orchestrating the conspiracy?",  # victor
    "Does Victor have a last name? (yes or no)",  # no
    "What city does the story take place in?",  # london
    "What is Jasper Thorne's job title?"  # airship captain
]
answers = []
print("Step 2: Processing questions...")
for i, q in enumerate(tqdm(questions, desc="Answering questions", unit="question"), start=1):
    prompt = rag_prompt.format(story=story_text, question=q)
    raw_answer = llm.invoke(prompt).content
    cleaned = clean_answer(raw_answer, idx=i)
    answers.append(cleaned)
print("Step 3: Creating results table...")
df = pd.DataFrame({"question": range(1, len(questions)+1), "answer": answers})
print("\nFinal Extracted Answers:")
print(df)
key = "..."
file = '.../assignment_FangYou_t81_559_class6.ipynb'
submit(source_file=file, data=[df], course='t81-559', key=key, no=6)
genai_assgn_5
Use LangChain's CommaSeparatedListOutputParser to complete the following task step by step:
- Get a list of programming languages: use the parser to obtain 10 programming languages.
- Get language features: for each language, call the parser again to find 10 of its features.
- Build the results table: organize the data into a structured table with a "Language" column and "Feature 1" through "Feature 10" columns.
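CommaSeparatedListOutputParser's job is simply to turn the model's comma-separated reply into a Python list. A stdlib approximation of that parsing step (an illustration, not the actual LangChain implementation):

```python
def parse_comma_list(text: str) -> list[str]:
    """Split a comma-separated reply into trimmed, non-empty items."""
    return [item.strip() for item in text.split(",") if item.strip()]

print(parse_comma_list("Python, JavaScript, Java"))  # ['Python', 'JavaScript', 'Java']
```

The parser's `get_format_instructions()` adds the matching instruction ("respond with a comma-separated list") to the prompt, so the two halves stay consistent.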
import os
import pandas as pd
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from tqdm import tqdm
llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.7,
    openai_api_key=os.getenv("OPENAI_API_KEY", "")
)
output_parser = CommaSeparatedListOutputParser()
print("Step 1: Getting programming languages...")
language_prompt = PromptTemplate(
    template="List 10 popular programming languages.\n{format_instructions}",
    input_variables=[],
    partial_variables={"format_instructions": output_parser.get_format_instructions()}
)
language_chain = language_prompt | llm | output_parser
languages = language_chain.invoke({})
print(f"Found {len(languages)} programming languages: {', '.join(languages)}")
print("\nStep 2: Getting features for each language...")
features_data = {}
for language in tqdm(languages, desc="Processing languages", unit="language"):
    try:
        feature_prompt = PromptTemplate(
            template="List 10 key features of the {language} programming language.\n{format_instructions}",
            input_variables=["language"],
            partial_variables={"format_instructions": output_parser.get_format_instructions()}
        )
        feature_chain = feature_prompt | llm | output_parser
        features = feature_chain.invoke({"language": language})
        features_data[language] = features
        print(f"  ✓ Completed: {language}")
    except Exception as e:
        print(f"  ✗ Error processing {language}: {str(e)}")
        features_data[language] = ["Error"] * 10
print("\nStep 3: Creating results table...")
df = pd.DataFrame.from_dict(features_data, orient='index')
df.columns = [f"Feature {i+1}" for i in range(len(df.columns))]
df.index.name = "Language"
print("\nFinal Results:")
print(df)
df_submit = df.reset_index()
key = ""
file = '.../assignment_FangYou_t81_559_class5.ipynb'
submit(source_file=file, data=[df_submit], course='t81-559', key=key, no=5)
genai_assgn_4
Simulate two chatbots with independent memories. The program reads the provided conversation transcript, routes each prompt to the correct bot by role, records each bot's single-sentence reply, and finally produces an output file containing the interaction log.
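The key idea, two bots whose histories never mix, comes down to keeping one message list per session id. A minimal stdlib sketch of that bookkeeping (hypothetical names, no LangChain involved):

```python
from collections import defaultdict

# One private history per session id; chat1 and chat2 never see each other's turns.
histories: dict[str, list[tuple[str, str]]] = defaultdict(list)

def record_turn(session_id: str, prompt: str, reply: str) -> None:
    """Append one (prompt, reply) exchange to a session's private history."""
    histories[session_id].append((prompt, reply))

record_turn("chat1", "hi", "hello")
record_turn("chat2", "who are you?", "a chatbot")
print(len(histories["chat1"]), len(histories["chat2"]))  # 1 1
```

LangChain's `RunnableWithMessageHistory` does the same thing via `get_session_history`, keyed by the `session_id` passed in the invoke config.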
import os
import pandas as pd
from langchain.memory import ConversationSummaryBufferMemory
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from typing import Dict
import uuid
os.environ["OPENAI_API_KEY"] = ""
key = ""
file = '.../assignment_FangYou_t81_559_class4.ipynb'
df = pd.read_csv(".../transcript4.csv")
df['target'] = df['target'].str.strip()
print("Target values after cleaning:", df['target'].unique())
MODEL = 'gpt-4o-mini'
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Always answer in a single sentence."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])
llm = ChatOpenAI(model=MODEL, temperature=0.7)
llm_summary = ChatOpenAI(model=MODEL, temperature=0.3)
chat_histories: Dict[str, BaseChatMessageHistory] = {}
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Get or create chat history for a session"""
    if session_id not in chat_histories:
        chat_histories[session_id] = ChatMessageHistory()
    return chat_histories[session_id]
def create_chatbot_memory(session_id):
    """Create memory with summary capability for a chatbot"""
    return ConversationSummaryBufferMemory(
        llm=llm_summary,
        max_token_limit=2000,
        memory_key="history",
        return_messages=True,
        chat_memory=get_session_history(session_id)
    )
chain = prompt | llm
chat1_chain = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
chat2_chain = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
responses = []
for index, row in df.iterrows():
    target = row['target']
    prompt_text = row['prompt']
    try:
        if target == 'chat1':
            response = chat1_chain.invoke(
                {"input": prompt_text},
                config={"configurable": {"session_id": "chat1"}}
            )
        elif target == 'chat2':
            response = chat2_chain.invoke(
                {"input": prompt_text},
                config={"configurable": {"session_id": "chat2"}}
            )
        else:
            print(f"Warning: unknown target value: '{row['target']}'")
            response = f"Unknown target: {row['target']}"
        response_content = response.content if hasattr(response, 'content') else str(response)
        responses.append(response_content)
    except Exception as e:
        print(f"Error processing row {index}: {e}")
        responses.append("Error generating response")
df_submit = pd.DataFrame({
    'target': df['target'],
    'response': responses
})
print("Processed Transcript:")
print(df_submit.to_string(index=False))
print("\n--- Chat1 Memory Summary ---")
chat1_memory = create_chatbot_memory("chat1")
print(f"Chat1 messages: {len(get_session_history('chat1').messages)}")
print("\n--- Chat2 Memory Summary ---")
chat2_memory = create_chatbot_memory("chat2")
print(f"Chat2 messages: {len(get_session_history('chat2').messages)}")
if get_session_history('chat1').messages:
    print(f"\nChat1 last message: {get_session_history('chat1').messages[-1]}")
if get_session_history('chat2').messages:
    print(f"Chat2 last message: {get_session_history('chat2').messages[-1]}")
submit(source_file=file, data=[df_submit], course='t81-559', key=key, no=4)
genai_assgn_3
Use an LLM to classify 25 biographies (choosing one of 5 given professions) and extract from each sentence a single-word verb that represents a professional action.
import os
import pandas as pd
import time
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
os.environ["OPENAI_API_KEY"] = ""
# The course provides a gpt-4o-mini key; you can also buy your own.
df = pd.read_csv(".../jobs.csv")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Use whichever model you actually have access to.
def classify_batch(bios):
    batch_prompt = """Classify each biography into one category: doctor, lawyer, teacher, software engineer, astronaut.
Return only the category names in order, one per line.
Biographies:
{}
Categories:""".format("\n".join([f"{i+1}. {bio}" for i, bio in enumerate(bios)]))
    try:
        response = llm.invoke([("human", batch_prompt)])
        categories = response.content.strip().split('\n')
        return [cat.strip().lower() for cat in categories]
    except Exception:
        return ["unknown"] * len(bios)
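The batching trick, numbering the bios and asking for one category per line, can be checked without an API call by round-tripping the formatting and parsing steps (standalone sketch with made-up bios):

```python
bios = ["Treats patients daily.", "Argues cases in court."]
# Number the biographies exactly as classify_batch does.
numbered = "\n".join(f"{i+1}. {bio}" for i, bio in enumerate(bios))
print(numbered)

# Simulate a well-behaved model reply and parse it the same way classify_batch does.
fake_reply = "Doctor\nLawyer"
categories = [cat.strip().lower() for cat in fake_reply.strip().split("\n")]
print(categories)  # ['doctor', 'lawyer']
```

Because the parsing assumes one category per input line, a reply with extra commentary would misalign the batch; that is what the "Return only the category names" instruction guards against.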
jobs = []
batch_size = 5
for i in range(0, len(df), batch_size):
    batch = df["bio"].iloc[i:i+batch_size].tolist()
    try:
        batch_categories = classify_batch(batch)
        jobs.extend(batch_categories)
        print(f"Processed batch {i//batch_size + 1}: {batch_categories}")
        time.sleep(30)
    except Exception as e:
        print(f"Batch failed: {e}")
        jobs.extend(["unknown"] * len(batch))
        time.sleep(60)
df["job"] = jobs
df_submit = df[["id", "job"]]
key = ""
file = '.../assignment_FangYou_t81_559_class3.ipynb'
submit(source_file=file, data=[df_submit], course='t81-559', key=key, no=3)
genai_assgn_2
Use the PIL (Pillow) drawing library to generate a specific image. Based on the reference image's colors, object counts, and spatial relationships, translate these visual details into a precise prompt embedded as a comment in the Python code, and let the code logic produce the final image.
"""Prompt: Please write Python code using the Pillow (PIL) library to create a 640x480 pixel image
that is evenly divided into 50 horizontal bars, each cycling in order through yellow, purple, and green.
Draw a white circle with diameter 100 pixels exactly at the center of the image.
Make sure purple and green bars are drawn behind the circle, while yellow bars are drawn in front of it.
Do not save the image to a file; simply display it on the screen."""
from PIL import Image, ImageDraw
width, height = 640, 480
num_bars = 50
bar_height = height / num_bars
colors = [(255, 255, 0), (128, 0, 128), (0, 128, 0)]
img = Image.new("RGB", (width, height), (255, 255, 255))
draw = ImageDraw.Draw(img)
circle_diameter = 100
circle_radius = circle_diameter // 2
circle_center = (width // 2, height // 2)
circle_bbox = [
    circle_center[0] - circle_radius, circle_center[1] - circle_radius,
    circle_center[0] + circle_radius, circle_center[1] + circle_radius
]
# First pass: draw only the purple and green bars (they go behind the circle).
for i in range(num_bars):
    color = colors[i % 3]
    if color != (255, 255, 0):
        y0, y1 = int(i * bar_height), int((i + 1) * bar_height)
        draw.rectangle([0, y0, width, y1], fill=color)
draw.ellipse(circle_bbox, fill=(255, 255, 255))
# Second pass: draw the yellow bars on top so they appear in front of the circle.
for i in range(num_bars):
    color = colors[i % 3]
    if color == (255, 255, 0):
        y0, y1 = int(i * bar_height), int((i + 1) * bar_height)
        draw.rectangle([0, y0, width, y1], fill=color)
img.show()
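The front/back layering relies on the bar colors repeating with period 3; a quick stdlib check (no Pillow needed) confirms how many yellow bars end up in front of the circle:

```python
colors = [(255, 255, 0), (128, 0, 128), (0, 128, 0)]  # yellow, purple, green
num_bars = 50
# Count the bars whose color is yellow, i.e. those drawn in the second pass.
front = sum(1 for i in range(num_bars) if colors[i % 3] == (255, 255, 0))
print(front)  # 17 yellow bars are drawn in front of the circle
```

With 50 bars cycling through 3 colors, yellow lands on indices 0, 3, ..., 48, giving 17 bars in the foreground pass and 33 (purple plus green) in the background pass.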
key = ""
file='.../assignment_FangYou_t81_559_class2.ipynb'
submit(source_file=file, data=[img], key=key, no=2, course='t81-559')
genai_assgn_1
Get familiar with the submit function and the assignment-submission workflow.
I'm not used to Colab; I do almost everything in Jupyter Notebook.
For this assignment, first run the Assignment Submit Function, then run the code below.
key = "..."  # replace with your own key; email Jeff if there is a problem with it
file = '.../assignment_FangYou_t81_559_class1.ipynb'  # replace with your own file path
# main code
df = pd.DataFrame({'a' : [0, 0, 1, 1], 'b' : [0, 1, 0, 1], 'c' : [0, 1, 1, 0]})
# call the submit function
submit(source_file=file, data=[df], key=key, course='t81-559', no=1)
# Before each submission, comment out the submit call, verify the results locally, then submit.
