I want to implement a reactive write to a file. At the low level, it uses NIO and Futures. After reading this section, I wrote the code below, but maybe there is a cleaner or shorter solution.
Thanks.
public class LocalImageService implements IImageService { private String imageDir = "/tmp"; @Override public Mono<String> storeImage(@NonNull InputStream imageStream, @NonNull String ext) { val processor = new ImageProcessor(); processor.fileName = UUID.randomUUID() + "." + ext; val targetFile = new File(imageDir, processor.fileName); final Flux<CompletableFuture<Integer>> bridge = Flux.push(sink -> { try { processor.sink = sink; processor.inputChannel = Channels.newChannel(imageStream); processor.fileChannel = AsynchronousFileChannel.open(targetFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); processor.processNextChunk(); } catch (IOException e) { processor.onError(new RuntimeException("Can not write image file: " + processor.fileName, e)); } }); return bridge.flatMap(Mono::fromFuture) .reduce(0, (acc, num) -> { processor.buffer.compact(); processor.processNextChunk(); return acc + num; }) .then(Mono.just(targetFile.getPath())); } private static final class ImageProcessor { int position = 0; ReadableByteChannel inputChannel; AsynchronousFileChannel fileChannel; FluxSink<CompletableFuture<Integer>> sink; final ByteBuffer buffer = ByteBuffer.allocate(4096); String fileName; void onComplete() { closeStreams(); if (null != sink) { sink.complete(); } } private void closeStreams() { IOUtils.closeQuietly(inputChannel); IOUtils.closeQuietly(fileChannel); } void onError(Throwable e) { closeStreams(); if (null != sink) { sink.error(e); } } void processNextChunk() { try { int receivedBytes; if ((receivedBytes = inputChannel.read(buffer)) >= 0 || buffer.position() > 0) { buffer.flip(); val lastWrite = fileChannel.write(buffer, position); position += receivedBytes; if (lastWrite != null) { sink.next(CompletableFuture.supplyAsync(() -> { try { return lastWrite.get(); } catch (InterruptedException | ExecutionException e) { onError(new RuntimeException("Can not write image file: " + fileName, e)); return -1; } })); } } else { onComplete(); } } catch 
(IOException e) { onError(new RuntimeException("Can not write image file: " + fileName, e)); } } } }