Go slowly

Flux<DataBuffer> to InputStream

How can we convert a Flux<DataBuffer>, such as the one returned by Spring's FilePart.content() when handling an upload, into an InputStream for consumption?

By using pipes!

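import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

InputStream getInputStreamFromFluxDataBuffer(Flux<DataBuffer> data) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    DataBufferUtils.write(data, osPipe)
            // Pipe writes block, so run them off the event loop (boundedElastic replaces the deprecated elastic)
            .subscribeOn(Schedulers.boundedElastic())
            // Close on complete, error and cancel, so the reader always sees the end of the stream
            .doFinally(signal -> {
                try {
                    osPipe.close();
                } catch (IOException ignored) {
                }
            })
            .subscribe(DataBufferUtils.releaseConsumer());
    return isPipe;
}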

The code is quite simple, but a few notes are worth mentioning here:
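One point that is easy to miss is how the pipe itself behaves. The two ends have to be driven from different threads, because the internal buffer is small (1024 bytes by default) and a write blocks until the reader drains it. The writer also has to close its end, otherwise the reader never sees the end of the stream. The sketch below is only an illustration of that mechanism with plain java.io, no Spring or Reactor involved. The class PipeSketch and the helpers toInputStream and readAll are hypothetical names made up for this example, and a plain thread stands in for the scheduler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class PipeSketch {

    // Hypothetical helper: exposes the chunks as an InputStream fed by a writer thread.
    static InputStream toInputStream(List<byte[]> chunks) throws IOException {
        PipedOutputStream osPipe = new PipedOutputStream();
        PipedInputStream isPipe = new PipedInputStream(osPipe);

        Thread writer = new Thread(() -> {
            // try-with-resources closes the pipe even if a write fails,
            // so the reader gets end of stream instead of blocking forever
            try (PipedOutputStream out = osPipe) {
                for (byte[] chunk : chunks) {
                    out.write(chunk); // blocks while the pipe buffer is full
                }
            } catch (IOException ignored) {
                // the reader closed its end early; nothing more to write
            }
        });
        writer.setDaemon(true);
        writer.start();
        return isPipe;
    }

    // Drains the stream on the calling thread and decodes it as UTF-8.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[256];
        int n;
        while ((n = in.read(buffer)) != -1) {
            result.write(buffer, 0, n);
        }
        return new String(result.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> chunks = Arrays.asList(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = toInputStream(chunks)) {
            System.out.println(readAll(in)); // prints: hello world
        }
    }
}
```

The same reasoning applies to the reactive version: the subscription that writes into the pipe must not run on the thread that reads from the returned InputStream, and the read itself is a blocking call, so it should not happen on an event-loop thread either.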

Written on April 11, 2020.