Streaming OpenAI data with Elixir

Parinda Darden · Dec 25, 2022 · 3 min read

An OpenAI request usually takes more than a minute to return a response. To avoid the timeout issue that we'd run into on most hosting services, we need to stream the response as it is generated, which we'll do here with Finch's `stream` function.

For this example, we'll make a call to OpenAI's completion endpoint.

defmodule MyApp.OpenAI do
  @moduledoc """
  Handles OpenAI API request and response.
  """

  require Logger

  @endpoint "api.openai.com"

  @doc """
  Streams a completion for `input` from OpenAI and returns the collected chunks.

  The request is always sent with `"stream" => true`, and the response is
  folded chunk by chunk into a `{status, headers, body}` accumulator that
  starts as `{nil, [], []}`.

  ## Options

    * `:model` - model name, defaults to `"text-davinci-003"`
    * `:max_tokens` - defaults to `250`
    * `:temperature` - defaults to `0.7`
    * `:consume_func` - two-arity function handed to `Finch.stream/5`. It
      receives a stream event (`{:status, _}`, `{:headers, _}` or
      `{:data, _}`) and the current `{status, headers, body}` accumulator and
      must return the next accumulator. `body` is expected to be a list with
      the newest chunk first; it is reversed before being returned.

  ## Return values

    * `{:ok, chunks}` - the response status was 200; `chunks` is the body
      accumulator in arrival order
    * `{:error, :missing_input}` - `input` was the empty string
    * `{:error, :generate_completion}` - the request failed or the status was
      not 200; the details are logged
  """
  @spec generate_completion(String.t(), list()) :: {:ok, list()} | {:error, atom()}
  def generate_completion(_input, opts \\ [])

  def generate_completion("", _opts), do: {:error, :missing_input}

  def generate_completion(input, opts) do
    body =
      encode_body(%{
        "model" => Keyword.get(opts, :model, "text-davinci-003"),
        "prompt" => input,
        "max_tokens" => Keyword.get(opts, :max_tokens, 250),
        "temperature" => Keyword.get(opts, :temperature, 0.7),
        # Ask the API to send partial results while keeping the connection open.
        "stream" => true
      })

    consume_func = Keyword.get(opts, :consume_func, &default_consume/2)

    :post
    |> Finch.build(url("/v1/completions"), headers(), body)
    |> Finch.stream(MyApp.Finch, {nil, [], []}, consume_func)
    |> handle_stream(:generate_completion)
  end

  # Default reducer for Finch.stream/5: records the status, accumulates the
  # headers and prepends every raw data chunk to the body list.
  defp default_consume({:status, value}, {_, headers, body}), do: {value, headers, body}

  defp default_consume({:headers, value}, {status, headers, body}),
    do: {status, headers ++ value, body}

  defp default_consume({:data, value}, {status, headers, body}),
    do: {status, headers, [value | body]}

  # Finch can also emit trailers; they carry no body data, so ignore them
  # instead of failing with a FunctionClauseError.
  defp default_consume({:trailers, _trailers}, acc), do: acc

  defp url(path) do
    "https://" <> @endpoint <> path
  end

  # Success: chunks were prepended while streaming, so restore arrival order.
  defp handle_stream({:ok, {200, _headers, body}}, _event) do
    {:ok, Enum.reverse(body)}
  end

  # The request completed but with a non-200 status.
  defp handle_stream({:ok, {status, headers, body}}, event) do
    Logger.error(
      "OpenAI #{event} failed: status=#{inspect(status)} " <>
        "headers=#{inspect(headers)} body=#{inspect(in_arrival_order(body))}"
    )

    {:error, event}
  end

  # The request itself failed (Finch returned an error tuple).
  defp handle_stream({:error, error}, event) do
    Logger.error("OpenAI #{event} failed: error=#{inspect(error)}")
    {:error, event}
  end

  # The body accumulator is built newest-first; flip it for readable logs.
  # A custom :consume_func may use another shape, which is logged untouched.
  defp in_arrival_order(body) when is_list(body), do: Enum.reverse(body)
  defp in_arrival_order(body), do: body

  defp encode_body(body), do: Jason.encode!(body)

  defp headers do
    [{"Authorization", "Bearer #{api_key()}"}, {"Content-Type", "application/json"}]
  end

  # Read at runtime so the key is not frozen into the compiled module.
  defp api_key do
    Application.get_env(:my_app, :envs)[:open_ai][:api_key]
  end
end

In `generate_completion/2`, the two important parts of the function are the `stream` option passed in the request body, which tells the API to send back partial progress while keeping the connection open, and the `consume_func` option, which parses the streamed response.

Now, let's use this module to ask OpenAI a question!

opts = [
  # Reducer for the streamed response. It receives each stream event together
  # with the `{status, headers, body}` accumulator and returns the next
  # accumulator; `body` collects the decoded completion text, newest first.
  consume_func: fn
    {:status, value}, {_, headers, body} ->
      {value, headers, body}

    {:headers, value}, {status, headers, body} ->
      {status, headers ++ value, body}

    # A lone terminator event ends the stream; there is nothing to add.
    {:data, "data: [DONE]\n\n"}, acc ->
      acc

    {:data, value}, {status, headers, body} ->
      text =
        value
        |> String.replace_prefix("data: ", "")
        |> String.replace_suffix("\n\n", "")
        # One chunk may carry several events, so split them apart.
        |> String.split("\n\ndata: ")
        |> Enum.reduce("", fn
          # String.split/2 never yields nil; the payloads to skip are the empty
          # string (which Jason.decode!/1 would raise on) and the terminator.
          data, acc when data in ["", "[DONE]"] ->
            acc

          data, acc ->
            decoded = Jason.decode!(data)
            choice = List.first(decoded["choices"], %{})
            acc <> Map.get(choice, "text", "")
        end)

      {status, headers, [text | body]}
  end
]

# Ask the question and stitch the streamed text chunks into a single string.
# On failure the error reason is printed and becomes the value of `answer`.
answer =
  "How do I say hello in Thai?"
  |> MyApp.OpenAI.generate_completion(opts)
  |> case do
    {:ok, chunks} -> Enum.join(chunks)
    {:error, reason} -> IO.inspect(reason)
  end

We're adding a bit more logic to parse the data that's streamed from OpenAI. According to the API documentation, the `stream` option streams back the data as it becomes available, with the stream terminated by a `data: [DONE]` message. We handle the end of the stream by returning the current `{status, headers, body}` tuple unchanged. The data returned from OpenAI is text that starts with `"data: "` and ends with `"\n\n"`, which we'll want to remove by using `|> String.replace_prefix("data: ", "")` and `|> String.replace_suffix("\n\n", "")`.

Sometimes the data will sneak in a few extra instances of `"\n\ndata: "`, so we have to handle this edge case with `|> String.split("\n\ndata: ")`. Because of this edge case, we need to check again whether the stream has ended, and make sure that the data is not empty. Once the data passes all the checks, it gets parsed and appended to the previously parsed data.

The final answer from OpenAI completion endpoint should be `Sawasdee (สวัสดี)`.

Subscribe to the newsletter

The latest posts, sent to your inbox.

© 2020 - 2023 Parinda Darden