Process Substitution Without Shell?

post by jefftk (jkaufman) · 2023-11-29T03:20:06.922Z · LW · GW · 18 comments

While I still write a decent amount of shell I generally try to avoid it. It's hard for others to read, has a lot of sharp edges, tends to swallow errors, and handles unusual situations poorly. But one thing that keeps me coming back to it is how easily I can set up trees of processes.

Say I have a program that reads two files together in a single pass [1] and writes something out. The inputs are compressed, so I need to decompress them, and the output needs to be compressed before it's written out to storage. I could do:

# download the files
aws s3 cp "$path1" .
aws s3 cp "$path2" .

# decompress the files
gunzip "$file1"
gunzip "$file2"

# run the command
cmd -1 "$file1" -2 "$file2" > "$fileOut"

# compress the output
gzip "$fileOut"

# upload the output
aws s3 cp "$fileOut.gz" "$pathOut"

This works, but for large files it's slow and needs too much space. We're waiting for each step to finish before starting the next, and we're storing some very large intermediate files on the local machine.

Instead, we'd like to stream the inputs down, decompress them as we go, compress the output as it comes out, and stream the output back up. In bash this is reasonably straightforward to write:

cmd -1 <(aws s3 cp "$path1" - | gunzip) \
    -2 <(aws s3 cp "$path2" - | gunzip) \
  | gzip | aws s3 cp - "$pathOut"

This uses almost no disk space and it parallelizes the decompression, command, and recompression. But it's also shell...

I tend to use Python for this kind of thing, where I'm gluing things together and want it to be clear what I'm doing. It seems like it should be possible to do this sort of thing with the subprocess module, but while I've played with it a few times I haven't figured it out. I'd like an API like:

pipeline = subprocess.Pipeline()
dl1 = pipeline.process(
    ["aws", "s3", "cp", path1, "-"])
gunzip1 = pipeline.process(
    ["gunzip"], stdin=dl1.stdout)
dl2 = pipeline.process(
    ["aws", "s3", "cp", path2, "-"])
gunzip2 = pipeline.process(
    ["gunzip"], stdin=dl2.stdout)
cmd = pipeline.process(
    ["cmd", "-1", gunzip1.stdout,
            "-2", gunzip2.stdout])
gzip = pipeline.process(
    ["gzip"], stdin=cmd.stdout)
pipeline.process(
    ["aws", "s3", "cp", "-", pathOut],
    stdin=gzip.stdout)
pipeline.check_call()

Or:

from subprocess import check_call, PIPE, InputFile

check_call([
  "cmd",
  "-1", InputFile([
          "aws", "s3", "cp", path1, "-",
          PIPE, "gunzip"]),
  "-2", InputFile([
          "aws", "s3", "cp", path2, "-",
          PIPE, "gunzip"]),
  PIPE, "gzip",
  PIPE, "aws", "s3", "cp", "-", pathOut])

These are 5x and 3x the length of the bash version, but I'd be willing to put up with that for having something that's more robust. The difference would also be smaller in practice as the commands would typically have a lot of arguments.

I see these Stack Overflow answers suggesting named pipes, but they seem awkward, hard to read, and easy to get wrong. Is there a better way? Should I just stick with bash when doing something bash is this good a fit for, especially now that people can paste my code into an LLM and get an explanation of what it's doing?


[1] Interleaved fastq files, where the Nth record in file 1 corresponds to the Nth record in file 2.

18 comments

Comments sorted by top scores.

comment by Nathan Helm-Burger (nathan-helm-burger) · 2023-11-29T03:50:21.921Z · LW(p) · GW(p)

In my past job experience there has always been a small handful of tasks that get left to the Linux shell no matter what the rest of the codebase is written in. It's just a lot more convenient for certain things.

comment by mishka · 2023-11-29T05:38:00.981Z · LW(p) · GW(p)

I got curious whether I could get a good answer from GPT-4. Not a "clean, ready-to-use" answer, but something which would point me in the right direction.

Generally, I have had a good experience getting software help out of it. For example, I wanted to understand how to transform nested dictionaries in an immutable fashion, so that JAX autodiff would be able to take gradients through the code, and after exchanging 4 questions and answers I got a good idea how to do it.

Here is that dialog (the first answer it gave was nearly useless, but it got much better with further conversation): https://chat.openai.com/share/86553014-a143-4b3c-8602-7d9b5e017343

So, I asked GPT-4 for the initial suggestions here: https://chat.openai.com/share/30dd8073-b85f-4a43-a298-a5a0c2169433.

Unfortunately, I didn't know enough to evaluate whether it started to move in the right direction. But it did suggest something different from what's in the post, namely using the Popen class from the subprocess module. I would be curious to know if this seems to be moving in the right direction...

Replies from: jkaufman
comment by jefftk (jkaufman) · 2023-11-29T10:24:43.218Z · LW(p) · GW(p)

GPT-4 in your conversation is concatenating the two downloads and providing them to cmd on its stdin, unfortunately.

(That was also my experience with GPT 3.5)

Replies from: mishka
comment by mishka · 2023-11-29T15:19:36.203Z · LW(p) · GW(p)

Yes, so, just like in my example, the first round of conversation is nearly useless. Let's see if the second iteration is any better (it does turn to named pipes):

ME (without triple quotes, but here I am including them for better formatting):

Unfortunately, this is concatenating the two downloads and providing them to cmd on it's stdin

    # Run the command with the output of the two gunzip processes as inputs
    cmd = subprocess.Popen(["cmd", "-1", "-", "-2", "-"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)

What we'd like to happen instead is to process those two downloads by cmd in parallel.

GPT-4:

To process two downloads in parallel and provide them as separate inputs to a command using Python's subprocess module, you can employ named pipes (FIFOs). Named pipes can be created using the os.mkfifo method, which allows two independent data streams for the two downloads.

Here's a revised version of the script that uses named pipes for parallel processing:

import subprocess
import os
import threading

def stream_to_fifo(fifo_name, command):
    with subprocess.Popen(command, stdout=subprocess.PIPE) as process:
        with open(fifo_name, 'wb', 0) as fifo:
            for line in process.stdout:
                fifo.write(line)

def run_pipeline(path1, path2, pathOut):
    fifo1 = "/tmp/fifo1"
    fifo2 = "/tmp/fifo2"

    # Create named pipes
    os.mkfifo(fifo1)
    os.mkfifo(fifo2)

    # Start the download and gunzip processes, streaming to FIFOs
    threading.Thread(target=stream_to_fifo, args=(fifo1, ["aws", "s3", "cp", path1, "-", "|", "gunzip"])).start()
    threading.Thread(target=stream_to_fifo, args=(fifo2, ["aws", "s3", "cp", path2, "-", "|", "gunzip"])).start()

    # Run the command with the FIFOs as inputs
    with open(fifo1, 'rb') as input1, open(fifo2, 'rb') as input2:
        cmd = subprocess.Popen(["cmd", "-1", "-", "-2", "-"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

        # Create threads to avoid blocking
        def stream_to_cmd(input_stream):
            for line in input_stream:
                cmd.stdin.write(line)
            input_stream.close()

        threading.Thread(target=stream_to_cmd, args=(input1,)).start()
        threading.Thread(target=stream_to_cmd, args=(input2,)).start()

        # Close cmd's stdin after the inputs are done
        cmd.stdin.close()

        # Pipe cmd's output to gzip and then upload
        gzip = subprocess.Popen(["gzip"], stdin=cmd.stdout, stdout=subprocess.PIPE)
        subprocess.run(["aws", "s3", "cp", "-", pathOut], stdin=gzip.stdout)

    # Cleanup
    os.remove(fifo1)
    os.remove(fifo2)

# Example usage
run_pipeline("s3://path/to/file1", "s3://path/to/file2", "s3://path/to/output")

This script creates two named pipes and starts two threads to handle the downloads and the decompression. The outputs of these processes are directed to the named pipes. The main command reads from these pipes. The use of threads ensures that the reading and writing to the command's stdin do not block each other.

This approach is more complex than the initial script but allows the parallel processing of the two data streams as required. As before, error handling and cleanup are simplified for demonstration purposes and should be robustly implemented for production use.

Replies from: jkaufman, mishka
comment by jefftk (jkaufman) · 2023-11-29T18:22:53.995Z · LW(p) · GW(p)

Unfortunately while this starts off looking promising it's missing key steps:

cmd = subprocess.Popen(["cmd", "-1", "-", "-2", "-"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

It's still trying to stream the input files to cmd's standard input, instead of providing cmd with two separate filenames which when read stream their respective inputs.

Replies from: mishka
comment by mishka · 2023-11-29T19:06:24.061Z · LW(p) · GW(p)

Let's do another iteration with your comment as the prompt:

GPT-4:

To correctly handle the situation where cmd expects filenames as arguments and those files are actually streams of data from subprocesses, you can use the /dev/fd filesystem on Unix-like systems. This special filesystem allows you to treat file descriptors as file paths, which is essentially what Bash's process substitution does.

Here's how you can modify the Python script to achieve this:

  1. Use subprocesses to handle the download and decompression, just like before.
  2. Instead of writing to named pipes, pass the file descriptors of these subprocesses directly to cmd.

Here's the revised script:

import subprocess
import os

def run_pipeline(path1, path2, pathOut):
    # Start downloading the first file and pipe it to gunzip
    dl1 = subprocess.Popen(["aws", "s3", "cp", path1, "-"], stdout=subprocess.PIPE)
    gunzip1 = subprocess.Popen(["gunzip"], stdin=dl1.stdout, stdout=subprocess.PIPE)

    # Start downloading the second file and pipe it to gunzip
    dl2 = subprocess.Popen(["aws", "s3", "cp", path2, "-"], stdout=subprocess.PIPE)
    gunzip2 = subprocess.Popen(["gunzip"], stdin=dl2.stdout, stdout=subprocess.PIPE)

    # Wait for the pipes to be ready
    dl1.stdout.close()
    dl2.stdout.close()

    # Run the command with file descriptors
    cmd = subprocess.Popen(["cmd", "-1", f"/dev/fd/{gunzip1.stdout.fileno()}", "-2", f"/dev/fd/{gunzip2.stdout.fileno()}"], stdout=subprocess.PIPE)

    # Pipe cmd's output to gzip and then upload
    gzip = subprocess.Popen(["gzip"], stdin=cmd.stdout, stdout=subprocess.PIPE)
    subprocess.run(["aws", "s3", "cp", "-", pathOut], stdin=gzip.stdout)

    # Cleanup
    gunzip1.stdout.close()
    gunzip2.stdout.close()
    gzip.stdout.close()

# Example usage
run_pipeline("s3://path/to/file1", "s3://path/to/file2", "s3://path/to/output")

In this script:

* `gunzip1.stdout.fileno()` and `gunzip2.stdout.fileno()` give the file descriptors of the gunzip processes.
* These descriptors are converted to file paths using `/dev/fd/`.
* `cmd` is then invoked with these file paths as arguments, making it read from the streams.

This approach replicates the behavior of Bash's process substitution in Python, allowing cmd to read from the streams as if they were files. Note that this method is specific to Unix-like systems that support the /dev/fd filesystem.

Replies from: jkaufman
comment by jefftk (jkaufman) · 2023-11-29T20:30:30.946Z · LW(p) · GW(p)

Thanks! I haven't tested this, but reading through it seems plausible. This is the first GPT response I've seen where it "understood" that the command needed its inputs as two file arguments.

(Though all of my playing was with free 3.5)

Replies from: mishka
comment by mishka · 2023-11-29T20:42:40.679Z · LW(p) · GW(p)

Great!

Yes, it often requires a conversation; taking a few turns to discuss the situation and create enough context for it...

The difference between GPT-4 and GPT-3.5 is rather drastic. GPT-4 might still be the only "next generation" system available for public use in the world today (although, to be fair, I have not explored new Anthropic models), and I think it is the first system which often seems to actually demonstrate "true understanding", whatever the nature of that phenomenon might be.

comment by mishka · 2023-11-29T15:28:23.152Z · LW(p) · GW(p)

(But, of course, if one would really like an API like pipeline = subprocess.Pipeline() then perhaps it is possible to implement a Pipeline class which would do exactly what you describe in the example.

Or, perhaps, a Pythonic wrapper which has a reasonable user-facing syntax, but converts it into the shell call above, would be even better...)
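A minimal sketch of the second idea, a wrapper that renders argument lists into a bash command line. All of the names here (`ShellFragment`, `pipeline`, `input_file`, `check_output`) are hypothetical, not part of the subprocess module. Every argument is quoted with `shlex.quote`, so spaces and `$` in arguments are passed through literally, and only the `|` and `<(...)` syntax produced by the wrapper itself is interpreted by bash.

```python
import shlex
import subprocess


class ShellFragment(str):
    """Already-rendered bash text that must not be quoted again (hypothetical helper)."""


def pipeline(*commands):
    """Render argv lists as 'cmd1 args | cmd2 args', quoting each plain argument."""
    rendered = (
        " ".join(
            arg if isinstance(arg, ShellFragment) else shlex.quote(arg)
            for arg in argv
        )
        for argv in commands
    )
    return ShellFragment(" | ".join(rendered))


def input_file(*commands):
    """Render a pipeline as a bash process substitution, <(...)."""
    return ShellFragment(f"<({pipeline(*commands)})")


def check_output(script):
    """Run the rendered script under bash with pipefail and return its stdout."""
    return subprocess.check_output(["bash", "-o", "pipefail", "-c", script])
```

With this, the pipeline from the post would be built from a `cmd` argv list containing two `input_file(...)` arguments, followed by the `gzip` and upload argv lists. One caveat: `pipefail` covers the main pipeline, but bash does not propagate the exit status of a command inside `<(...)`, so a failed download would still go unnoticed.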

comment by philh · 2023-12-05T16:41:23.392Z · LW(p) · GW(p)

I'm curious how bash does it. Suppose you do cat <(cat), this gives you

  • One cat process with stdout writing to a pipe
  • One cat /proc/self/fd/11 process with file descriptors 3 and 11 both reading from this pipe.

So if I had to guess: bash calls pipe() to get two ends of a pipe. Let's say these have file descriptors 10 and 11. Then it forks twice, which copies the list of file descriptors. We now have three processes with file descriptors 10 (open for writing) and 11 (open for reading). Parent process closes both ends, one child closes the read end, one child closes the write end.

The writing child process uses dup2() or something to set its stdout to be the pipe, then calls exec("cat", []) or whatever.

The reading child process has open file descriptor 11, and can access it as a file at /proc/self/fd/11. So it calls exec("cat", ["/proc/self/fd/11"]). The process which is now cat still has that open file descriptor, but it doesn't know that, it just knows it's been given a path; it opens that path and gets file descriptor 3, also reading from that pipe.

So if you want to duplicate this in python:

  • You'll need to figure out how to access the pipe() function.
  • Do the various subprocess libraries close file descriptors? If not, you can maybe just pass them /proc/self/fd/(fileno) as the path to read/write. If they do, you might need to do the fork and exec manually?
  • Maybe you also need to figure out "use this specific file descriptor for the stdin/stdout of this process"? Not sure. At minimum a fork/exec would work if you do need to do this, but maybe there's something better.

None of this is very high confidence.
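The steps above can be sketched in Python roughly as follows. `os.pipe()` wraps `pipe()`, and `subprocess.Popen` does close inherited file descriptors by default (`close_fds=True`), so the read ends have to be listed in `pass_fds` to survive into the consuming process. The function name and its arguments are hypothetical, and the sketch assumes a platform with `/dev/fd`.

```python
import os
import subprocess


def run_with_substituted_inputs(consumer_argv, producers):
    """Run consumer_argv with one /dev/fd/N path appended per producer.

    Returns the consumer's stdout as bytes. Hypothetical helper, not a
    subprocess API.
    """
    read_fds = []
    procs = []
    for argv in producers:
        # Both ends are non-inheritable by default, so unrelated children
        # never see them.
        read_fd, write_fd = os.pipe()
        # Popen duplicates write_fd onto the child's stdout.
        procs.append(subprocess.Popen(argv, stdout=write_fd))
        # The parent must close its copy of the write end, or the reader
        # never sees end-of-file.
        os.close(write_fd)
        read_fds.append(read_fd)

    paths = [f"/dev/fd/{fd}" for fd in read_fds]
    # pass_fds keeps the read ends open, with the same numbers, in the consumer.
    consumer = subprocess.Popen(
        consumer_argv + paths, stdout=subprocess.PIPE, pass_fds=read_fds
    )
    for fd in read_fds:
        os.close(fd)

    output, _ = consumer.communicate()
    for proc in procs + [consumer]:
        if proc.wait() != 0:
            raise subprocess.CalledProcessError(proc.returncode, proc.args)
    return output
```

For example, `run_with_substituted_inputs(["cat"], [["echo", "first"], ["echo", "second"]])` behaves like `cat <(echo first) <(echo second)`.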

comment by faul_sname · 2023-11-29T05:24:39.832Z · LW(p) · GW(p)

I find your bash examples to be much more readable than your Python ones. My general rule of thumb for when to switch from shell to Python is "when I find myself needing to do nontrivial control flow". Even then, it is perfectly legitimate to extract a single self-contained bit of shell that involves lots of stream management into its own perform_specific_operation.sh and invoke that from a Python wrapper program that handles control flow. Just be sure you're handling quoting properly, which in practice just means "you should always pass a list as the first argument to subprocess.Popen(), never a concatenated string".
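A small illustration of the quoting point, using `printf` as a stand-in for the real command (the function names are made up for this example). With a list, an argument containing a space arrives as one argument; with a concatenated string run through the shell, it is split in two.

```python
import subprocess


def echo_args_via_list(args):
    """Pass args as a list: each element reaches printf as exactly one argument."""
    result = subprocess.run(
        ["printf", "[%s]\n", *args], capture_output=True, text=True, check=True
    )
    return result.stdout


def echo_args_via_string(args):
    """Concatenate args into one string: the shell re-splits it on whitespace."""
    command = "printf '[%s]\\n' " + " ".join(args)
    result = subprocess.run(
        command, shell=True, capture_output=True, text=True, check=True
    )
    return result.stdout
```

Here `echo_args_via_list(["two words"])` prints one bracketed line, while `echo_args_via_string(["two words"])` prints two.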

Replies from: jkaufman
comment by jefftk (jkaufman) · 2023-11-29T13:41:25.896Z · LW(p) · GW(p)

perform_specific_operation.sh is essentially what I did (compute-alignments.sh) though it has more control flow than I'd like because it needs to branch to handle 1- and 2-input cases. And since I use a string to build up CMD it now means all of the arguments need to be well behaved.

Replies from: faul_sname
comment by faul_sname · 2023-11-29T20:03:09.635Z · LW(p) · GW(p)

And since I use a string to build up CMD it now means all of the arguments need to be well behaved.

I'm not entirely sure what you mean by that -- it looks like you're already using arrays instead of string concatenation to construct your command on the python side and properly quoting shell args on the bash side, so I wouldn't expect you to run into any quoting issues.

Replies from: jkaufman
comment by jefftk (jkaufman) · 2023-11-29T20:37:23.464Z · LW(p) · GW(p)

I have CMD+=" -S $1", so if $1 has any spaces in it the parsing will be wrong.

Now, I know this about my script and will be careful not to do that, but it's still a risk.

Replies from: faul_sname
comment by faul_sname · 2023-11-29T20:50:30.222Z · LW(p) · GW(p)

Ah, I see.

It may be worthwhile to instead define CMD as an array, rather than a string, like this:

CMD=("do_some_thing")
if [[ "$USE_S_FLAG" == 1 ]]; then
    CMD+=("-S" "$1")
fi
"${CMD[@]}" "the" "rest" "of" "your" "args"

Of course at that point you're losing some of the readability benefits of using bash in the first place...

Edit: or, of course, you keep the script simple and readable at the cost of some duplication, e.g.

if [[ "$USE_S_FLAG" == 1 ]]; then
   "$CMD" -S "$1" "the" "rest" "of" "your" "args"
else
   "$CMD" "the" "rest" "of" "your" "args"
fi

comment by bismuth · 2023-11-29T19:53:42.426Z · LW(p) · GW(p)

Looks like there are two different problems here:

  • How to use process substitution in Python
  • How to write pipelines in a more compact way like in the shell

For the first one, I would rewrite your shell code as follows:

from subprocess import Popen, PIPE
dl1 = Popen(["aws", "s3", "cp", path1, "-"], stdout=PIPE)
gunzip1 = Popen(["gunzip"], stdin=dl1.stdout, stdout=PIPE)
gunzip1_fd = gunzip1.stdout.fileno()
dl2 = Popen(["aws", "s3", "cp", path2, "-"], stdout=PIPE)
gunzip2 = Popen(["gunzip"], stdin=dl2.stdout, stdout=PIPE)
gunzip2_fd = gunzip2.stdout.fileno()
cmd = Popen(["cmd", "-1", f"/dev/fd/{gunzip1_fd}", "-2", f"/dev/fd/{gunzip2_fd}"],
    stdout=PIPE, pass_fds=[gunzip1_fd, gunzip2_fd])
gzip = Popen(["gzip"], stdin=cmd.stdout, stdout=PIPE)
upload = Popen(["aws", "s3", "cp", "-", pathOut], stdin=gzip.stdout)
for p in dl1, gunzip1, dl2, gunzip2, cmd, gzip:
   p.stdout.close()
outs, errs = upload.communicate()

This assumes /dev/fd support on your platform. Shells fall back to named pipes on platforms where /dev/fd is not present, but I haven't reproduced that behavior here.
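A rough sketch of what a named-pipe fallback could look like, under my own assumptions rather than as a reproduction of what shells do internally. The function name is hypothetical. Opening a FIFO for writing blocks until the consumer opens it for reading, so each producer is started from a helper thread. Known limitation: if the consumer exits without ever opening one of its inputs, the corresponding thread stays blocked and the final `join()` hangs.

```python
import os
import subprocess
import tempfile
import threading


def run_with_fifos(consumer_argv, producers):
    """Run consumer_argv with one FIFO path appended per producer (hypothetical helper)."""
    with tempfile.TemporaryDirectory() as tmp:
        paths = [os.path.join(tmp, f"input{i}") for i in range(len(producers))]
        for path in paths:
            os.mkfifo(path, 0o600)

        consumer = subprocess.Popen(consumer_argv + paths, stdout=subprocess.PIPE)
        returncodes = [None] * len(producers)

        def feed(index):
            # open() blocks here until the consumer opens this FIFO for reading.
            with open(paths[index], "wb") as fifo:
                completed = subprocess.run(producers[index], stdout=fifo)
                returncodes[index] = completed.returncode

        threads = [
            threading.Thread(target=feed, args=(i,)) for i in range(len(producers))
        ]
        for thread in threads:
            thread.start()
        output, _ = consumer.communicate()
        for thread in threads:
            thread.join()

    if consumer.returncode != 0 or any(returncodes):
        raise RuntimeError(
            f"pipeline failed: consumer={consumer.returncode}, producers={returncodes}"
        )
    return output
```

The FIFOs live in a private temporary directory, which avoids the fixed `/tmp/fifo1` style paths and is removed when the function returns.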

For expressing pipelines in a more compact and intuitive way, PyPI has packages like sh or plumbum. While they don't seem to support process substitution out of the box, it looks like it wouldn't be too hard to extend them.

comment by sflicht · 2023-11-29T14:47:35.475Z · LW(p) · GW(p)

I agree with @faul_sname [LW · GW] that the bash is more readable.

But maybe a better (more readable/maintainable) Python alternative is to explicitly use Amazon's Python API for S3 downloads? I've never used it myself, but googling suggests:

import gzip
from io import BytesIO

import boto3

try:
    s3 = boto3.resource('s3')
    key = 'YOUR_FILE_NAME.gz'
    obj = s3.Object('YOUR_BUCKET_NAME', key)
    # Reads the whole compressed object into memory before decompressing
    n = obj.get()['Body'].read()
    gzipfile = BytesIO(n)
    gzipfile = gzip.GzipFile(fileobj=gzipfile)
    content = gzipfile.read()
    print(content)
except Exception as e:
    print(e)
    raise

You could wrap that in a function to parallelize the download/decompression of path1 and path2 (using your favorite python parallelization paradigm). But this wouldn't handle piping the decompressed files to cmd without using temp files...

Replies from: jkaufman
comment by jefftk (jkaufman) · 2023-11-29T18:19:15.057Z · LW(p) · GW(p)

I don't see how that solves any of the problems I have here?