Julia Tasks 101
post by SatvikBeri · 2024-05-27T11:32:27.188Z · LW · GW
Tasks are how Julia handles parallelism & concurrency. Tasks are defined at the program level and Julia's scheduler maps them to hardware/OS threads.
Tasks have many names in other languages: "symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations". They're particularly similar to the coroutines used by Go and Cilk.
There's already a lot written on the details of tasks, so instead I'm going to focus on how to use them.
Uses
Use Threads.@spawn 90% of the Time
The most common, recommended way to create Tasks is with Threads.@spawn. You can use it with arbitrary Julia expressions to create and immediately schedule a Task that can be run on any thread – this gives us parallelism and concurrency.
using Base.Threads
@time begin
    task1 = @spawn (println(threadid()); sleep(1))
    task2 = @spawn (println(threadid()); sleep(1))
    wait.([task1, task2])
end
> 2
3
1.014021 seconds (15.98 k allocations: 1.108 MiB, 1.79% compilation time)
Tasks are small, so we can create a lot of them:
@time @sync for i in 1:10_000
    Threads.@spawn sleep(1)
end
> 1.046200 seconds (125.11 k allocations: 30.180 MiB, 13.47% compilation time)
(@sync is a convenience macro that will wait for all the created tasks to finish.)
@spawn gives the Julia scheduler the most freedom. It's allowed to run the Task on any thread, use multiple threads, pause a Task, move a Task from one thread to another, and so on. The scheduler has a lot of information at runtime, so it can typically make pretty good decisions.
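For instance (a hypothetical snippet, not one of the post's benchmarks), fetch is the simplest way to get a spawned Task's result back:

```julia
using Base.Threads

# @spawn returns a Task immediately; fetch blocks until the Task
# finishes and then returns its value.
t = Threads.@spawn sum(1:1_000)
println(fetch(t))  # 500500
```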
Consider this function hash_lots. It spends 75ms sleeping and then about 75ms working. If we run it once, it takes about 150ms:
function hash_lots(x)
    sleep(.075)
    for i in 1:9_860_000
        x = hash(x)
    end
    return x
end
@btime hash_lots(5)
> 150.162 ms (6 allocations: 144 bytes)
(@btime is a benchmarking macro from the BenchmarkTools package that runs a function many times to get a more accurate estimate of runtime.) We get a similar time if we run n copies in parallel, where n is the number of threads.
@btime begin
@sync for i in 1:nthreads()
@spawn hash_lots(i)
end
end
> 151.340 ms (189 allocations: 12.56 KiB)
Now let's say we run 4n copies of this. How long would you expect it to take?
With no switching, each thread would process four copies of the function sequentially, and it would take about ~600ms. With perfect switching, Julia would start the sleep in each Task almost immediately, taking 75ms across all Tasks. Then the hashing would take 300n ms of CPU time, and there are n CPUs, for another 300ms. So our best possible time would be 375ms.
@btime begin
    @sync for i in 1:(4*nthreads())
        @spawn hash_lots(i)
    end
end
> 376.614 ms (720 allocations: 49.30 KiB)
375ms is pretty good. We get almost all of the maximum possible benefit while using default settings. Most importantly, we didn't annotate the line sleep(.075) in any way! We wrote hash_lots as normal, synchronous code – we only needed to wrap it in a Task at the top level, and the scheduler took care of the rest. You can do this with any code, as long as it's thread-safe.
Micromanaging with @task and Channels
Ok, but what if your code isn't thread-safe? Or what if it's 99% thread-safe, but there's one part where your simultaneous Tasks write to a dict? We can control execution with the @task macro.
t = @spawn println("Hola!")
is equivalent to the following:
t = @task println("Hola!")
t.sticky = false #defaults to true for historical reasons
schedule(t)
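Here's a hypothetical end-to-end version of that lifecycle:

```julia
# @task creates the Task without scheduling it.
t = @task begin
    sleep(0.01)
    "done"
end
istaskstarted(t)  # false – nothing runs until we schedule it
t.sticky = false  # allow the scheduler to use any thread
schedule(t)
fetch(t)          # waits for t, then returns "done"
```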
Going back to the dict example – if your worker Tasks write to the dict directly, you can get segfaults or silently corrupted data. Instead, have one Task that writes directly to the dict, one element at a time, and use a Channel (a threadsafe queue) to communicate between your Tasks:
using Base.Threads
channel = Channel{Task}(Inf)
dict = Dict()
function make_worker(channel::Channel{Task}, dict::Dict)
    worker_task = @spawn begin
        while true
            k, v = do_stuff()
            update_task = @task dict[k] = v
            put!(channel, update_task)
        end
    end
    return worker_task
end
function make_consumer(channel::Channel{Task})
    consumer_task = @spawn begin
        while true
            t = take!(channel)
            schedule(t)
            wait(t)
        end
    end
    return consumer_task
end
workers = [make_worker(channel, dict) for i in 1:8]
consumer = make_consumer(channel)
Each worker will run the bulk of its work concurrently. Then instead of updating the dict directly, they create Tasks that get sent to the Channel. There's only one consumer reading from the Channel and updating the dict, and that consumer calls wait after each task, so it will only run one update at a time, sequentially.
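Here's a self-contained, runnable sketch of the same pattern. do_stuff is left undefined in the post, so this version substitutes a hypothetical workload (hashing each worker's index), and runs a fixed number of updates so it terminates:

```julia
using Base.Threads

channel = Channel{Task}(Inf)
dict = Dict{Int,UInt}()

# 8 workers do their (hypothetical) work concurrently, then defer each
# dict write by sending it to the channel as an unscheduled Task.
workers = map(1:8) do i
    Threads.@spawn begin
        v = hash(i)  # stand-in for do_stuff()
        t = @task dict[i] = v
        put!(channel, t)
    end
end

# One consumer runs the deferred writes one at a time.
consumer = Threads.@spawn for _ in 1:8
    t = take!(channel)
    schedule(t)
    wait(t)
end

wait.(workers)
wait(consumer)
length(dict)  # 8 – every worker's result was recorded exactly once
```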
Warnings
Recursively Spawned Tasks
Julia has a relatively simple mark-sweep Garbage Collector. It's fast but can get confused in some cases, like recursively spawned tasks – it often isn't able to free memory until the entire stack is cleared.
So if you have a case that uses a lot of RAM, avoid having Tasks create other Tasks.
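A hypothetical sketch of the safer shape: enumerate the work first and spawn every Task from the top level, rather than having each Task spawn children:

```julia
using Base.Threads

chunks = [collect(i:i+9) for i in 1:10:100]  # 10 chunks covering 1:100

# Flat spawning: every Task is created by the top-level loop, so no
# Task keeps a child Task's stack alive.
tasks = [Threads.@spawn sum(c) for c in chunks]
total = sum(fetch.(tasks))  # 5050
```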
Distributed.@spawn
The Distributed package also has a @spawn macro, which is deprecated and shouldn't be used. So if you're using both packages, make sure to explicitly call Threads.@spawn.
Deprecated macro @async
"for new code there is no reason to use @async" – vchuravy
@async is an earlier macro you might see in some old code. It's similar to @spawn, but the spawned Task is "sticky", meaning it will only run on the same hardware thread as the code that calls it. In other words, @async gives concurrency without parallelism.
Stickiness makes tasks a lot less composable, because a sticky Task will also limit its parent. A very low-level @async can lead to surprisingly bad performance across an application. Furthermore, there's no performance gain from disabling parallelism – the overhead of @async and @spawn is the same.
Threads.@threads
@threads has a similar issue – it only creates as many Tasks as there are threads, so it doesn't work well with Task switching. This might be what you want in some cases (e.g. to conserve memory), but most of the time you should use @spawn.
@btime begin
    Threads.@threads for i in 1:(4*nthreads())
        sleep(.1)
    end
end
> 415.664 ms (212 allocations: 9.17 KiB)
Glossary
These terms are a bit scattered across the Julia documentation, so here's a list:
- Task(my_function): create a Task from a callable function with no arguments.
- @task: create a Task from an arbitrary Julia expression.
- schedule(task::Task): schedule task to be run.
- task.sticky: if true, task can only be run on the same hardware thread where schedule was called. If false, it can be assigned to any thread by the scheduler. The current recommendation is to use non-sticky tasks almost all the time, but tasks are sticky by default for historical reasons.
- Threads.@spawn: create and immediately schedule a non-sticky Task. This gives the Julia scheduler freedom to run the Task in the way that it thinks is optimal.
- @async: (deprecated) create and immediately schedule a sticky Task.
- Threads.@threads: (deprecated) run a for loop in parallel.
- wait(task::Task): waits for a task to complete.
- task.result: once the task is done, contains the output. Contains nothing otherwise.
- fetch(task::Task): wait for the task, then return its result value.
- @sync: use this before an expression that creates multiple tasks, and it will wait until all those tasks are done.
- Channel: "a waitable first-in first-out queue which can have multiple tasks reading from and writing to it". Channels are a robust way of communicating between tasks. If you're familiar with Go, you use Tasks and Channels in Julia the way you use Goroutines and Channels in Go.
- put!(channel::Channel, value): append value to channel, blocking if it's full.
- take!(channel::Channel): return the next available value from channel, blocking if it's empty.