Source code for florist.api.routes.server.job

"""FastAPI routes for the job."""

from typing import List, Union

import requests
from fastapi import APIRouter, Body, Request, status
from fastapi.responses import JSONResponse

from florist.api.db.entities import MAX_RECORDS_TO_FETCH, Job, JobStatus


router = APIRouter()


[docs] @router.get( path="/{job_id}", response_description="Retrieves a job by ID", status_code=status.HTTP_200_OK, response_model=Job, ) async def get_job(job_id: str, request: Request) -> Union[Job, JSONResponse]: """ Retrieve a training job by its ID. :param request: (fastapi.Request) the FastAPI request object. :param job_id: (str) The ID of the job to be retrieved. :return: (Union[Job, JSONResponse]) The job with the given ID, or a 400 JSONResponse if it hasn't been found. """ job = await Job.find_by_id(job_id, request.app.database) if job is None: return JSONResponse(content={"error": f"Job with ID {job_id} does not exist."}, status_code=400) return job
[docs] @router.post( path="", response_description="Create a new job", status_code=status.HTTP_201_CREATED, response_model=Job, ) async def new_job(request: Request, job: Job = Body(...)) -> Job: # noqa: B008 """ Create a new training job. If calling from the REST API, it will receive the job attributes as the Request Body in raw/JSON format. See `florist.api.db.entities.Job` to check the list of attributes and their requirements. :param request: (fastapi.Request) the FastAPI request object. :param job: (Job) The Job instance to be saved in the database. :return: (Job) The job that has been saved in the database. :raises: (HTTPException) status 400 if job.server_info is not None and cannot be parsed into JSON. """ job_id = await job.create(request.app.database) job_in_db = await Job.find_by_id(job_id, request.app.database) assert job_in_db is not None return job_in_db
[docs] @router.get( path="/status/{status}", response_description="List jobs with the specified status", response_model=List[Job], ) async def list_jobs_with_status(status: JobStatus, request: Request) -> List[Job]: """ List jobs with specified status. Fetches list of Job with max length MAX_RECORDS_TO_FETCH. :param status: (JobStatus) The status of jobs to query the Job DB for. :param request: (fastapi.Request) the FastAPI request object. :return: (List[Dict[str, Any]]) A list where each entry is a dictionary with the attributes of a Job instance with the specified status. """ return await Job.find_by_status(status, MAX_RECORDS_TO_FETCH, request.app.database)
[docs] @router.post(path="/change_status", response_description="Change job to the specified status") async def change_job_status(job_id: str, status: JobStatus, request: Request) -> JSONResponse: """ Change job job_id to specified status. :param job_id: (str) The id of the job to change the status of. :param status: (JobStatus) The status to change job_id to. :param request: (fastapi.Request) the FastAPI request object. :return: (JSONResponse) If successful, returns 200. If not successful, returns response with status code 400 and body: {"error": <error message>} """ job_in_db = await Job.find_by_id(job_id, request.app.database) try: assert job_in_db is not None, f"Job {job_id} not found" await job_in_db.set_status(status, request.app.database) return JSONResponse(content={"status": "success"}) except AssertionError as assertion_e: return JSONResponse(content={"error": str(assertion_e)}, status_code=400) except Exception as general_e: return JSONResponse(content={"error": str(general_e)}, status_code=500)
[docs] @router.get("/get_server_log/{job_id}") async def get_server_log(job_id: str, request: Request) -> JSONResponse: """ Return the contents of the server's log file for the given job id. :param job_id: (str) the ID of the job to get the server logs for. :param request: (fastapi.Request) the FastAPI request object. :return: (JSONResponse) if successful, returns the contents of the file as a string. If not successful, returns the appropriate error code with a JSON with the format below: {"error": <error message>} """ try: job = await Job.find_by_id(job_id, request.app.database) assert job is not None, f"Job {job_id} not found" assert job.server_log_file_path is not None and job.server_log_file_path != "", ( "Log file path is None or empty" ) with open(job.server_log_file_path, "r") as f: content = f.read() return JSONResponse(content) except AssertionError as assertion_e: return JSONResponse(content={"error": str(assertion_e)}, status_code=400) except Exception as general_e: return JSONResponse(content={"error": str(general_e)}, status_code=500)
[docs] @router.get("/get_client_log/{job_id}/{client_index}") async def get_client_log(job_id: str, client_index: int, request: Request) -> JSONResponse: """ Return the contents of the log file for the client with given index under given job id. :param job_id: (str) the ID of the job to get the client logs for. :param client_index: (int) the index of the client within the job. :param request: (fastapi.Request) the FastAPI request object. :return: (JSONResponse) if successful, returns the contents of the file as a string. If not successful, returns the appropriate error code with a JSON with the format below: {"error": <error message>} """ try: job = await Job.find_by_id(job_id, request.app.database) assert job is not None, f"Job {job_id} not found" assert job.clients_info is not None, "Job has no clients." assert 0 <= client_index < len(job.clients_info), ( f"Client index {client_index} is invalid (total: {len(job.clients_info)})" ) client_info = job.clients_info[client_index] assert client_info.log_file_path is not None and client_info.log_file_path != "", ( "Log file path is None or empty" ) response = requests.get( url=f"http://{client_info.service_address}/api/client/get_log", params={"log_file_path": client_info.log_file_path}, ) json_response = response.json() return JSONResponse(json_response) except AssertionError as assertion_e: return JSONResponse(content={"error": str(assertion_e)}, status_code=400) except Exception as general_e: return JSONResponse(content={"error": str(general_e)}, status_code=500)