Baru-baru ini saya sedang mengerjakan tugas yang melibatkan membaca dan memproses csv besar (berukuran ~3GB) dan mengunggah sekumpulan baris ke AWS SQS. Menjadi cukup nyaman dengan Node dan cara kerjanya, saya tahu bahwa membaca file sebesar itu secara langsung menggunakan fs.readFile() tidak akan berfungsi dan saya membutuhkan sesuatu yang lain. Pengejaran untuk mencari tahu solusinya adalah tentang apa postingan ini
Sebelum berbicara tentang solusinya, mari kita lihat cara yang salah tentang ini
Jalan yang salah
Solusi di atas akan bekerja dengan file kecil hingga beberapa MB tetapi tidak lebih jauh karena fs.readFile akan memuat seluruh file dalam memori. Sekarang secara umum Node akan macet ketika penggunaan memori Anda melampaui batas tertentu, yang melihat komunitas masalah seperti itu, sekitar 1. 4GB. Tentu, Anda pasti dapat meningkatkan batas itu dengan meneruskan flag baris perintah --max-old-space-size saat memulai proses node tetapi itu bukan cara yang efisien untuk memperbaikinya
Cara efisien (dan alat yang tepat) untuk masalah khusus ini adalah Node.js. js Stream
Memanfaatkan aliran Node
Sekarang kita telah melihat contoh aliran, mari kita lihat bagaimana kita dapat memproses file besar untuk kasus penggunaan tertentu yang terdiri dari langkah-langkah berbeda. Dalam skenario khusus saya, saya harus melakukan hal berikut
- Baca baris dari csv besar
- Kirim setiap baris sebagai pesan antrean AWS SQS
Tujuan yang lebih besar dari proses di atas adalah untuk membaca setiap baris dari input file csv dan mengirim pesan yang terdiri dari data dari baris tersebut ke antrian SQS
Menggunakan aliran secara tidak benar
Jadi mengapa solusi di atas salah?
Menjalankan kode di atas pada file dengan jutaan baris pada akhirnya akan menghentikan proses Node meskipun kami menggunakan aliran untuk membaca kontennya. Alasannya adalah - setelah file dibaca dan satu baris diuraikan, kami mengirim pesan ke AWS SQS yang merupakan operasi asinkron. Ini berarti bahwa setiap permintaan jaringan dapat mengambil waktu manisnya sendiri untuk dieksekusi
Aliran yang membaca file csv tidak mengetahui hal ini dan terus membaca baris secepat mungkin. Sekarang pertimbangkan ratusan baris sedang dibaca dan diurai dengan sangat cepat oleh data yang masuk dari aliran pembacaan, ini berarti ratusan permintaan jaringan dihasilkan secara bersamaan. Mengingat kami memiliki bandwidth terbatas, tidak semua permintaan segera selesai dan loop acara Node harus menyimpan setiap detail permintaan di memori hingga selesai
Saat permintaan mulai tersumbat, Node pada akhirnya akan macet ketika kita mencapai batas memori. Jadi mari kita perbaiki ini
Jalan yang benar
Jadi sebelum melihat solusi yang mungkin, mari pikirkan masalah yang ingin kita selesaikan. Meskipun kami menggunakan aliran untuk membaca dari file csv, kami tidak dapat menginstruksikan atau memberi tahu aliran itu bahwa konsumen di sisi lain lambat dan perlu melambat dan menjeda ketika situasi ini terjadi. Jadi kami memerlukan beberapa API dalam modul aliran Node yang memungkinkan kami melakukan ini
API tersebut adalah stream.pipe
Secara umum, Anda dapat menggunakan pipe() pada aliran yang dapat dibaca. Contoh aliran yang dapat dibaca dapat sebagai berikut
- Membaca file
- Menjalankan kueri pemilihan pada database
- Tanggapan HTTP
- Lebih banyak lagi
Piping adalah mekanisme di mana kami menyediakan output dari satu aliran sebagai input ke aliran lain. Mari perbaiki contoh kita menggunakan stream.pipe()
pipe dalam terminologi UNIX/Linux dasar adalah perintah yang memungkinkan Anda mentransfer keluaran dari satu perintah sebagai masukan ke perintah lain. Di Node. js, kita dapat mengartikannya sebagai kemampuan untuk mentransfer output dari aliran yang dapat dibaca ke aliran yang dapat dibaca/ditulis lainnya
Melihat kode di atas Anda mungkin memiliki pertanyaan mengapa kita harus membuat file terpisah dengan beberapa sintaks kelas aneh yang memperluas beberapa kelas stream.Transform aneh lainnya dan semua itu. Tenang saya mendapat penjelasan yang akan kita bahas sedikit
Dalam kode di atas, keajaiban terjadi di let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 0 baris di let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 1. Di sini kami meneruskan data yang diterima dari aliran yang dapat dibaca i. e. aliran yang membaca file csv kami dalam potongan dan meneruskan potongan tersebut ke aliran lain i. e. let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 2 kelas yang telah kita kodekan dengan tangan. Efek menggunakan pipa di sini adalah bahwa sekarang aliran yang dapat dibaca kita cerdas dan mengetahui apakah aliran yang menyalurkan data melambat atau bekerja secara normal dan karenanya berhenti/melanjutkan membaca dari file csv sendiri tanpa kita menulis satu pun
Memahami kebutuhan menulis aliran Transform kustom
Sekarang kembali ke sintaks khusus yang aneh di kelas PushToSqs. Setiap kali kita perlu menyalurkan aliran yang dapat dibaca ke aliran lain mana pun, aliran lain itu harus berupa aliran yang dapat dibaca atau aliran yang dapat ditulis atau aliran transformasi atau aliran dupleks. Ini pada dasarnya berarti kita tidak dapat memiliki fungsi acak yang sewenang-wenang dalam argumen ke fungsi pipe di atas. Kami hanya perlu memasok salah satu aliran yang saya sebutkan sebelumnya
const fs = require('fs');const zlib = require('zlib');
let readable = fs.createReadStream('large.csv');
readable
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('large.csv.gz'));
Anda pasti pernah melihat contoh seperti di atas melalui internet di mana orang menggunakan berbagai perpustakaan pihak ke-3 dan menyelesaikan pekerjaannya. Jadi mengapa kita tidak dapat menggunakan perpustakaan populer seperti itu dan menyelesaikan pekerjaan tanpa harus repot membuat aliran kita sendiri?
Melihat contoh di atas, yang dilakukan adalah - baca file csv besar, kompres dan tulis kembali ke disk, semuanya menggunakan aliran. Paket pihak ke-3 zlib menyediakan logika kompresi dan paket inbuilt fs melakukan tugas menulis file ke disk. Sekarang mari kita fokus pada pernyataan masalah yang kita lihat sebelumnya di awal artikel ini - baca csv dan setiap baris harus dikirim sebagai pesan ke sqs
Saya tidak menemukan modul pihak ke-3 dengan jenis logika bisnis khusus yang sesuai dengan kebutuhan saya dan karenanya perlu menulis kelas aliran khusus kami sendiri
Saya tidak akan merinci apa itu Transform stream tetapi saya hanya akan menambahkan beberapa detail agar mudah untuk memahami apa yang terjadi dalam kode. Aliran transformasi pada dasarnya digunakan untuk mengubah aliran byte dari aliran masuk dan mengirimkannya ke aliran berikutnya. Fungsi let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 4 pada baris 134 di kelas let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 2 dipanggil setiap kali menerima data dari upstream. Dalam hal ini, kami mengirimkan pesan ke AWS SQS dan kemudian memanggil panggilan balik let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) 6 yang memberi tahu Node bahwa potongan tertentu ini telah berhasil diproses dan kami siap memproses potongan data berikutnya
Penanganan kesalahan
Lihatlah kalimat ini dari Node. dokumentasi js untuk
Satu peringatan penting adalah jika aliran yang Dapat Dibaca mengeluarkan kesalahan selama pemrosesan, tujuan yang Dapat Ditulis tidak ditutup secara otomatis. Jika terjadi kesalahan, setiap aliran harus ditutup secara manual untuk mencegah kebocoran memori
Jadi apa artinya bagi solusi kami adalah jika karena suatu alasan atau lainnya aliran yang dapat dibaca kami yang membaca dari file csv memiliki beberapa kesalahan dan file tidak dapat dibaca lagi (bisa rusak byte, kesalahan pembacaan disk, dll) maka yang lain . Ini mungkin tampak tidak begitu serius tetapi misalkan kita memiliki banyak aliran yang disalurkan pada aliran yang dapat dibaca seperti di bawah ini
let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE())Semua aliran yang disalurkan ke rantai atau semacam akan tetap berjalan meskipun aliran yang dapat dibaca telah mengalami kesalahan. Idealnya, dalam hal ini, kita juga harus membiarkan semua aliran lain di telepon memiliki informasi ini bahwa telah terjadi kesalahan di hulu dan Anda harus keluar dengan anggun sehingga semuanya dibersihkan dan kami tidak mengalami kebocoran memori. Tapi saat menggunakan stream. pipa kita harus melakukannya sendiri dengan menulis penangan kesalahan yang tepat dan secara manual menutup aliran lain setiap kali terjadi kesalahan
Tetapi ada solusi yang lebih baik sejak Node. js 10. X -
Menggunakan let readable = fs.createReadStream('large.csv'); readable .pipe(streamA()) .pipe(streamB()) .pipe(streamC()) .pipe(streamD()) .pipe(streamE()) _7 menutup semua aliran jika terjadi kesalahan di salah satu aliran dalam rantai dan memungkinkan Anda untuk menjalankan logika penanganan kesalahan apa pun yang mungkin Anda inginkan untuk membersihkan sesuatu sebelum keluar, sehingga mencegah kebocoran memori
Kesimpulan
Hanya itu saja. Kami melihat cara yang benar dan salah dalam menangani file besar di Node. js dan membuat kelas aliran buatan tangan khusus yang sesuai dengan kebutuhan kita. Saya harap ini membantu seseorang di luar sana menghemat beberapa jam mencoba memproses file besar di Node