Spaces:
Running
Running
from io import BytesIO | |
from dotenv import load_dotenv | |
import os | |
from utils import * | |
from fastapi import FastAPI, File, HTTPException, Header, UploadFile,status | |
from tokenManagement import * | |
from jwtcoding import * | |
from fastapi.responses import JSONResponse | |
import docx | |
import fitz | |
from scraper import scrapeCourse | |
import asyncio | |
from google import genai | |
from typing import Optional,List | |
from pydantic import BaseModel | |
load_dotenv() | |
CX = os.getenv("SEARCH_ENGINE_ID") | |
API_KEY = os.getenv("GOOGLE_API_KEY") | |
PINECONE_API_KEY=os.getenv("PINECONE_API_KEY") | |
GEMINI_API_KEY=os.getenv("GEMINI_API_KEY") | |
MONGO_URI=os.getenv("MONGO_URI") | |
app = FastAPI() | |
import re | |
class UserBody(BaseModel): | |
firstName: Optional[str] = None | |
lastName: Optional[str] = None | |
email:str | |
password:str | |
class AiAnalysis(BaseModel): | |
query:str | |
class Token(BaseModel): | |
refreshToken:str | |
class RecommendedCourse(BaseModel): | |
courseTitle:Optional[str]=None | |
courseLink:Optional[str]=None | |
duration:Optional[str]=None | |
courseLevel:Optional[str]=None | |
interimRoleBenefit:Optional[str]=None | |
dreamRoleBenefit:Optional[str]=None | |
courseDescription:Optional[str]=None | |
courseProvider:Optional[str]=None | |
class UserCourse(BaseModel): | |
employmentStatus:str | |
interimRole:bool | |
interimRoleOptions:Optional[List[str]]=None | |
dreamRole:str | |
motivation:str | |
learningPreference:str | |
timeCommitmentPerDay:str | |
challenges:list | |
timeframeToAchieveDreamRole:str | |
recommendedCourses: Optional[List[RecommendedCourse]]=None | |
class CourseRecommendation(BaseModel): | |
courseName: str | |
completionTime: str | |
def extract_course_info(text: str) -> CourseRecommendation: | |
# Example regex patterns – adjust these as needed based on the response format. | |
course_pattern =r'"coursename":\s*"([^"]+)"' | |
time_pattern = r"(\d+\s*-\s*\d+\s*months)" | |
course_match = re.search(course_pattern, text) | |
time_match = re.search(time_pattern, text) | |
coursename = course_match.group(1).strip() if course_match else "Unknown" | |
completiontime = time_match.group(0).strip() if time_match else "Unknown" | |
return CourseRecommendation(courseName=coursename, completionTime=completiontime) | |
import re | |
from urllib.parse import urlparse | |
def extract_provider(url): | |
# Parse the URL | |
parsed_url = urlparse(url) | |
# Extract domain and split it to get the main part | |
domain = parsed_url.netloc.split('.')[1] | |
# Extract course name | |
match = re.search(r'/course/([^/]+)/', url) | |
course_name = match.group(1) if match else "Not found" | |
return domain | |
def get_course(query): | |
# Example search query | |
results = google_search(query, API_KEY, CX) | |
content=[] | |
if results: | |
for item in results.get('items', []): | |
title = item.get('title') | |
link = item.get('link') | |
snippet = item.get('snippet') | |
provider = extract_provider(link) | |
content_structure={} | |
content_structure["courseTitle"]=title | |
content_structure["courseLink"]=link | |
content_structure["courseSnippet"]= snippet | |
content_structure["provider"]= provider | |
content_structure["scrapedCourseDetails"]= scrapeCourse(url=link) | |
content.append(content_structure) | |
return JSONResponse(content,status_code=200) | |
def get_course_func(query): | |
# Example search query | |
results = google_search(query, API_KEY, CX) | |
content=[] | |
if results: | |
for item in results.get('items', []): | |
title = item.get('title') | |
link = item.get('link') | |
snippet = item.get('snippet') | |
provider = extract_provider(link) | |
content_structure={} | |
content_structure["courseTitle"]=title | |
content_structure["courseLink"]=link | |
content_structure["courseSnippet"]= snippet | |
content_structure["scrapedCourseDetails"]= scrapeCourse(url=link) | |
content.append(content_structure) | |
return content | |
async def upload_file(file: UploadFile = File(...),authorization: str = Header(...)): | |
# Extract the token from the Authorization header (Bearer token) | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
else: | |
content = await file.read() # Read the file content (this will return bytes) | |
sentences=[] | |
print(f"File name: {file.filename}") | |
print(f"File content type: {file.content_type}") | |
print(f"File size: {file.size} bytes") | |
if "pdf" == file.filename.split('.')[1]: | |
pdf_document = fitz.open(stream=BytesIO(content), filetype="pdf") | |
extracted_text = "" | |
for page_num in range(pdf_document.page_count): | |
page = pdf_document.load_page(page_num) | |
extracted_text += page.get_text() | |
elif "docx" == file.filename.split('.')[1]: | |
docx_file = BytesIO(content) | |
doc = docx.Document(docx_file) | |
extracted_text = "" | |
for para in doc.paragraphs: | |
extracted_text += para.text + "\n" | |
sentences = split_text_into_chunks(extracted_text,chunk_size=200) | |
docs = generate_embedding_for_user_resume(data=sentences,user_id=file.filename) | |
response= insert_embeddings_into_pinecone_database(doc=docs,api_key=PINECONE_API_KEY,name_space=decoded_user_id) | |
return {" name": file.filename,"response":str(response) } | |
def ask_ai_about_resume(req:AiAnalysis,authorization: str = Header(...)): | |
# Retrieve context from your vector database | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
context = query_vector_database(query=req.Query, api_key=PINECONE_API_KEY, name_space=decoded_user_id) | |
# Ensure that an event loop is present in this thread. | |
try: | |
loop = asyncio.get_event_loop() | |
except RuntimeError: | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
# Create the Gemini client after the event loop is set up | |
client = genai.Client(api_key=GEMINI_API_KEY) | |
response = client.models.generate_content( | |
model="gemini-2.0-flash", | |
contents=f""" | |
Answer this question using the context provided: | |
question: {req.Query} | |
context: {context} | |
""" | |
) | |
return {"Ai_Response":response.text} | |
def ask_ai_to_recommnd_courses(request:UserCourse,authorization:str=Header(...)): | |
""" | |
User Profile Information for Career Development | |
This section defines the parameters used to gather information from the user to understand their current employment situation, learning preferences, challenges, and goals related to achieving their dream role. | |
Parameters: | |
employment_status (str): | |
A description of the user's current employment situation (e.g., "unemployed", "part-time", "full-time"). | |
interim_role (str): | |
Indicates whether the user is willing to prepare for an interim role to gain experience and income while pursuing their dream role (e.g., "yes" or "no"). | |
desired_role (str): | |
The role the user ultimately wishes to obtain (e.g., "Full-Stack Developer", "Data Scientist"). | |
motivation (str): | |
The user's reasons or motivations for pursuing the desired role. | |
learning_preference (str): | |
Describes how the user prefers to learn new skills (e.g., "online courses", "self-study", "bootcamp"). | |
hours_spent_learning (str or int): | |
The number of hours per day the user can dedicate to learning. | |
challenges (str): | |
Outlines any obstacles or challenges the user faces in reaching their dream role. | |
timeframe_to_achieve_dream_role (str): | |
The ideal timeframe the user has in mind for achieving their dream role (e.g., "6-12 months"). | |
""" | |
# Extract the token from the Authorization header (Bearer token) | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
# Ensure that an event loop is present in this thread. | |
try: | |
loop = asyncio.get_event_loop() | |
except RuntimeError: | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
# # Create the Gemini client after the event loop is set up | |
# client = genai.Client(api_key=GEMINI_API_KEY) | |
# response = client.models.generate_content( | |
# model="gemini-2.0-flash", | |
# contents=f""" | |
# please respond with a JSON object that contains the following keys as a response: | |
# - "coursename": the name of the recommended course, | |
# - "completiontime": an estimate of how long it would take to complete the course. | |
# Do not include any extra text. | |
# Recommend a course using this information below : | |
# Which of the following best describes you?: {request.employmentStatus} | |
# Would you like to prepare for an interim role to gain experience and income while pursuing your dream job?: {request.interimRole} | |
# What is your desired role?: {request.dreamRole} | |
# Why do you want to achieve this desired role?: {request.motivation} | |
# How do you prefer to learn new skills?: {request.learningPreference} | |
# How many hours per day can you dedicate to learning?: {request.timeCommitmentPerDay} | |
# What are the biggest challenges or obstacles you face in reaching your dream role?: {request.challenges} | |
# What is your ideal timeframe for achieving your dream role?: {request.timeframeToAchieveDreamRole} | |
# """ | |
# ) | |
questions=request.model_dump() | |
questions['userId']=decoded_user_id | |
create_questionaire(db_uri=MONGO_URI,db_name="crayonics",collection_name="Questionaire",document=questions) | |
# course_info = extract_course_info(response.text) | |
# courses = get_course_func(query=course_info.courseName) | |
return {"courseInfo":"course_info","courses":"courses"} | |
def login(user:UserBody): | |
user ={"email":user.email,"password":user.password,"firstName":user.firstName,"lastName":user.lastName} | |
print(user) | |
user_id= login_user(db_uri=MONGO_URI,db_name="crayonics",collection_name="users",document=user) | |
if user_id != False: | |
refreshToken=create_refreshToken(db_uri=MONGO_URI,user_id=user_id) | |
accessToken = create_accessToken(db_uri=MONGO_URI,user_id=user_id,refresh_token=refreshToken) | |
result = update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI,refresh_token=refreshToken,access_token=accessToken) | |
print(result) | |
access_token = encode_jwt(user_id=user_id,access_token=accessToken) | |
return {"refreshToken":refreshToken,"accessToken":access_token} | |
return JSONResponse(status_code=401,content={"detail":"Invalid login details"}) | |
def signUp(user:UserBody): | |
user ={"email":user.email,"password":user.password,"first_name":user.firstName,"last_name":user.lastName} | |
user_id= create_user(db_uri=MONGO_URI,db_name="crayonics",collection_name="users",document=user) | |
if user_id != False: | |
refreshToken=create_refreshToken(db_uri=MONGO_URI,user_id=user_id) | |
accessToken = create_accessToken(db_uri=MONGO_URI,user_id=user_id,refresh_token=refreshToken) | |
result = update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI,refresh_token=refreshToken,access_token=accessToken) | |
print(result) | |
access_token = encode_jwt(user_id=user_id,access_token=accessToken) | |
return {"refreshToken":refreshToken,"accessToken":access_token} | |
return JSONResponse(status_code=status.HTTP_226_IM_USED,content="user already Exists") | |
def logout(refresh:Token,authorization: str = Header(...)): | |
token = authorization.split("Bearer ")[-1] | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
result = logout_func(db_uri=MONGO_URI,refresh_token= refresh.refreshToken) | |
if result ==True: | |
return {"content": f"successful"} | |
else: | |
return JSONResponse(status_code=status.HTTP_410_GONE,content={"content": f"unsuccessful"}) | |
def refresh_access_token(refresh_token:Token, authorization: str = Header(...)): | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_refresh_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token,refresh_token=refresh_token.refreshToken) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
new_access_token = create_accessToken(db_uri=MONGO_URI,user_id=decoded_user_id,refresh_token=refresh_token.refreshToken) | |
update_refreshTokenWithPreviouslyUsedAccessToken(db_uri=MONGO_URI,refresh_token=refresh_token.refreshToken,access_token=new_access_token) | |
newly_encoded_access_token = encode_jwt(user_id=decoded_user_id,access_token=new_access_token) | |
return {"accessToken":newly_encoded_access_token} | |
def get_user_details(authorization: str = Header(...)): | |
# Extract the token from the Authorization header (Bearer token) | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
doc = {"user_id":decoded_user_id} | |
user_info = user_details_func(db_uri=MONGO_URI,document=doc) | |
return { "userInfo": user_info} | |
def protected_route(authorization: str = Header(...)): | |
# Extract the token from the Authorization header (Bearer token) | |
token = authorization.split("Bearer ")[-1] | |
# Here, you would validate the token (e.g., check with a JWT library) | |
decoded_user_id,decoded_access_token = decode_jwt(token) | |
is_valid = verify_access_token(db_uri=MONGO_URI, user_id=decoded_user_id, access_token=decoded_access_token) | |
if is_valid != True: # Example check | |
raise HTTPException(status_code=401, detail="Invalid token") | |
return {"message": "Access granted", "verification": "verified"} | |