For now, writing, reading, and deleting each of the 15,840 entries in a 50 MB JSON file takes 86.7 ms, so I'd say that's good enough. That works out to roughly 5.5 μs per entry, which by the rule of thumb in https://ryhl.io/blog/async-what-is-blocking/ you could even call non-blocking.
To give a sense of scale of how much time is too much, a good rule of thumb is no more than 10 to 100 microseconds between each .await. That said, this depends on the kind of application you are writing.
Of course, entries can get large in practice, so the real thing will still be async-aware. Once you start down that road you probably ought to be rigorous about it, but at least I'm comparing actual numbers here…
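When I do make it async, the plan is roughly to push the blocking client calls onto a blocking-friendly pool. A minimal sketch, assuming tokio; fetch_people is a hypothetical helper, not anything from the benchmarks below:

use redis::Commands;

// hypothetical helper: one blocking read of one entry
fn fetch_people(con: &mut redis::Connection, id: usize) -> Vec<u8> {
    con.get(format!("p{id}")).unwrap()
}

#[tokio::main]
async fn main() {
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    // spawn_blocking keeps the blocking redis call off the async worker threads
    let bytes = tokio::task::spawn_blocking(move || {
        let mut con = client.get_connection().unwrap();
        fetch_people(&mut con, 0)
    })
    .await
    .unwrap();
    println!("read {} bytes", bytes.len());
}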
Here are the current comparison results:
Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):     514.7 ms ±   2.2 ms    [User: 180.8 ms, System: 738.8 ms]
  Range (min … max):   511.7 ms … 526.5 ms    50 runs

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     103.8 ms ±   2.6 ms    [User: 192.6 ms, System: 569.2 ms]
  Range (min … max):    97.8 ms … 112.0 ms    50 runs

Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     140.8 ms ±   0.9 ms    [User: 136.1 ms, System: 3.2 ms]
  Range (min … max):   139.2 ms … 143.0 ms    50 runs

Summary
  ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb ran
    1.36 ± 0.03 times faster than ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
    4.96 ± 0.12 times faster than ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
testing script
hyperfine --warmup 3 '~/code/sandbox/databases/db_test_redis/target/release/db_test_redis' '~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb' '~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite'
single thread
Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):      1.010 s ±  0.020 s    [User: 0.106 s, System: 0.275 s]
  Range (min … max):    0.970 s …  1.029 s    10 runs

Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     181.5 ms ±  10.5 ms    [User: 117.8 ms, System: 72.2 ms]
  Range (min … max):   174.1 ms … 212.0 ms    16 runs

Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     42.610 s ±  7.241 s    [User: 0.471 s, System: 6.406 s]
  Range (min … max):   33.006 s … 54.726 s    10 runs

Summary
  ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb ran
    5.57 ± 0.34 times faster than ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  234.73 ± 42.15 times faster than ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
However, if you open sqlite with Connection::open_in_memory(), it takes first place. Naturally, that also makes it volatile:
Benchmark 3: ~/code/sandbox/databases/db_test_sqlite/target/release/db_test_sqlite
  Time (mean ± σ):     109.0 ms ±   1.3 ms    [User: 104.2 ms, System: 3.3 ms]
  Range (min … max):   106.8 ms … 112.9 ms    27 runs
The trade-off is that once you run sqlite in memory, you have to start thinking about
- a backup policy
- reloading the data on restart
and if you're going to do all that, you might as well just keep everything in a HashMap. The whole reason to use redis is that, even though it lives in memory, it can still keep the data non-volatile through backups and the like.
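For scale, the HashMap route would mean owning both of those concerns yourself. A rough sketch (snapshot and restore are hypothetical names, not anything I benchmarked):

use std::collections::HashMap;
use std::fs;

type Store = HashMap<String, serde_json::Value>;

// the backup policy you would have to invent yourself
fn snapshot(store: &Store, path: &str) -> std::io::Result<()> {
    fs::write(path, serde_json::to_vec(store).unwrap())
}

// the reload-on-restart you would also have to invent yourself
fn restore(path: &str) -> Store {
    fs::read(path)
        .ok()
        .and_then(|b| serde_json::from_slice(&b).ok())
        .unwrap_or_default()
}

fn main() {
    let mut store = restore("backup.json");
    store.insert("p0".into(), serde_json::json!({ "name": "..." }));
    snapshot(&store, "backup.json").unwrap();
}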
The other thing that matters operationally is an admin UI:
db       | redis | rocksdb | sqlite | sqlite (in memory)
speed    | 3rd   | 2nd     | 4th    | 1st
volatile | false | false   | false  | true
admin    | true  | false   | true   | false
Put that way, redis isn't the fastest, but it's the option that's decent without making you think about any of this. I really thought sqlite would be fine, though…
multithread (horizontal scaling)
redis
Since redis was looking bad here, I made the client multithreaded, with multiple connections.
Benchmark 1: ~/code/sandbox/databases/db_test_redis/target/release/db_test_redis
  Time (mean ± σ):     390.2 ms ±   5.4 ms    [User: 101.0 ms, System: 480.3 ms]
  Range (min … max):   384.9 ms … 404.0 ms    10 runs
That's a bit under a 3x speedup. Since it's multithreaded now, the system time exceeds the wall clock time, which you can confirm above. It still doesn't reach sqlite or rocksdb, though. And at about 24 μs per entry, using this from an async context would mean going through block_on.
rocksdb
I did the same for rocksdb. As in the example above there's one connection per database, but I used column families, which support atomic writes. This version splits things at the storage level itself: sharding, with one database (and one column family) per thread. According to the docs, writes are atomic per column family anyway, so splitting into separate databases shouldn't be necessary… oh well.
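To be concrete about what the docs promise: a WriteBatch applies atomically even when it spans column families within one db. A sketch only (the path ./tmp_batch and the cf names are made up; this is not the benchmark code):

use rocksdb::{Options, WriteBatch, DB};

fn main() {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // one db, two column families
    let db = DB::open_cf(&options, "./tmp_batch", ["cf0", "cf1"]).unwrap();
    let cf0 = db.cf_handle("cf0").unwrap();
    let cf1 = db.cf_handle("cf1").unwrap();

    // both puts land atomically, or neither does
    let mut batch = WriteBatch::default();
    batch.put_cf(cf0, b"p0", b"{}");
    batch.put_cf(cf1, b"p1", b"{}");
    db.write(batch).unwrap();
}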
Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):      86.7 ms ±   3.0 ms    [User: 155.0 ms, System: 446.3 ms]
  Range (min … max):    82.1 ms …  94.8 ms    50 runs
Roughly a 2x speedup, and it overtook sqlite (even the in-memory variant).
db       | redis | rocksdb | sqlite | sqlite (in memory)
speed    | 3rd   | 1st     | 4th    | 2nd
volatile | false | false   | false  | true
admin    | true  | false   | true   | false
I'm sharding quite a bit now, so maintenance will be a headache (the admin UI in particular), but compared with the other options, rocksdb might be the reasonable choice.
Databases
redis
single thread
use redis::Commands;
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

const JSON_PATH: &str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    // read the json data
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf = String::new();
    file.read_to_string(&mut buf).unwrap();
    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // create a db connection
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let mut con = client.get_connection().unwrap();

    // write every entry
    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = serde_json::to_vec(&values[i]).unwrap();
        let _: () = con.set(key, &bytes).unwrap();
    }

    // read every entry back and verify
    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = con.get(&key).unwrap();
        let v: People = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(&v, &values[i]);
    }

    // delete every entry
    for i in 0..values.len() {
        let key = format!("p{i}");
        let _: () = con.del(key).unwrap();
    }
}
multi thread
Things get harder all at once here. A Mutex would also work, but I decided to use channels.
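For contrast, the Mutex route would look roughly like this; a sketch (not benchmarked), with one shared connection that every thread has to lock:

use redis::Commands;
use std::sync::{Arc, Mutex};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    // a single connection shared by every thread
    let con = Arc::new(Mutex::new(client.get_connection().unwrap()));

    let handles: Vec<_> = (0..4)
        .map(|t| {
            let con = Arc::clone(&con);
            std::thread::spawn(move || {
                // every operation holds the lock, so the threads serialize here
                let mut con = con.lock().unwrap();
                let _: () = con.set(format!("p{t}"), b"...".to_vec()).unwrap();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}

One connection behind a lock serializes everything; per-thread connections fed by channels avoid exactly that, which is what the version below does.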
use crossbeam::channel;
use oneshot;
use redis::Commands;
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

const NUM_THREADS: usize = 16;
const JSON_PATH: &str = "/Users/yasushi/code/sandbox/databases/5MB.json";

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

enum MessageType {
    READ,
    WRITE,
    DEL,
}

enum Request {
    People((usize, People)),
    Id(usize),
}

enum Response {
    People(People),
    Ok,
}

struct Message {
    pub kind: MessageType,
    pub request: Request,
    pub response: oneshot::Sender<Response>,
}

impl Message {
    fn new(request: Request, kind: MessageType) -> (Self, oneshot::Receiver<Response>) {
        let (response, r) = oneshot::channel();
        let mes = Self { kind, response, request };
        (mes, r)
    }

    fn new_read(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::READ)
    }

    fn new_write(id: usize, people: People) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::People((id, people)), MessageType::WRITE)
    }

    fn new_del(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::DEL)
    }
}

// each worker thread owns one connection and serves requests from a channel
fn create_connection_thread(client: &redis::Client) -> channel::Sender<Message> {
    // assuming redis clients can have multiple connections in one client
    let mut con = client.get_connection().unwrap();
    let (s, r) = channel::unbounded::<Message>();
    std::thread::spawn(move || {
        while let Ok(mes) = r.recv() {
            match mes.kind {
                MessageType::READ => {
                    let Request::Id(i) = mes.request else { continue; };
                    let key = format!("p{i}");
                    let bytes: Vec<u8> = con.get(&key).unwrap();
                    let p: People = serde_json::from_slice(&bytes).unwrap();
                    let _ = mes.response.send(Response::People(p));
                }
                MessageType::WRITE => {
                    let Request::People((i, p)) = mes.request else { continue; };
                    let bytes: Vec<u8> = serde_json::to_vec(&p).unwrap();
                    let key = format!("p{i}");
                    let _: () = con.set(key, &bytes).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
                MessageType::DEL => {
                    let Request::Id(i) = mes.request else { continue; };
                    let key = format!("p{i}");
                    let _: () = con.del(key).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
            }
        }
    });
    s
}

fn main() {
    // read the json data
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf = String::new();
    file.read_to_string(&mut buf).unwrap();
    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // one client, NUM_THREADS worker connections
    let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
    let connections = (0..NUM_THREADS)
        .map(|_| create_connection_thread(&client))
        .collect::<Vec<_>>();

    // write
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_write(i, values[i].clone());
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for r in responses {
        r.recv().unwrap();
    }

    // read back and verify
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_read(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for (i, r) in responses.into_iter().enumerate() {
        let Response::People(p) = r.recv().unwrap() else { panic!(); };
        assert_eq!(&p, &values[i]);
    }

    // delete
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_del(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for r in responses {
        r.recv().unwrap();
    }
}
rocksdb
single thread
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

const JSON_PATH: &str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    let mut options = rocksdb::Options::default();
    options.set_error_if_exists(false);
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // open the db with whatever column families already exist,
    // then create "cf" if it is missing
    let db_path = "./tmp";
    let cfs = rocksdb::DB::list_cf(&options, db_path).unwrap_or(vec![]);
    let cf_missing = !cfs.iter().any(|cf| cf == "cf");
    let mut instance = rocksdb::DB::open_cf(&options, db_path, cfs).unwrap();
    if cf_missing {
        let options = rocksdb::Options::default();
        instance.create_cf("cf", &options).unwrap();
    }
    let cf = instance.cf_handle("cf").unwrap();

    // read the json data
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf = String::new();
    file.read_to_string(&mut buf).unwrap();
    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // write
    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = serde_json::to_vec(&values[i]).unwrap();
        instance.put_cf(cf, &key, &bytes).unwrap();
    }

    // read back and verify
    for i in 0..values.len() {
        let key = format!("p{i}");
        let bytes: Vec<u8> = instance.get_cf(cf, &key).unwrap().unwrap();
        let v: People = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(&v, &values[i]);
    }

    // delete from the same column family we wrote to
    // (the original called delete(), which targets the default cf)
    for i in 0..values.len() {
        let key = format!("p{i}");
        instance.delete_cf(cf, key).unwrap();
    }
}
multi thread
use crossbeam::channel;
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read};

const NUM_THREADS: usize = 16;
const DB_BASE_NAME: &str = "./tmp";
const CF_BASE_NAME: &str = "cf";
const JSON_PATH: &str = "/Users/yasushi/code/sandbox/databases/5MB.json";

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

enum MessageType {
    READ,
    WRITE,
    DEL,
}

enum Request {
    People((usize, People)),
    Id(usize),
}

enum Response {
    People(People),
    Ok,
}

struct Message {
    pub kind: MessageType,
    pub request: Request,
    pub response: oneshot::Sender<Response>,
}

impl Message {
    fn new(request: Request, kind: MessageType) -> (Self, oneshot::Receiver<Response>) {
        let (response, r) = oneshot::channel();
        let mes = Self { kind, response, request };
        (mes, r)
    }

    fn new_read(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::READ)
    }

    fn new_write(id: usize, people: People) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::People((id, people)), MessageType::WRITE)
    }

    fn new_del(id: usize) -> (Self, oneshot::Receiver<Response>) {
        Self::new(Request::Id(id), MessageType::DEL)
    }
}

// each worker thread owns its own database and column family: sharding
fn create_column_family_thread(options: &rocksdb::Options, id: usize) -> channel::Sender<Message> {
    let (s, r) = channel::unbounded::<Message>();
    let cf_name = format!("{CF_BASE_NAME}{id}");
    let db_path = format!("{DB_BASE_NAME}{id}");
    let options = options.clone();
    std::thread::spawn(move || {
        let cfs = rocksdb::DB::list_cf(&options, &db_path).unwrap_or(vec![]);
        let cf_exists = cfs.iter().any(|cf| cf == &cf_name);
        // `mut` is needed because create_cf takes &mut self
        let mut instance = rocksdb::DB::open_cf(&options, db_path, cfs).unwrap();
        if !cf_exists {
            let options = rocksdb::Options::default();
            instance.create_cf(&cf_name, &options).unwrap();
        }
        let cf = instance.cf_handle(&cf_name).unwrap();
        while let Ok(mes) = r.recv() {
            match mes.kind {
                MessageType::READ => {
                    let Request::Id(i) = mes.request else { continue; };
                    let key = format!("p{i}");
                    let bytes: Vec<u8> = instance.get_cf(&cf, key).unwrap().unwrap();
                    let p: People = serde_json::from_slice(&bytes).unwrap();
                    let _ = mes.response.send(Response::People(p));
                }
                MessageType::WRITE => {
                    let Request::People((i, p)) = mes.request else { continue; };
                    let bytes: Vec<u8> = serde_json::to_vec(&p).unwrap();
                    let key = format!("p{i}");
                    instance.put_cf(&cf, key, &bytes).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
                MessageType::DEL => {
                    let Request::Id(i) = mes.request else { continue; };
                    let key = format!("p{i}");
                    instance.delete_cf(&cf, key).unwrap();
                    let _ = mes.response.send(Response::Ok);
                }
            }
        }
    });
    s
}

fn main() {
    // read the json data
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf = String::new();
    file.read_to_string(&mut buf).unwrap();
    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    let mut options = rocksdb::Options::default();
    options.set_error_if_exists(false);
    options.create_if_missing(true);
    options.set_write_buffer_size(16 << 30);
    options.create_missing_column_families(true);

    // one sharded db (and worker thread) per slot
    let connections = (0..NUM_THREADS)
        .map(|i| create_column_family_thread(&options, i))
        .collect::<Vec<_>>();

    // write
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_write(i, values[i].clone());
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for r in responses {
        r.recv().unwrap();
    }

    // read back and verify
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_read(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for (i, r) in responses.into_iter().enumerate() {
        let Response::People(p) = r.recv().unwrap() else { panic!(); };
        assert_eq!(&p, &values[i]);
    }

    // delete
    let mut responses = Vec::with_capacity(values.len());
    for i in 0..values.len() {
        let (mes, r) = Message::new_del(i);
        connections[i % NUM_THREADS].send(mes).unwrap();
        responses.push(r);
    }
    for r in responses {
        r.recv().unwrap();
    }
}
I also got a version working with column-family-level locking (one db shared by all threads, one column family each), but it ended up slower, presumably because the single db handle becomes the bottleneck:
Benchmark 2: ~/code/sandbox/databases/db_test_rocksdb/target/release/db_test_rocksdb
  Time (mean ± σ):     218.4 ms ±  18.4 ms    [User: 333.9 ms, System: 400.0 ms]
  Range (min … max):   186.2 ms … 245.3 ms    15 runs
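For reference, the shape of that single-db variant was roughly as follows; a sketch assuming rocksdb's MultiThreaded mode (the path ./tmp_single is made up, and this is not the exact code I measured):

use rocksdb::{DBWithThreadMode, MultiThreaded, Options};
use std::sync::Arc;

fn main() {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);

    // one db shared by all threads, one column family per thread
    let cf_names: Vec<String> = (0..16).map(|i| format!("cf{i}")).collect();
    let db = Arc::new(
        DBWithThreadMode::<MultiThreaded>::open_cf(&options, "./tmp_single", &cf_names).unwrap(),
    );

    let handles: Vec<_> = (0..16)
        .map(|id| {
            let db = Arc::clone(&db);
            std::thread::spawn(move || {
                // every thread goes through the same db handle, which is
                // presumably where the contention came from
                let cf = db.cf_handle(&format!("cf{id}")).unwrap();
                db.put_cf(&cf, format!("p{id}"), b"{}".to_vec()).unwrap();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}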
sqlite
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{fs::File, io::Read};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct People {
    name: String,
    language: String,
    bio: String,
    version: f64,
}

const JSON_PATH: &str = "/Users/yasushi/code/sandbox/databases/5MB.json";

fn main() {
    // let con = Connection::open_in_memory().unwrap(); // the in-memory variant
    let con = Connection::open("db.sqlite3").unwrap();
    let create = "create table if not exists people(id integer primary key, data json)";
    con.execute(create, ()).unwrap();

    // read the json data
    let mut file = File::open(JSON_PATH).unwrap();
    let mut buf = String::new();
    file.read_to_string(&mut buf).unwrap();
    let values: Vec<People> = serde_json::from_str(&buf).unwrap();

    // write (row ids are 1-based here)
    for i in 0..values.len() {
        let string = serde_json::to_string(&values[i]).unwrap();
        let index = i + 1;
        con.execute("insert into people (id, data) values(?1, ?2)", (&index, &string))
            .unwrap();
    }

    // read back and verify
    for i in 0..values.len() {
        let index = i + 1;
        let q = format!("select * from people where id={index}");
        let mut statement = con.prepare(&q).unwrap();
        let value: Value = statement.query_row((), |r| r.get(1)).unwrap();
        let v: People = serde_json::from_value(value).unwrap();
        assert_eq!(&v, &values[i]);
    }

    // delete
    for i in 0..values.len() {
        let index = i + 1;
        con.execute("delete from people where id=?1", params![index]).unwrap();
    }
    con.execute("drop table if exists people", []).unwrap();
}