From 5f6e02d64e3789115ea4327a045b8ad3c39b1808 Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Mon, 13 Jul 2015 15:57:37 +0200 Subject: [PATCH] implement rgrep, and write part 12 (draft) about it --- solutions/Cargo.lock | 59 +++++++++++++ solutions/Cargo.toml | 3 + solutions/src/lib.rs | 2 - solutions/src/main.rs | 9 ++ solutions/src/rgrep.rs | 113 +++++++++++++++++++++++++ src/main.rs | 3 +- src/part12.rs | 179 ++++++++++++++++++++++++++++++++++++++++ workspace/src/main.rs | 1 + workspace/src/part12.rs | 123 +++++++++++++++++++++++++++ 9 files changed, 489 insertions(+), 3 deletions(-) delete mode 100644 solutions/src/lib.rs create mode 100644 solutions/src/main.rs create mode 100644 solutions/src/rgrep.rs create mode 100644 src/part12.rs create mode 100644 workspace/src/part12.rs diff --git a/solutions/Cargo.lock b/solutions/Cargo.lock index ffb21e6..9aada13 100644 --- a/solutions/Cargo.lock +++ b/solutions/Cargo.lock @@ -1,4 +1,63 @@ [root] name = "solutions" version = "0.1.0" +dependencies = [ + "docopt 0.6.67 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "aho-corasick" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "docopt" +version = "0.6.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "regex 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "strsim 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "libc" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "memchr" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "regex-syntax 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex-syntax" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rustc-serialize" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "strsim" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/solutions/Cargo.toml b/solutions/Cargo.toml index 8aebfa9..d48a4dd 100644 --- a/solutions/Cargo.toml +++ b/solutions/Cargo.toml @@ -2,3 +2,6 @@ name = "solutions" version = "0.1.0" authors = ["Ralf Jung "] + +[dependencies] +docopt = "*" diff --git a/solutions/src/lib.rs b/solutions/src/lib.rs deleted file mode 100644 index cbe9705..0000000 --- a/solutions/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod bigint; -pub mod vec; diff --git a/solutions/src/main.rs b/solutions/src/main.rs new file mode 100644 index 0000000..8afd81c --- /dev/null +++ b/solutions/src/main.rs @@ -0,0 +1,9 @@ +extern crate docopt; + +pub mod bigint; +pub mod vec; +pub mod rgrep; + +pub fn main() { + rgrep::main(); +} \ No newline at end of file diff --git a/solutions/src/rgrep.rs b/solutions/src/rgrep.rs new file mode 100644 index 0000000..a3b74cc --- /dev/null +++ b/solutions/src/rgrep.rs @@ -0,0 +1,113 @@ +use std::io::prelude::*; +use std::{io, fs, thread, process}; +use std::sync::mpsc::{sync_channel, SyncSender, Receiver}; +use std::sync::Arc; + +#[derive(Clone,Copy)] +enum OutputMode { + Print, + SortAndPrint, + Count, +} +use self::OutputMode::*; + +struct Options { + files: Vec, + pattern: String, + output_mode: OutputMode, +} + +struct Line { + data: String, + file: usize, + line: usize, +} + +fn read_files(options: Arc, out_channel: SyncSender) { + for (fileidx, file) in options.files.iter().enumerate() { + let file = fs::File::open(file).unwrap(); + let file = io::BufReader::new(file); + for (lineidx, line) in file.lines().enumerate() { + let line = Line { data: line.unwrap(), file: fileidx, line: lineidx }; + out_channel.send(line).unwrap(); + } + } +} + +fn filter_lines(options: Arc, in_channel: Receiver, out_channel: SyncSender) { + for line in in_channel.iter() { + if line.data.contains(&options.pattern) { + out_channel.send(line).unwrap(); + } + } +} + +fn output_lines(options: Arc, in_channel: Receiver) { + match options.output_mode { + Print => { + for line in in_channel.iter() { + println!("{}:{}: {}", options.files[line.file], line.line, line.data); + } + }, + Count => { + let count = in_channel.iter().count(); + println!("{} hits for {}.", count, options.pattern); + }, + SortAndPrint => { + let _data: Vec = in_channel.iter().collect(); + unimplemented!() + } + } +} + +static USAGE: &'static str = " +Usage: rgrep [-c] [-s] ... + +Options: + -c, --count Count number of matching lines (rather than printing them). + -s, --sort Sort the lines before printing. +"; + +fn get_options() -> Options { + use docopt::Docopt; + + // Parse argv and exit the program with an error message if it fails. + let args = Docopt::new(USAGE).and_then(|d| d.parse()).unwrap_or_else(|e| e.exit()); + let count = args.get_bool("-c"); + let sort = args.get_bool("-s"); + let pattern = args.get_str(""); + let files = args.get_vec(""); + if count && sort { + println!("Setting both '-c' and '-s' at the same time does not make any sense."); + process::exit(1); + } + + // We need to make the strings owned to construct the `Options` instance. + Options { + files: files.iter().map(|file| file.to_string()).collect(), + pattern: pattern.to_string(), + output_mode: if count { Count } else if sort { SortAndPrint } else { Print }, + } +} + +fn run(options: Options) { + let options = Arc::new(options); + + // Set up the chain of threads. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM. + let (line_sender, line_receiver) = sync_channel(16); + let (filtered_sender, filtered_receiver) = sync_channel(16); + + let options1 = options.clone(); + let handle1 = thread::spawn(move || read_files(options1, line_sender)); + let options2 = options.clone(); + let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender)); + let options3 = options.clone(); + let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver)); + handle1.join().unwrap(); + handle2.join().unwrap(); + handle3.join().unwrap(); +} + +pub fn main() { + run(get_options()); +} diff --git a/src/main.rs b/src/main.rs index 4fe4215..8526698 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,7 +29,7 @@ // first requirement rules out a garbage collector: Rust can run "bare metal". // In fact, Rust rules out more classes of bugs than languages that achieve safety // with a GC: Besides dangling pointers and double-free, Rust also prevents issues -// such as iterator invalidation and race conditions. +// such as iterator invalidation and data races. // // // Getting started @@ -93,6 +93,7 @@ mod part08; mod part09; mod part10; mod part11; +mod part12; // To actually run the code of some part (after filling in the blanks, if necessary), simply edit the `main` // function. diff --git a/src/part12.rs b/src/part12.rs new file mode 100644 index 0000000..edcb9e0 --- /dev/null +++ b/src/part12.rs @@ -0,0 +1,179 @@ +// Rust-101, Part 12: Concurrency (WIP) +// ================= + +use std::io::prelude::*; +use std::{io, fs, thread}; +use std::sync::mpsc::{sync_channel, SyncSender, Receiver}; +use std::sync::Arc; + +//@ This part is introducing the concurrency features of Rust. We are going to write our own small version of "grep", +//@ called *rgrep*, and it is going to make use of multiple cores: One thread reads the input files, one thread does +//@ the actual matching, and one thread writes the output. + +// Before we come to the actual code, we define a data-structure `Options` to store all the information we need +// to complete the job: Which files to work on, which pattern to look for, and how to output.
+// Besides just printing all the matching lines, we will also offer to count them, or alternatively to sort them. +#[derive(Clone,Copy)] +enum OutputMode { + Print, + SortAndPrint, + Count, +} +use self::OutputMode::*; + +struct Options { + files: Vec, + pattern: String, + output_mode: OutputMode, +} + +//@ Now we can write three functions to do the actual job of reading, matching, and printing, respectively. +//@ To get the data from one thread to the next, we will use *message passing*: We will establish communication +//@ channels between the threads, with one thread *sending* data, and the other one receiving it. `SyncSender` +//@ is the type of the sending end of a synchronous channel transmitting data of type `T`. *Synchronous* here +//@ means that the `send` operation could block, waiting for the other side to make progress. We don't want to +//@ end up with the entire files being stored in the buffer of the channels, and the output not being fast enough +//@ to keep up with the speed of input. +//@ +//@ We also need all the threads to have access to the options of the job they are supposed to do. Since it would +//@ be rather unnecessary to actually copy these options around, we will use reference-counting to share them between +//@ all threads. `Arc` is the thread-safe version of `Rc, using atomic operations to keep the reference count up-to-date. +//@ You can also think of this as saying that *all* threads own the `Options` "a bit" - and since there could be other +//@ owners, `Arc` (just like `Rc`) only permits read-only access to its content. That's good enough for the options, though. + +// The first functions reads the files, and sends every line over the `out_channel`. +fn read_files(options: Arc, out_channel: SyncSender) { + for file in options.files.iter() { + // First, we open the file, ignoring any errors. + let file = fs::File::open(file).unwrap(); + // Then we obtain a `BufReader` for it, which provides the `lines` function. + let file = io::BufReader::new(file); + for line in file.lines() { + let line = line.unwrap(); + // Now we send the line over the channel, ignoring the possibility of `send` failing. + out_channel.send(line).unwrap(); + } + } + // When we drop the `out_channel`, it will be closed, which the other end can notice. +} + +// The second function filters the lines it receives through `in_channel` with the pattern, and sends +// matches via `out_channel`. +fn filter_lines(options: Arc, in_channel: Receiver, out_channel: SyncSender) { + // We can simply iterate over the channel, which will stop when the channel is closed. + for line in in_channel.iter() { + // `contains` works on lots of types of patterns, but in particular, we can use it to test whether + // one string is contained in another. + if line.contains(&options.pattern) { + out_channel.send(line).unwrap(); /*@*/ + } + } +} + +// The third function performs the output operations, receiving the relevant lines on its `in_channel`. +fn output_lines(options: Arc, in_channel: Receiver) { + match options.output_mode { + Print => { + // Here, we just print every line we see. + for line in in_channel.iter() { + println!("{}", line); /*@*/ + } + }, + Count => { + // We are supposed to count the number of matching lines. There's a convenient iterator adapter that + // we can use for this job. + let count = in_channel.iter().count(); /*@*/ + println!("{} hits for {}.", count, options.pattern); /*@*/ + }, + SortAndPrint => { + // We are asked to sort the matching lines before printing. So let's collect them all in a local vector... + let data: Vec = in_channel.iter().collect(); + // ...and implement the actual sorting later. + unimplemented!() + } + } +} + +// With the operations of the three threads defined, we can now implement a function that performs grepping according +// to some given options. +fn run(options: Options) { + // We move the `options` into an `Arc`, as that's what the thread workers expect. + let options = Arc::new(options); + + // Set up the channels. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM. + let (line_sender, line_receiver) = sync_channel(16); + let (filtered_sender, filtered_receiver) = sync_channel(16); + + // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread. + //@ The `move` keyword again tells Rust that we want ownership of captured variables to be moved into the + //@ closure. This means we need to do the `clone` *first*, otherwise we would lose our `options` to the + //@ new thread! + let options1 = options.clone(); + let handle1 = thread::spawn(move || read_files(options1, line_sender)); + + // Same with the filter thread. + let options2 = options.clone(); + let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender)); + + // And the output thread. + let options3 = options.clone(); + let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver)); + + // Finally, wait until all three threads did their job. + handle1.join().unwrap(); + handle2.join().unwrap(); + handle3.join().unwrap(); +} + +// Now we have all the pieces together for testing our `rgrep` with some hard-coded options. +//@ We need to call `to_string` on string literals to convert them to a fully-owned `String`. +pub fn main() { + let options = Options { + files: vec!["src/part10.rs".to_string(), "src/part11.rs".to_string(), "src/part12.rs".to_string()], + pattern: "let".to_string(), + output_mode: Print + }; + run(options); +} + +// **Exercise 12.1**: Change `rgrep` such that it prints now only the matching lines, but also the name of the file +// and the number of the line in the file. You will have to change the type of the channels from `String` to something +// that records this extra information. + +//@ ## Ownership, Borrowing, and Concurrency +//@ The little demo above showed that concurrency in Rust has a fairly simple API. However, considering Rust has closures, +//@ that should not be entirely surprising. However, as I mentioned in the beginning, Rust ensures that well-typed programs +//@ do not have data races. How can that be? A data race is typically defined as having two concurrent, unsynchronized +//@ accesses to the same memory location, at least one of which is a write. In other words, a data race is mutation in +//@ the presence of aliasing, which Rust reliably rules out! It turns out that the same mechanism that makes our single-threaded +//@ programs memory safe, and that prevents us from invalidating iterators, also helps secure our multi-threaded code against +//@ data races. For example, notice how `read_files` sends a `String` to `filter_lines`. At run-time, only the pointer to +//@ the string will actually be moved around (just like when a `String` is passed to a function with full ownership). However, +//@ `read_files` has to *give up* ownership of the string to perform `send`, to it is impossible for an outstanding borrow to +//@ still be around. After it sent the string to the other side, `read_files` has no way to race on the data with someone else. +//@ +//@ However, there is more to this. Remember the `'static` bound we had to add to `register` in the previous part, to make +//@ sure that the callbacks to not reference any pointers that might become invalid? This is just as crucial for spawning +//@ a thread: In general, that thread could last for much longer than the current stack frame. Thus, it must not use +//@ any pointers to data in that stack frame. This is achieved by requiring the `FnOnce` closure passed to `thread::spawn` +//@ to be valid for lifetime `'static`, as you can see in [its documentation](http://doc.rust-lang.org/stable/std/thread/fn.spawn.html). +//@ This avoids another kind of data race, where the thread's access races with the callee deallocating its stack frame. + +//@ ## Send +//@ However, the story goes further. I said above that `Arc` is a thread-safe version of `Rc`, which uses atomic operations +//@ to manipulate the reference count. It is thus crucial that we don't use `Rc` above, or the reference count may become invalid. +//@ And indeed, if you replace `Arc` by `Rc` (and add the appropriate imports), Rust will tell you that something is wrong. +//@ That's great, of course, but how did it do that? +//@ +//@ The answer is already hinted at in the error: It will say something about `Send`. You may have noticed that the closure in +//@ `thread::spawn` does not just have a `'static` bound, but also has to satisfy `Send`. `Send` is a trait, and just like `Copy`, +//@ it's just a marker - there are no functions provided by `Send` What the trait says is that types which are `Send`, can be +//@ safely sent to another thread without causing trouble. Of course, all the primitive data-types are `Send`. So is `Arc`, +//@ which is why Rust accepted our code. But `Rc` is not `Send`, and for a good reason! +//@ +//@ Now, `Send` as a trait is fairly special. It has a so-called *default implementation*. This means that *every type* implements +//@ `Send`, unless it opts out. Opting out is viral: If your type contains a type that opted out, then you don't have `Send`, either. +//@ So if the environment of your closure contains an `Rc`, it won't be `Send`, preventing it from causing trouble. If however every +//@ captured variable *is* `Send`, then so is the entire environment, and you are good. + +//@ [index](main.html) | [previous](part11.html) | [next](main.html) diff --git a/workspace/src/main.rs b/workspace/src/main.rs index 8531db6..98e8e8d 100644 --- a/workspace/src/main.rs +++ b/workspace/src/main.rs @@ -13,6 +13,7 @@ mod part08; mod part09; mod part10; mod part11; +mod part12; // This decides which part is actually run. fn main() { diff --git a/workspace/src/part12.rs b/workspace/src/part12.rs new file mode 100644 index 0000000..1d75bfd --- /dev/null +++ b/workspace/src/part12.rs @@ -0,0 +1,123 @@ +// Rust-101, Part 12: Concurrency (WIP) +// ================= + +use std::io::prelude::*; +use std::{io, fs, thread}; +use std::sync::mpsc::{sync_channel, SyncSender, Receiver}; +use std::sync::Arc; + + +// Before we come to the actual code, we define a data-structure `Options` to store all the information we need +// to complete the job: Which files to work on, which pattern to look for, and how to output.
+// Besides just printing all the matching lines, we will also offer to count them, or alternatively to sort them. +#[derive(Clone,Copy)] +enum OutputMode { + Print, + SortAndPrint, + Count, +} +use self::OutputMode::*; + +struct Options { + files: Vec, + pattern: String, + output_mode: OutputMode, +} + + +// The first functions reads the files, and sends every line over the `out_channel`. +fn read_files(options: Arc, out_channel: SyncSender) { + for file in options.files.iter() { + // First, we open the file, ignoring any errors. + let file = fs::File::open(file).unwrap(); + // Then we obtain a `BufReader` for it, which provides the `lines` function. + let file = io::BufReader::new(file); + for line in file.lines() { + let line = line.unwrap(); + // Now we send the line over the channel, ignoring the possibility of `send` failing. + out_channel.send(line).unwrap(); + } + } + // When we drop the `out_channel`, it will be closed, which the other end can notice. +} + +// The second function filters the lines it receives through `in_channel` with the pattern, and sends +// matches via `out_channel`. +fn filter_lines(options: Arc, in_channel: Receiver, out_channel: SyncSender) { + // We can simply iterate over the channel, which will stop when the channel is closed. + for line in in_channel.iter() { + // `contains` works on lots of types of patterns, but in particular, we can use it to test whether + // one string is contained in another. + if line.contains(&options.pattern) { + unimplemented!() + } + } +} + +// The third function performs the output operations, receiving the relevant lines on its `in_channel`. +fn output_lines(options: Arc, in_channel: Receiver) { + match options.output_mode { + Print => { + // Here, we just print every line we see. + for line in in_channel.iter() { + unimplemented!() + } + }, + Count => { + // We are supposed to count the number of matching lines. There's a convenient iterator adapter that + // we can use for this job. + unimplemented!() + }, + SortAndPrint => { + // We are asked to sort the matching lines before printing. So let's collect them all in a local vector... + let data: Vec = in_channel.iter().collect(); + // ...and implement the actual sorting later. + unimplemented!() + } + } +} + +// With the operations of the three threads defined, we can now implement a function that performs grepping according +// to some given options. +fn run(options: Options) { + // We move the `options` into an `Arc`, as that's what the thread workers expect. + let options = Arc::new(options); + + // Set up the channels. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM. + let (line_sender, line_receiver) = sync_channel(16); + let (filtered_sender, filtered_receiver) = sync_channel(16); + + // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread. + let options1 = options.clone(); + let handle1 = thread::spawn(move || read_files(options1, line_sender)); + + // Same with the filter thread. + let options2 = options.clone(); + let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender)); + + // And the output thread. + let options3 = options.clone(); + let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver)); + + // Finally, wait until all three threads did their job. + handle1.join().unwrap(); + handle2.join().unwrap(); + handle3.join().unwrap(); +} + +// Now we have all the pieces together for testing our `rgrep` with some hard-coded options. +pub fn main() { + let options = Options { + files: vec!["src/part10.rs".to_string(), "src/part11.rs".to_string(), "src/part12.rs".to_string()], + pattern: "let".to_string(), + output_mode: Print + }; + run(options); +} + +// **Exercise 12.1**: Change `rgrep` such that it prints now only the matching lines, but also the name of the file +// and the number of the line in the file. You will have to change the type of the channels from `String` to something +// that records this extra information. + + + -- 2.30.2