I have a scenario where I upload a file (csv or zip/gzip) to S3, then start a backend job which:
- Downloads the file from S3 as a stream
- If it is compressed, runs it through a decompression step
- Reads the first line of the file and asynchronously creates a DB table from it, delaying processing of the subsequent chunks until the table is created
- When the table is created, copies the contents of the file into the table
- The progress reporter is piped as the last step; it knows the original size of the file and the length in bytes of every chunk it reads, so it can calculate the progress
Pseudo:
loadRemoteFile()
  .createReadStream()
  .pipe(unzipper)
  .pipe(dbWriter)
  .pipe(progressReporter)
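
For reference, the concrete wiring looks roughly like this (a simplified sketch assuming the AWS SDK v2 stream API and gzip input; the bucket and key are made up, and dbWriter/progressReporter are reduced to PassThrough placeholders here):

import { S3 } from "aws-sdk";
import { createGunzip } from "zlib";
import { pipeline, PassThrough } from "stream";

const s3 = new S3();

// PassThrough placeholders: in the real job these are my own Transform/
// Writable streams that create the table, copy the rows and report progress.
const dbWriter = new PassThrough();
const progressReporter = new PassThrough();

const source = s3
  .getObject({ Bucket: "my-bucket", Key: "upload.csv.gz" }) // made-up bucket/key
  .createReadStream();

pipeline(
  source,
  createGunzip(), // skipped when the upload is a plain csv; zip would need a different parser
  dbWriter,
  progressReporter,
  (err) => {
    if (err) console.error("import failed", err);
    else console.log("import finished");
  }
);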
This works great for an uncompressed file, because I can query its size ahead of time with s3.headObject() (or fs.stat() locally). From the original file size I can create checkpoints (say, report progress at every 10% of bytes read), accumulate the bytes already read, and report whenever a checkpoint is reached. But it doesn't work for compressed files: the only size I can request up front is the size of the compressed file, while I decompress the contents on the fly. So the reporter sees the sizes of the decompressed chunks, and those accumulate to far more than the compressed file size.
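
In simplified form, the reporter for the uncompressed case is a Transform along these lines (the 10% checkpoint logic is the relevant part; totalBytes comes from s3.headObject()'s ContentLength or fs.stat()'s size):

import { Transform, TransformCallback } from "stream";

// Simplified progress reporter: totalBytes is the original file size and
// progress is logged at every 10% checkpoint of accumulated bytes read.
class ProgressReporter extends Transform {
  private bytesSeen = 0;
  private nextCheckpoint = 0.1;

  constructor(private readonly totalBytes: number) {
    super();
  }

  _transform(chunk: Buffer, _enc: BufferEncoding, done: TransformCallback): void {
    this.bytesSeen += chunk.length;
    while (
      this.totalBytes > 0 &&
      this.nextCheckpoint <= 1 &&
      this.bytesSeen / this.totalBytes >= this.nextCheckpoint
    ) {
      console.log(`progress: ${Math.round(this.nextCheckpoint * 100)}%`);
      this.nextCheckpoint += 0.1;
    }
    done(null, chunk); // pass the chunk through unchanged
  }
}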
Moving the progress reporter before the unzipper could theoretically work, but since I have an asynchronous step (dbWriter), I need to report progress only when something has actually been written to the database. Otherwise I would report far too fast; I might even report 100% before the table is created.
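
To make that rejected option concrete, counting compressed bytes in front of the unzipper would look roughly like the sketch below (compressedByteCounter is a name I'm making up for illustration); it only measures how much of the compressed file has flowed through, not what dbWriter has actually written:

import { PassThrough } from "stream";

// Rejected placement: count *compressed* bytes so the math lines up with
// s3.headObject().ContentLength. It would sit between the S3 stream and the
// unzipper, i.e. source.pipe(counter).pipe(unzipper).pipe(dbWriter), and so
// it tracks the download/decompression, not the rows written to the DB.
function compressedByteCounter(compressedSize: number): PassThrough {
  let seen = 0;
  const counter = new PassThrough();
  counter.on("data", (chunk: Buffer) => {
    seen += chunk.length;
    console.log(`read ${Math.round((seen / compressedSize) * 100)}% of the compressed file`);
  });
  return counter;
}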
via Jim-Y