Serving Machine Learning Models#

Model Serving#

The process of deploying trained machine learning models to a production environment to make predictions on new data. This involves making the model accessible via an API or a GUI so that users or applications can send data to the model and receive predictions in return.
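A minimal sketch of this request/response cycle (with a hypothetical stand-in model) reduces to decoding the payload, running the model, and encoding the prediction:

```python
import json


def load_model():
    # Hypothetical stand-in for loading a trained model from disk.
    return lambda x: 2 * x + 1


def handle_request(body: str, model) -> str:
    """Decode the JSON payload, run the model, and encode the prediction."""
    payload = json.loads(body)
    prediction = model(payload["input"])
    return json.dumps({"prediction": prediction})


model = load_model()
print(handle_request('{"input": 3}', model))  # prints {"prediction": 7}
```

Serving frameworks wrap exactly this cycle behind an HTTP endpoint; the decode/predict/encode split is the same pattern LitServe's API hooks formalize below.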

Inference Strategies#

Inference (deployment) strategies are the methods and practices used to put machine learning models into production. They vary based on the application's requirements, the available infrastructure, and the scale at which the model needs to operate. Common strategies include:

  • Batch Inference: Running predictions on a batch of data at scheduled intervals. This is suitable for applications where real-time predictions are not required.

  • Online Inference: Making predictions in real-time as data comes in. This is suitable for applications that require immediate responses, such as recommendation systems or fraud detection.

  • Streaming Inference: Continuously processing and making predictions on a stream of data, such as sensor data or user interactions. This is suitable for applications that require real-time analysis of continuous data streams.

  • Edge Inference: Deploying models on edge devices (e.g., IoT devices, mobile phones) to reduce latency and bandwidth usage. This is suitable for applications that require low-latency responses and can operate with limited resources.
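The contrast between the batch and online strategies above can be sketched with plain functions (the model here is a hypothetical stand-in):

```python
def batch_inference(model, records):
    """Score an entire dataset in one scheduled run (e.g., a nightly job)."""
    return [model(r) for r in records]


def online_inference(model, record):
    """Score a single input on demand, as an API endpoint does per request."""
    return model(record)


model = lambda x: x * 2  # hypothetical trained model

print(batch_inference(model, [1, 2, 3]))  # bulk scoring -> [2, 4, 6]
print(online_inference(model, 7))         # one request/response -> 14
```

Streaming inference applies the same per-record call continuously as events arrive, and edge inference runs it on the device itself.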

| Feature | Batch Inference | Online Inference | Streaming Inference | Edge Inference |
|---|---|---|---|---|
| Trigger | Scheduled (e.g., daily, hourly) | On-demand (API call) | On data arrival in stream | Local event or sensor input |
| Input Type | Large dataset at once | Single or few inputs | Continuous data stream | Input from local device |
| Latency | High (minutes to hours) | Low (ms–s) | Low to moderate (near real-time) | Very low (ms) |
| Throughput | Very high | Low to moderate | High | Varies (device-limited) |
| Use Cases | Analytics, periodic predictions (e.g., churn) | Web apps, chatbots, image APIs | Fraud detection, log monitoring, IoT pipelines | Autonomous vehicles, drones, offline apps |
| Deployment | Server, cloud job (e.g., Airflow, SageMaker) | REST API, gRPC (FastAPI, Ray Serve) | Stream processors (Kafka, Flink, Spark) | Embedded device, mobile, microcontroller |
| Example | Predict next week’s demand from entire dataset | Classify one uploaded image | Classify every transaction in real time | Detect person using mobile camera |
| Model Refresh | Periodic (daily, weekly) | On model redeploy | Stream model updates possible | Often static (limited updates) |
| Resource Needs | High (GPU/TPU batch servers) | Scalable APIs (autoscaling possible) | Stream processing engines + serving infra | Limited CPU/RAM, no GPU |

| Type | Focus | Best For |
|---|---|---|
| Batch | Scale | Periodic bulk jobs, forecasts, analytics |
| Online | Responsiveness | Real-time user apps (low-latency, request/response) |
| Streaming | Reactivity | Always-on systems reacting to live data |
| Edge | Locality | Low-latency, disconnected, on-device inference |

Tools for Model Serving#

Different tools can be used to deploy and serve machine learning models at scale, including:

  • FastAPI: a high-performance web framework to build REST APIs for serving models.

  • Flask: a lightweight web framework to build REST APIs for serving models.

  • TensorFlow Serving: a specialized server for serving TensorFlow models.

  • TorchServe: a specialized server for serving PyTorch models.

  • ONNX Runtime: a cross-platform inference engine for ONNX models.

  • Seldon Core: a Kubernetes-native platform to deploy and manage machine learning models.

  • BentoML: a framework to package and deploy machine learning models as APIs.

  • LitServe: a framework to serve ML models with minimal code and powerful features, such as batching, streaming, GPU acceleration, and autoscaling.

LitServe#

An open-source model serving framework that provides a simple and efficient way to deploy machine learning models as APIs. It is designed to be easy to use, flexible, and scalable, making it suitable for both small and large-scale deployments. Built on top of FastAPI, it provides a set of features that simplify the process of serving models, such as:

  • Automatic Batching: Automatically groups incoming requests into batches to improve overall throughput.

  • Streaming Responses: Supports streaming responses for real-time applications.

  • GPU Acceleration: Supports GPU acceleration for faster inference.

  • Autoscaling: Automatically scales the number of workers based on the incoming traffic.

Getting Started with LitServe#

  • Install LitServe using pip:

    pip install litserve
    
  • Create a new directory for your project and navigate to it:

    mkdir demo_litserve
    cd demo_litserve
    
  • Create a new Python file named server.py and add the following code:

    import litserve as ls
    
    
    class SimpleLitAPI(ls.LitAPI):
        def setup(self, device):
            self.model1 = lambda x: x**2  # pylint:disable=W0201
            self.model2 = lambda x: x**3  # pylint:disable=W0201
    
        def decode_request(self, request):  # type: ignore pylint:disable=W0221
            return request["input"]
    
        def predict(self, x):  # type: ignore pylint:disable=W0221
            squared = self.model1(x)
            cubed = self.model2(x)
            output = squared + cubed
            return {"output": output}
    
        def encode_response(self, output):  # type: ignore pylint:disable=W0221
            return {"output": output}
    
    
    if __name__ == "__main__":
        api = SimpleLitAPI()
        server = ls.LitServer(api, accelerator="auto")
        server.run(port=8000)
    
  • Deploy the server on your local machine:

    lightning deploy server.py
    
  • Open http://localhost:8000/docs to access the interactive API documentation and test the API.

  • A client.py file will be automatically generated in the same directory to test the API. You can run it to see how the API works:

    python client.py
    
  • You can define Pydantic models for request and response validation:

    from typing import Dict
    
    import litserve as ls
    from pydantic import BaseModel as PydanticBaseModel
    from fastapi.middleware.cors import CORSMiddleware
    
    class BaseModel(PydanticBaseModel):
        class Config:
            arbitrary_types_allowed = True
    
    class SimpleLitRequest(BaseModel):
        input: float
    
    class SimpleLitResponse(BaseModel):
        output: float
    
    class SimpleLitAPI(ls.LitAPI):
        def setup(self, device):
            self.model1 = lambda x: x**2  # pylint:disable=W0201
            self.model2 = lambda x: x**3  # pylint:disable=W0201
    
        def decode_request(self, request: SimpleLitRequest):  # type: ignore pylint:disable=W0221
            return request.input
    
        def predict(self, x: float):  # type: ignore pylint:disable=W0221
            squared = self.model1(x)
            cubed = self.model2(x)
            output = squared + cubed
            return {"output": output}
    
        def encode_response(self, output: Dict) -> SimpleLitResponse:  # type: ignore pylint:disable=W0221
            return SimpleLitResponse(output=output["output"])
    
    if __name__ == "__main__":
        api = SimpleLitAPI()
        cors_middleware = (
            CORSMiddleware,
            {
                "allow_origins": ["*"],  # Allows all origins
                "allow_methods": ["GET", "POST"],  # Allows GET and POST methods
                "allow_headers": ["*"],  # Allows all headers
            },
        )
        server = ls.LitServer(api, accelerator="auto", middlewares=[cors_middleware])
        server.run(port=8000)
    
  • Test the API

    import requests
    
    response = requests.post("http://127.0.0.1:8000/predict", json={"input": 4.0})
    print(f"Status: {response.status_code}\nResponse:\n {response.text}")
    
  • Enabling concurrency with async allows the server to handle multiple tasks seamlessly at the same time by switching between them without blocking the execution of other tasks. This is particularly useful for I/O-bound operations, such as making API calls or reading from a database, where the server can continue processing other requests while waiting for the I/O operation to complete.

    from typing import Dict
    
    import litserve as ls
    from pydantic import BaseModel as PydanticBaseModel
    from fastapi.middleware.cors import CORSMiddleware
    
    class BaseModel(PydanticBaseModel):
        class Config:
            arbitrary_types_allowed = True
    
    class SimpleLitRequest(BaseModel):
        input: float
    
    class SimpleLitResponse(BaseModel):
        output: float
    
    class AsyncLitAPI(ls.LitAPI):
        def setup(self, device):
            self.model1 = lambda x: x**2  # pylint:disable=W0201
            self.model2 = lambda x: x**3  # pylint:disable=W0201
    
        async def decode_request(self, request: SimpleLitRequest):  # type: ignore pylint:disable=W0221
            return request.input
    
        async def predict(self, x: float):  # type: ignore pylint:disable=W0221
            squared = self.model1(x)
            cubed = self.model2(x)
            output = squared + cubed
            return {"output": output}
    
        async def encode_response(self, output: Dict) -> SimpleLitResponse:  # type: ignore pylint:disable=W0221
            return SimpleLitResponse(output=output["output"])
    
    if __name__ == "__main__":
        api = AsyncLitAPI(enable_async=True)
        cors_middleware = (
            CORSMiddleware,
            {
                "allow_origins": ["*"],  # Allows all origins
                "allow_methods": ["GET", "POST"],  # Allows GET and POST methods
                "allow_headers": ["*"],  # Allows all headers
            },
        )
        server = ls.LitServer(api, accelerator="auto", middlewares=[cors_middleware])
        server.run(port=8000)
    
    
  • Test the API

    import httpx
    import asyncio
    
    async def main():
        async with httpx.AsyncClient() as client:
            response = await client.post("http://localhost:8000/predict", json={"input": 4})
            print(response.json())
    
    asyncio.run(main())
    
  • LitServe supports streaming inference:

    from typing import Dict
    
    import litserve as ls
    from pydantic import BaseModel as PydanticBaseModel
    from fastapi.middleware.cors import CORSMiddleware
    
    class BaseModel(PydanticBaseModel):
        class Config:
            arbitrary_types_allowed = True
    
    class SimpleLitRequest(BaseModel):
        input: float
    
    class SimpleLitResponse(BaseModel):
        output: float
    
    class AsyncLitAPI(ls.LitAPI):
        def setup(self, device):
            self.model1 = lambda x: x**2  # pylint:disable=W0201
            self.model2 = lambda x: x**3  # pylint:disable=W0201
    
        async def decode_request(self, request: SimpleLitRequest):  # type: ignore pylint:disable=W0221
            return request.input
    
        async def predict(self, x: float):  # type: ignore pylint:disable=W0221
            for i in range(10):
                squared = self.model1(x + i)
                cubed = self.model2(x + i)
                output = squared + cubed
                yield {"output": output}
    
        async def encode_response(self, output):  # type: ignore pylint:disable=W0221
            # In streaming mode, encode_response receives the generator produced
            # by predict and yields one encoded item per generated output.
            async for out in output:
                yield SimpleLitResponse(output=out["output"])
    
    if __name__ == "__main__":
        api = AsyncLitAPI(enable_async=True)
        cors_middleware = (
            CORSMiddleware,
            {
                "allow_origins": ["*"],  # Allows all origins
                "allow_methods": ["GET", "POST"],  # Allows GET and POST methods
                "allow_headers": ["*"],  # Allows all headers
            },
        )
        server = ls.LitServer(
            api, accelerator="auto", middlewares=[cors_middleware], stream=True
        )
        server.run(port=8000)
    
  • LitServe allows authentication and supports FastAPI's advanced custom authentication mechanisms, such as OAuth and HTTP Bearer.
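For illustration, the core of an HTTP Bearer check, written here independently of any framework and with a hypothetical token value, can be sketched as:

```python
def verify_bearer(authorization_header: str, expected_token: str) -> bool:
    """Accept only a well-formed 'Bearer <token>' header carrying the expected token."""
    scheme, _, token = authorization_header.partition(" ")
    return scheme.lower() == "bearer" and token == expected_token


assert verify_bearer("Bearer my-secret", "my-secret")        # valid credentials
assert not verify_bearer("Basic my-secret", "my-secret")     # wrong scheme
assert not verify_bearer("Bearer wrong-token", "my-secret")  # wrong token
```

In a real deployment this check would live behind a FastAPI security dependency that rejects failing requests with a 401 response.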

  • Autoscaling can be enabled within a machine or across multiple machines.

    import litserve as ls

    if __name__ == "__main__":
        api = ls.test_examples.SimpleLitAPI()
        # On a machine with 4 GPUs, these two lines are equivalent:
        # server = ls.LitServer(api)
        # server = ls.LitServer(api, devices="auto")
        server = ls.LitServer(api, devices=3)  # or pin an explicit number of devices
        server.run(port=8000)
    
  • You can also scale the number of API server processes:

    import litserve as ls
    
    if __name__ == "__main__":
        api = ls.test_examples.SimpleLitAPI()
        server = ls.LitServer(api, workers_per_device=2)
        # Run the server on port 8000 with 4 API servers running in separate processes
        server.run(port=8000, num_api_servers=4)
    
  • LitServe enables request batching by combining multiple incoming requests into a single batch to improve throughput. This is particularly useful where minimizing per-request latency is less critical than maximizing overall throughput.

    import numpy as np
    
    import litserve as ls
    
    class SimpleBatchedAPI(ls.LitAPI):
        def setup(self, device):
            self.model = lambda x: x ** 2
    
        def decode_request(self, request):
            return np.asarray(request["input"])
    
        def predict(self, x):
            result = self.model(x)
            return result
    
        def encode_response(self, output):
            return {"output": output}
    
    if __name__ == "__main__":
        api = SimpleBatchedAPI(max_batch_size=8, batch_timeout=0.05)
        server = ls.LitServer(api)
        server.run(port=8000)
    
  • LitServe allows dockerizing your server using the command:

    litserve dockerize server.py --port 8000 --gpu
    

ESA WorldCover Classification Model Deployment Using LitServe#

  • First, make a new directory for the project and navigate to it:

    mkdir esa_worldcover_litserve
    cd esa_worldcover_litserve
    
  • Create new Python files for the model (model.py), the configuration (config.py), and the server (server.py):

    server.py

    from typing import Dict, List
    import torch
    import numpy as np
    import litserve as ls
    from fastapi.middleware.cors import CORSMiddleware
    from pydantic import BaseModel as PydanticBaseModel
    from model import UNet
    from config import IN_CHANNELS, OUT_CLASSES, MODEL_PATH
    
    class BaseModel(PydanticBaseModel):
        class Config:
            arbitrary_types_allowed = True
    
    class ESAWCRequest(BaseModel):
        image: List[List[List[float]]]
    
    class ESAWCResponse(BaseModel):
        output: List[List[int]]
    
    class ESAWorldCoverLitAPI(ls.LitAPI):
        def setup(self, device):
            self.model = UNet(  # pylint:disable=W0201
                in_channels=IN_CHANNELS, out_classes=OUT_CLASSES, dropout=0.0
            ).to(device)
            self.model.load_state_dict(
                torch.load(MODEL_PATH, weights_only=True, map_location="cpu")
            )
            self.model.eval()
            self.device = device
    
        def decode_request(self, request: ESAWCRequest):  # type: ignore pylint:disable=W0221,W0236
            image_array = np.array(request.image, dtype=np.float32)
            return image_array
    
        def predict(self, x: np.ndarray):  # type: ignore pylint:disable=W0221,W0236
            x = x / 10000.0
            x = torch.from_numpy(x).float()
            x = x.clip(min=0.0, max=1.0)
            x = x.to(self.device)
            with torch.no_grad():
                prediction = self.model(x.unsqueeze(0))  # Add batch dimension
            mask = torch.argmax(prediction, dim=1)
            return {"output": mask.cpu().numpy()[0, ...].tolist()}
    
        def encode_response(self, output: Dict) -> ESAWCResponse:  # type: ignore pylint:disable=W0221,W0236
            return ESAWCResponse(output=output["output"])  # type: ignore
    
    if __name__ == "__main__":
        api = ESAWorldCoverLitAPI()
        cors_middleware = (
            CORSMiddleware,
            {
                "allow_origins": ["*"],  # Allows all origins
                "allow_methods": ["GET", "POST"],  # Allows GET and POST methods
                "allow_headers": ["*"],  # Allows all headers
            },
        )
        server = ls.LitServer(
            api,
            accelerator="auto",
            middlewares=[cors_middleware],
            devices=3,
        )
        server.run(port=8000)
    
  • Test locally

    lightning deploy server.py
    
  • Dockerize the server using the command:

    litserve dockerize server.py --port 8000
    
  • Create requirements.txt file with the following content:

    torch==2.7.0
    litserve==0.2.10
    python-multipart
    numpy==2.2.4
    
  • Update the Dockerfile to include the requirements file:

    ARG PYTHON_VERSION=3.12
    
    FROM python:$PYTHON_VERSION-slim
    
    ####### Add your own installation commands here #######
    
    # RUN pip install some-package
    
    # RUN wget https://path/to/some/data/or/weights
    
    # RUN apt-get update && apt-get install -y <package-name>
    
    WORKDIR /app
    COPY . /app
    
    # Install litserve and requirements
    
    RUN pip install --no-cache-dir -r requirements.txt
    EXPOSE 8000
    CMD ["python", "/app/server.py"]
    
  • Before building the image, make sure to comment out devices=3 in the server.py file.

  • Build the Docker image:

    docker build -t esa_litserve .

  • Run the Docker container:

    docker run -p 8000:8000 esa_litserve

  • Test the API using the test_esa_litserve_api.ipynb file.

  • Tag and push the Docker image to Docker Hub:

    docker tag esa_litserve albughdadim/esa_litserve:latest
    docker push albughdadim/esa_litserve:latest
    
  • Deploy to Kubernetes using the k8s/deployment.yml file

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: esa-litserve-deployment
      namespace: flask-app-ns
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: esa-litserve-deployment
      template:
        metadata:
          labels:
            app: esa-litserve-deployment
        spec:
          containers:
            - name: esa-litserve-container
              image: albughdadim/esa_litserve:latest
              ports:
                - containerPort: 8000

  • Apply the deployment to your Kubernetes cluster:

    kubectl apply -f k8s/deployment.yml
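
  • The deployment above uses a fixed replicas: 2. To scale across machines automatically, a HorizontalPodAutoscaler can be layered on top (a sketch with hypothetical bounds; assumes a metrics server runs in the cluster):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: esa-litserve-hpa
  namespace: flask-app-ns
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: esa-litserve-deployment
  minReplicas: 2
  maxReplicas: 5
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70  # add pods when average CPU exceeds 70%
```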
    
  • Expose the deployment using a Service (k8s/service.yml):

    apiVersion: v1
    kind: Service
    metadata:
      name: esa-litserve-service
      namespace: flask-app-ns
    spec:
      type: ClusterIP
      selector:
        app: esa-litserve-deployment
      ports:
        - protocol: TCP
          port: 80
          targetPort: 8000

  • Apply the service to your Kubernetes cluster:

    kubectl apply -f k8s/service.yml
    
  • Expose the Service using an Ingress controller (k8s/ingress.yml):

    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: esa-litserve-ingress
      namespace: flask-app-ns
      annotations:
        cert-manager.io/cluster-issuer: ecmwf-meditwin-issuer
        nginx.ingress.kubernetes.io/backend-protocol: HTTP
        nginx.ingress.kubernetes.io/ssl-redirect: "true"
        nginx.ingress.kubernetes.io/proxy-body-size: "500m"
        external-dns.alpha.kubernetes.io/hostname: molitserve.internal.meditwin-project.eu
    spec:
      rules:
        - host: molitserve.internal.meditwin-project.eu
          http:
            paths:
              - path: /
                pathType: Prefix
                backend:
                  service:
                    name: esa-litserve-service
                    port:
                      number: 80
      tls:
        - hosts:
            - molitserve.internal.meditwin-project.eu
          secretName: litserve-tls

  • Apply the ingress to your Kubernetes cluster:

    kubectl apply -f k8s/ingress.yml
    
  • Make sure to use your own namespace and host in the Kubernetes YAML files.

  • Test the API using the test_esa_litserve_api.ipynb file by changing the URL to the Ingress URL.