Baca file csv besar di nodejs

Baru-baru ini saya sedang mengerjakan tugas yang melibatkan membaca dan memproses csv besar (berukuran ~3GB) dan mengunggah sekumpulan baris ke AWS SQS. Menjadi cukup nyaman dengan Node dan cara kerjanya, saya tahu bahwa membaca file sebesar itu secara langsung menggunakan fs.readFile() tidak akan berfungsi dan saya membutuhkan sesuatu yang lain. Pengejaran untuk mencari tahu solusinya adalah tentang apa postingan ini

Sebelum berbicara tentang solusinya, mari kita lihat cara yang salah tentang ini

Jalan yang salah

Solusi di atas akan bekerja dengan file kecil hingga beberapa MB tetapi tidak lebih jauh karena fs.readFile akan memuat seluruh file dalam memori. Sekarang secara umum Node akan macet ketika penggunaan memori Anda melampaui batas tertentu, yang melihat komunitas masalah seperti itu, sekitar 1. 4GB. Tentu, Anda pasti dapat meningkatkan batas itu dengan meneruskan flag baris perintah --max-old-space-size saat memulai proses node tetapi itu bukan cara yang efisien untuk memperbaikinya

Cara efisien (dan alat yang tepat) untuk masalah khusus ini adalah Node.js. js Stream

Memanfaatkan aliran Node

Sekarang kita telah melihat contoh aliran, mari kita lihat bagaimana kita dapat memproses file besar untuk kasus penggunaan tertentu yang terdiri dari langkah-langkah berbeda. Dalam skenario khusus saya, saya harus melakukan hal berikut

  1. Baca baris dari csv besar
  2. Kirim setiap baris sebagai pesan antrean AWS SQS

Tujuan yang lebih besar dari proses di atas adalah untuk membaca setiap baris dari input file csv dan mengirim pesan yang terdiri dari data dari baris tersebut ke antrian SQS

Menggunakan aliran secara tidak benar

Jadi mengapa solusi di atas salah?

Menjalankan kode di atas pada file dengan jutaan baris pada akhirnya akan menghentikan proses Node meskipun kami menggunakan aliran untuk membaca kontennya. Alasannya adalah - setelah file dibaca dan satu baris diuraikan, kami mengirim pesan ke AWS SQS yang merupakan operasi asinkron. Ini berarti bahwa setiap permintaan jaringan dapat mengambil waktu manisnya sendiri untuk dieksekusi

Aliran yang membaca file csv tidak mengetahui hal ini dan terus membaca baris secepat mungkin. Sekarang pertimbangkan ratusan baris sedang dibaca dan diurai dengan sangat cepat oleh data yang masuk dari aliran pembacaan, ini berarti ratusan permintaan jaringan dihasilkan secara bersamaan. Mengingat kami memiliki bandwidth terbatas, tidak semua permintaan segera selesai dan loop acara Node harus menyimpan setiap detail permintaan di memori hingga selesai

Saat permintaan mulai tersumbat, Node pada akhirnya akan macet ketika kita mencapai batas memori. Jadi mari kita perbaiki ini

Jalan yang benar

Jadi sebelum melihat solusi yang mungkin, mari pikirkan masalah yang ingin kita selesaikan. Meskipun kami menggunakan aliran untuk membaca dari file csv, kami tidak dapat menginstruksikan atau memberi tahu aliran itu bahwa konsumen di sisi lain lambat dan perlu melambat dan menjeda ketika situasi ini terjadi. Jadi kami memerlukan beberapa API dalam modul aliran Node yang memungkinkan kami melakukan ini

API tersebut adalah stream.pipe

Secara umum, Anda dapat menggunakan pipe() pada aliran yang dapat dibaca. Contoh aliran yang dapat dibaca dapat sebagai berikut

  • Membaca file
  • Menjalankan kueri pemilihan pada database
  • Tanggapan HTTP
  • Lebih banyak lagi

Piping adalah mekanisme di mana kami menyediakan output dari satu aliran sebagai input ke aliran lain. Mari perbaiki contoh kita menggunakan stream.pipe()

pipe dalam terminologi UNIX/Linux dasar adalah perintah yang memungkinkan Anda mentransfer keluaran dari satu perintah sebagai masukan ke perintah lain. Di Node. js, kita dapat mengartikannya sebagai kemampuan untuk mentransfer output dari aliran yang dapat dibaca ke aliran yang dapat dibaca/ditulis lainnya

Melihat kode di atas Anda mungkin memiliki pertanyaan mengapa kita harus membuat file terpisah dengan beberapa sintaks kelas aneh yang memperluas beberapa kelas stream.Transform aneh lainnya dan semua itu. Tenang saya mendapat penjelasan yang akan kita bahas sedikit

Dalam kode di atas, keajaiban terjadi di

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
0 baris di
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
1. Di sini kami meneruskan data yang diterima dari aliran yang dapat dibaca i. e. aliran yang membaca file csv kami dalam potongan dan meneruskan potongan tersebut ke aliran lain i. e.
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
2 kelas yang telah kita kodekan dengan tangan. Efek menggunakan pipa di sini adalah bahwa sekarang aliran yang dapat dibaca kita cerdas dan mengetahui apakah aliran yang menyalurkan data melambat atau bekerja secara normal dan karenanya berhenti/melanjutkan membaca dari file csv sendiri tanpa kita menulis satu pun

Memahami kebutuhan menulis aliran Transform kustom

Sekarang kembali ke sintaks khusus yang aneh di kelas PushToSqs. Setiap kali kita perlu menyalurkan aliran yang dapat dibaca ke aliran lain mana pun, aliran lain itu harus berupa aliran yang dapat dibaca atau aliran yang dapat ditulis atau aliran transformasi atau aliran dupleks. Ini pada dasarnya berarti kita tidak dapat memiliki fungsi acak yang sewenang-wenang dalam argumen ke fungsi pipe di atas. Kami hanya perlu memasok salah satu aliran yang saya sebutkan sebelumnya

