Task.Supervisor.async_stream

You're seeing just the function async_stream, go back to Task.Supervisor module for more information.
Link to this function

async_stream(supervisor, enumerable, fun, options \\ [])

View Source (since 1.4.0)

Specs

async_stream(
  Supervisor.supervisor(),
  Enumerable.t(),
  (term() -> term()),
  keyword()
) :: Enumerable.t()

Returns a stream that runs the given function fun concurrently on each element in enumerable.

Each element in enumerable is passed as argument to the given function fun and processed by its own task. The tasks will be spawned under the given supervisor and linked to the current process, similarly to async/2.

See async_stream/6 for discussion, options, and examples.

Link to this function

async_stream(supervisor, enumerable, module, function, args, options \\ [])

View Source (since 1.4.0)

Specs

async_stream(
  Supervisor.supervisor(),
  Enumerable.t(),
  module(),
  atom(),
  [term()],
  keyword()
) :: Enumerable.t()

Returns a stream where the given function (module and function) is mapped concurrently on each element in enumerable.

Each element will be prepended to the given args and processed by its own task. The tasks will be spawned under the given supervisor and linked to the current process, similarly to async/4.

When streamed, each task will emit {:ok, value} upon successful completion or {:exit, reason} if the caller is trapping exits. The order of results depends on the value of the :ordered option.

The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).

If you find yourself trapping exits to handle exits inside the async stream, consider using async_stream_nolink/6 to start tasks that are not linked to the calling process.

Options

  • :max_concurrency - sets the maximum number of tasks to run at the same time. Defaults to System.schedulers_online/0.

  • :ordered - whether the results should be returned in the same order as the input stream. This option is useful when you have large streams and don't want to buffer results before they are delivered. This is also useful when you're using the tasks for side effects. Defaults to true.

  • :timeout - the maximum amount of time to wait (in milliseconds) without receiving a task reply (across all running tasks). Defaults to 5000.

  • :on_timeout - what do to when a task times out. The possible values are:

    • :exit (default) - the process that spawned the tasks exits.
    • :kill_task - the task that timed out is killed. The value emitted for that task is {:exit, :timeout}.
  • :shutdown - :brutal_kill if the tasks must be killed directly on shutdown or an integer indicating the timeout value. Defaults to 5000 milliseconds.

Examples

Let's build a stream and then enumerate it:

stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)