const fs = require('fs');
const zlib = require('zlib');
let readable = fs.createReadStream('large.csv');

readable
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('large.csv.gz'));

Anda pasti pernah melihat contoh seperti di atas melalui internet di mana orang menggunakan berbagai perpustakaan pihak ke-3 dan menyelesaikan pekerjaannya. Jadi mengapa kita tidak dapat menggunakan perpustakaan populer seperti itu dan menyelesaikan pekerjaan tanpa harus repot membuat aliran kita sendiri?

Melihat contoh di atas, yang dilakukan adalah - baca file csv besar, kompres dan tulis kembali ke disk, semuanya menggunakan aliran. Paket pihak ke-3 zlib menyediakan logika kompresi dan paket inbuilt fs melakukan tugas menulis file ke disk. Sekarang mari kita fokus pada pernyataan masalah yang kita lihat sebelumnya di awal artikel ini - baca csv dan setiap baris harus dikirim sebagai pesan ke sqs

Saya tidak menemukan modul pihak ke-3 dengan jenis logika bisnis khusus yang sesuai dengan kebutuhan saya dan karenanya perlu menulis kelas aliran khusus kami sendiri

Saya tidak akan merinci apa itu Transform stream tetapi saya hanya akan menambahkan beberapa detail agar mudah untuk memahami apa yang terjadi dalam kode. Aliran transformasi pada dasarnya digunakan untuk mengubah aliran byte dari aliran masuk dan mengirimkannya ke aliran berikutnya. Fungsi

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
4 pada baris 134 di kelas
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
2 dipanggil setiap kali menerima data dari upstream. Dalam hal ini, kami mengirimkan pesan ke AWS SQS dan kemudian memanggil panggilan balik
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
6 yang memberi tahu Node bahwa potongan tertentu ini telah berhasil diproses dan kami siap memproses potongan data berikutnya

Penanganan kesalahan

Lihatlah kalimat ini dari Node. dokumentasi js untuk

Satu peringatan penting adalah jika aliran yang Dapat Dibaca mengeluarkan kesalahan selama pemrosesan, tujuan yang Dapat Ditulis tidak ditutup secara otomatis. Jika terjadi kesalahan, setiap aliran harus ditutup secara manual untuk mencegah kebocoran memori

Jadi apa artinya bagi solusi kami adalah jika karena suatu alasan atau lainnya aliran yang dapat dibaca kami yang membaca dari file csv memiliki beberapa kesalahan dan file tidak dapat dibaca lagi (bisa rusak byte, kesalahan pembacaan disk, dll) maka yang lain . Ini mungkin tampak tidak begitu serius tetapi misalkan kita memiliki banyak aliran yang disalurkan pada aliran yang dapat dibaca seperti di bawah ini

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())

Semua aliran yang disalurkan ke rantai atau semacam akan tetap berjalan meskipun aliran yang dapat dibaca telah mengalami kesalahan. Idealnya, dalam hal ini, kita juga harus membiarkan semua aliran lain di telepon memiliki informasi ini bahwa telah terjadi kesalahan di hulu dan Anda harus keluar dengan anggun sehingga semuanya dibersihkan dan kami tidak mengalami kebocoran memori. Tapi saat menggunakan stream. pipa kita harus melakukannya sendiri dengan menulis penangan kesalahan yang tepat dan secara manual menutup aliran lain setiap kali terjadi kesalahan

Tetapi ada solusi yang lebih baik sejak Node. js 10. X -

Menggunakan

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
_7 menutup semua aliran jika terjadi kesalahan di salah satu aliran dalam rantai dan memungkinkan Anda untuk menjalankan logika penanganan kesalahan apa pun yang mungkin Anda inginkan untuk membersihkan sesuatu sebelum keluar, sehingga mencegah kebocoran memori

Kesimpulan

Hanya itu saja. Kami melihat cara yang benar dan salah dalam menangani file besar di Node. js dan membuat kelas aliran buatan tangan khusus yang sesuai dengan kebutuhan kita. Saya harap ini membantu seseorang di luar sana menghemat beberapa jam mencoba memproses file besar di Node

Bagaimana cara membaca file CSV besar di NodeJS?

Untuk tutorial ini, Anda akan menggunakan Node. js stream untuk memproses file CSV besar. .
Langkah 1. Membuat Node. Aplikasi js. .
Langkah 2. Menginstal Ketergantungan. Selanjutnya, instal paket fs dan readline. .
Langkah 3. Membaca File. .
Langkah 4. Mengurai File. .
Langkah 5. Mengeluarkan Data Parsing

Bagaimana cara membaca file CSV yang besar?

Jadi, bagaimana cara membuka file CSV besar di Excel? . Pisahkan file CSV menjadi beberapa file yang lebih kecil yang sesuai dengan batas 1.048.576 baris ; .

Metode mana yang Anda pilih untuk membaca file besar saat menggunakan NodeJS?

Yang paling mudah adalah fs. readFile() di mana, seluruh file dibaca ke dalam memori dan kemudian ditindaklanjuti setelah Node membacanya, dan opsi kedua adalah fs.

Bagaimana cara menggunakan penguraian CSV?

Karena format CSV yang sederhana, Anda dapat menguraikan file-file ini dengan hampir semua bahasa pemrograman. .
Buka file menggunakan jalur file lengkap. .
Baca catatan pertama dalam file dan muat catatan ke dalam variabel yang sesuai. .
Mulai loop yang berakhir saat file mencapai akhir