implement rgrep, and write part 12 (draft) about it
authorRalf Jung <post@ralfj.de>
Mon, 13 Jul 2015 13:57:37 +0000 (15:57 +0200)
committerRalf Jung <post@ralfj.de>
Mon, 13 Jul 2015 13:58:08 +0000 (15:58 +0200)
solutions/Cargo.lock
solutions/Cargo.toml
solutions/src/lib.rs [deleted file]
solutions/src/main.rs [new file with mode: 0644]
solutions/src/rgrep.rs [new file with mode: 0644]
src/main.rs
src/part12.rs [new file with mode: 0644]
workspace/src/main.rs
workspace/src/part12.rs [new file with mode: 0644]

index ffb21e6e344049bc708e4cb042dce5ff5c539a22..9aada1302251ac37682f9c17582545cb20db7800 100644 (file)
@@ -1,4 +1,63 @@
 [root]
 name = "solutions"
 version = "0.1.0"
+dependencies = [
+ "docopt 0.6.67 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "aho-corasick"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "memchr 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "docopt"
+version = "0.6.67"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "regex 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustc-serialize 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "strsim 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "libc"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "memchr"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "regex"
+version = "0.1.40"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "aho-corasick 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memchr 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "regex-syntax 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "rustc-serialize"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "strsim"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
 
index 8aebfa98aedf6665a2d947337741b8e965aacc2b..d48a4dd3d95a968df9e7d2082fc339abd4cd9094 100644 (file)
@@ -2,3 +2,6 @@
 name = "solutions"
 version = "0.1.0"
 authors = ["Ralf Jung <post@ralfj.de>"]
+
+[dependencies]
+docopt = "*"
diff --git a/solutions/src/lib.rs b/solutions/src/lib.rs
deleted file mode 100644 (file)
index cbe9705..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod bigint;
-pub mod vec;
diff --git a/solutions/src/main.rs b/solutions/src/main.rs
new file mode 100644 (file)
index 0000000..8afd81c
--- /dev/null
@@ -0,0 +1,9 @@
+extern crate docopt;
+
+pub mod bigint;
+pub mod vec;
+pub mod rgrep;
+
+pub fn main() {
+    rgrep::main();
+}
\ No newline at end of file
diff --git a/solutions/src/rgrep.rs b/solutions/src/rgrep.rs
new file mode 100644 (file)
index 0000000..a3b74cc
--- /dev/null
@@ -0,0 +1,113 @@
+use std::io::prelude::*;
+use std::{io, fs, thread, process};
+use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
+use std::sync::Arc;
+
+#[derive(Clone,Copy)]
+enum OutputMode {
+    Print,
+    SortAndPrint,
+    Count,
+}
+use self::OutputMode::*;
+
+struct Options {
+    files: Vec<String>,
+    pattern: String,
+    output_mode: OutputMode,
+}
+
+struct Line {
+    data: String,
+    file: usize,
+    line: usize,
+}
+
+fn read_files(options: Arc<Options>, out_channel: SyncSender<Line>) {
+    for (fileidx, file) in options.files.iter().enumerate() {
+        let file = fs::File::open(file).unwrap();
+        let file = io::BufReader::new(file);
+        for (lineidx, line) in file.lines().enumerate() {
+            let line = Line { data: line.unwrap(), file: fileidx, line: lineidx };
+            out_channel.send(line).unwrap();
+        }
+    }
+}
+
+fn filter_lines(options: Arc<Options>, in_channel: Receiver<Line>, out_channel: SyncSender<Line>) {
+    for line in in_channel.iter() {
+        if line.data.contains(&options.pattern) {
+            out_channel.send(line).unwrap();
+        }
+    }
+}
+
+fn output_lines(options: Arc<Options>, in_channel: Receiver<Line>) {
+    match options.output_mode {
+        Print => {
+            for line in in_channel.iter() {
+                println!("{}:{}: {}", options.files[line.file], line.line, line.data);
+            }
+        },
+        Count => {
+            let count = in_channel.iter().count();
+            println!("{} hits for {}.", count, options.pattern);
+        },
+        SortAndPrint => {
+            let _data: Vec<Line> = in_channel.iter().collect();
+            unimplemented!()
+        }
+    }
+}
+
+static USAGE: &'static str = "
+Usage: rgrep [-c] [-s] <pattern> <file>...
+
+Options:
+    -c, --count  Count number of matching lines (rather than printing them).
+    -s, --sort   Sort the lines before printing.
+";
+
+fn get_options() -> Options {
+    use docopt::Docopt;
+
+    // Parse argv and exit the program with an error message if it fails.
+    let args = Docopt::new(USAGE).and_then(|d| d.parse()).unwrap_or_else(|e| e.exit());
+    let count = args.get_bool("-c");
+    let sort = args.get_bool("-s");
+    let pattern = args.get_str("<pattern>");
+    let files = args.get_vec("<file>");
+    if count && sort {
+        println!("Setting both '-c' and '-s' at the same time does not make any sense.");
+        process::exit(1);
+    }
+
+    // We need to make the strings owned to construct the `Options` instance.
+    Options {
+        files: files.iter().map(|file| file.to_string()).collect(),
+        pattern: pattern.to_string(),
+        output_mode: if count { Count } else if sort { SortAndPrint } else { Print },
+    }
+}
+
+fn run(options: Options) {
+    let options = Arc::new(options);
+
+    // Set up the chain of threads. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM.
+    let (line_sender, line_receiver) = sync_channel(16);
+    let (filtered_sender, filtered_receiver) = sync_channel(16);
+
+    let options1 = options.clone();
+    let handle1 = thread::spawn(move || read_files(options1, line_sender));
+    let options2 = options.clone();
+    let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender));
+    let options3 = options.clone();
+    let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver));
+    handle1.join().unwrap();
+    handle2.join().unwrap();
+    handle3.join().unwrap();
+}
+
+pub fn main() {
+    run(get_options());
+}
index 4fe4215e1333da181745742e0741fa8d6126c16d..8526698a06683e749c110216ebbc312c2376d1fd 100644 (file)
@@ -29,7 +29,7 @@
 // first requirement rules out a garbage collector: Rust can run "bare metal".
 // In fact, Rust rules out more classes of bugs than languages that achieve safety
 // with a GC: Besides dangling pointers and double-free, Rust also prevents issues
-// such as iterator invalidation and race conditions.
+// such as iterator invalidation and data races.
 // 
 // 
 // Getting started
@@ -93,6 +93,7 @@ mod part08;
 mod part09;
 mod part10;
 mod part11;
+mod part12;
 
 // To actually run the code of some part (after filling in the blanks, if necessary), simply edit the `main`
 // function.
diff --git a/src/part12.rs b/src/part12.rs
new file mode 100644 (file)
index 0000000..edcb9e0
--- /dev/null
@@ -0,0 +1,179 @@
+// Rust-101, Part 12: Concurrency (WIP)
+// =================
+
+use std::io::prelude::*;
+use std::{io, fs, thread};
+use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
+use std::sync::Arc;
+
+//@ This part is introducing the concurrency features of Rust. We are going to write our own small version of "grep",
+//@ called *rgrep*, and it is going to make use of multiple cores: One thread reads the input files, one thread does
+//@ the actual matching, and one thread writes the output.
+
+// Before we come to the actual code, we define a data-structure `Options` to store all the information we need
+// to complete the job: Which files to work on, which pattern to look for, and how to output. <br/>
+// Besides just printing all the matching lines, we will also offer to count them, or alternatively to sort them.
+#[derive(Clone,Copy)]
+enum OutputMode {
+    Print,
+    SortAndPrint,
+    Count,
+}
+use self::OutputMode::*;
+
+struct Options {
+    files: Vec<String>,
+    pattern: String,
+    output_mode: OutputMode,
+}
+
+//@ Now we can write three functions to do the actual job of reading, matching, and printing, respectively.
+//@ To get the data from one thread to the next, we will use *message passing*: We will establish communication
+//@ channels between the threads, with one thread *sending* data, and the other one receiving it. `SyncSender<T>`
+//@ is the type of the sending end of a synchronous channel transmitting data of type `T`. *Synchronous* here
+//@ means that the `send` operation could block, waiting for the other side to make progress. We don't want to
+//@ end up with the entire files being stored in the buffer of the channels, and the output not being fast enough
+//@ to keep up with the speed of input.
+//@
+//@ We also need all the threads to have access to the options of the job they are supposed to do. Since it would
+//@ be rather unnecessary to actually copy these options around, we will use reference-counting to share them between
+//@ all threads. `Arc` is the thread-safe version of `Rc, using atomic operations to keep the reference count up-to-date.
+//@ You can also think of this as saying that *all* threads own the `Options` "a bit" - and since there could be other
+//@ owners, `Arc` (just like `Rc`) only permits read-only access to its content. That's good enough for the options, though.
+
+// The first functions reads the files, and sends every line over the `out_channel`.
+fn read_files(options: Arc<Options>, out_channel: SyncSender<String>) {
+    for file in options.files.iter() {
+        // First, we open the file, ignoring any errors.
+        let file = fs::File::open(file).unwrap();
+        // Then we obtain a `BufReader` for it, which provides the `lines` function.
+        let file = io::BufReader::new(file);
+        for line in file.lines() {
+            let line = line.unwrap();
+            // Now we send the line over the channel, ignoring the possibility of `send` failing.
+            out_channel.send(line).unwrap();
+        }
+    }
+    // When we drop the `out_channel`, it will be closed, which the other end can notice.
+}
+
+// The second function filters the lines it receives through `in_channel` with the pattern, and sends
+// matches via `out_channel`.
+fn filter_lines(options: Arc<Options>, in_channel: Receiver<String>, out_channel: SyncSender<String>) {
+    // We can simply iterate over the channel, which will stop when the channel is closed.
+    for line in in_channel.iter() {
+        // `contains` works on lots of types of patterns, but in particular, we can use it to test whether
+        // one string is contained in another.
+        if line.contains(&options.pattern) {
+            out_channel.send(line).unwrap();                        /*@*/
+        }
+    }
+}
+
+// The third function performs the output operations, receiving the relevant lines on its `in_channel`.
+fn output_lines(options: Arc<Options>, in_channel: Receiver<String>) {
+    match options.output_mode {
+        Print => {
+            // Here, we just print every line we see.
+            for line in in_channel.iter() {
+                println!("{}", line);                               /*@*/
+            }
+        },
+        Count => {
+            // We are supposed to count the number of matching lines. There's a convenient iterator adapter that
+            // we can use for this job.
+            let count = in_channel.iter().count();                  /*@*/
+            println!("{} hits for {}.", count, options.pattern);    /*@*/
+        },
+        SortAndPrint => {
+            // We are asked to sort the matching lines before printing. So let's collect them all in a local vector...
+            let data: Vec<String> = in_channel.iter().collect();
+            // ...and implement the actual sorting later.
+            unimplemented!()
+        }
+    }
+}
+
+// With the operations of the three threads defined, we can now implement a function that performs grepping according
+// to some given options.
+fn run(options: Options) {
+    // We move the `options` into an `Arc`, as that's what the thread workers expect.
+    let options = Arc::new(options);
+
+    // Set up the channels. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM.
+    let (line_sender, line_receiver) = sync_channel(16);
+    let (filtered_sender, filtered_receiver) = sync_channel(16);
+
+    // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread.
+    //@ The `move` keyword again tells Rust that we want ownership of captured variables to be moved into the
+    //@ closure. This means we need to do the `clone` *first*, otherwise we would lose our `options` to the
+    //@ new thread!
+    let options1 = options.clone();
+    let handle1 = thread::spawn(move || read_files(options1, line_sender));
+
+    // Same with the filter thread.
+    let options2 = options.clone();
+    let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender));
+
+    // And the output thread.
+    let options3 = options.clone();
+    let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver));
+
+    // Finally, wait until all three threads did their job.
+    handle1.join().unwrap();
+    handle2.join().unwrap();
+    handle3.join().unwrap();
+}
+
+// Now we have all the pieces together for testing our `rgrep` with some hard-coded options.
+//@ We need to call `to_string` on string literals to convert them to a fully-owned `String`.
+pub fn main() {
+    let options = Options {
+        files: vec!["src/part10.rs".to_string(), "src/part11.rs".to_string(), "src/part12.rs".to_string()],
+        pattern: "let".to_string(),
+        output_mode: Print
+    };
+    run(options);
+}
+
+// **Exercise 12.1**: Change `rgrep` such that it prints now only the matching lines, but also the name of the file
+// and the number of the line in the file. You will have to change the type of the channels from `String` to something
+// that records this extra information.
+
+//@ ## Ownership, Borrowing, and Concurrency
+//@ The little demo above showed that concurrency in Rust has a fairly simple API. However, considering Rust has closures,
+//@ that should not be entirely surprising. However, as I mentioned in the beginning, Rust ensures that well-typed programs
+//@ do not have data races. How can that be? A data race is typically defined as having two concurrent, unsynchronized
+//@ accesses to the same memory location, at least one of which is a write. In other words, a data race is mutation in
+//@ the presence of aliasing, which Rust reliably rules out! It turns out that the same mechanism that makes our single-threaded
+//@ programs memory safe, and that prevents us from invalidating iterators, also helps secure our multi-threaded code against
+//@ data races. For example, notice how `read_files` sends a `String` to `filter_lines`. At run-time, only the pointer to
+//@ the string will actually be moved around (just like when a `String` is passed to a function with full ownership). However,
+//@ `read_files` has to *give up* ownership of the string to perform `send`, to it is impossible for an outstanding borrow to
+//@ still be around. After it sent the string to the other side, `read_files` has no way to race on the data with someone else.
+//@ 
+//@ However, there is more to this. Remember the `'static` bound we had to add to `register` in the previous part, to make
+//@ sure that the callbacks to not reference any pointers that might become invalid? This is just as crucial for spawning
+//@ a thread: In general, that thread could last for much longer than the current stack frame. Thus, it must not use
+//@ any pointers to data in that stack frame. This is achieved by requiring the `FnOnce` closure passed to `thread::spawn`
+//@ to be valid for lifetime `'static`, as you can see in [its documentation](http://doc.rust-lang.org/stable/std/thread/fn.spawn.html).
+//@ This avoids another kind of data race, where the thread's access races with the callee deallocating its stack frame.
+
+//@ ## Send
+//@ However, the story goes further. I said above that `Arc` is a thread-safe version of `Rc`, which uses atomic operations
+//@ to manipulate the reference count. It is thus crucial that we don't use `Rc` above, or the reference count may become invalid.
+//@ And indeed, if you replace `Arc` by `Rc` (and add the appropriate imports), Rust will tell you that something is wrong.
+//@ That's great, of course, but how did it do that?
+//@ 
+//@ The answer is already hinted at in the error: It will say something about `Send`. You may have noticed that the closure in
+//@ `thread::spawn` does not just have a `'static` bound, but also has to satisfy `Send`. `Send` is a trait, and just like `Copy`,
+//@ it's just a marker - there are no functions provided by `Send` What the trait says is that types which are `Send`, can be
+//@ safely sent to another thread without causing trouble. Of course, all the primitive data-types are `Send`. So is `Arc`,
+//@ which is why Rust accepted our code. But `Rc` is not `Send`, and for a good reason!
+//@ 
+//@ Now, `Send` as a trait is fairly special. It has a so-called *default implementation*. This means that *every type* implements
+//@ `Send`, unless it opts out. Opting out is viral: If your type contains a type that opted out, then you don't have `Send`, either.
+//@ So if the environment of your closure contains an `Rc`, it won't be `Send`, preventing it from causing trouble. If however every
+//@ captured variable *is* `Send`, then so is the entire environment, and you are good.
+
+//@ [index](main.html) | [previous](part11.html) | [next](main.html)
index 8531db62abfb3f0a509e2701789db1df4353a7cf..98e8e8d1aec9bb37d359deefef58dc4fa6ed68ee 100644 (file)
@@ -13,6 +13,7 @@ mod part08;
 mod part09;
 mod part10;
 mod part11;
+mod part12;
 
 // This decides which part is actually run.
 fn main() {
diff --git a/workspace/src/part12.rs b/workspace/src/part12.rs
new file mode 100644 (file)
index 0000000..1d75bfd
--- /dev/null
@@ -0,0 +1,123 @@
+// Rust-101, Part 12: Concurrency (WIP)
+// =================
+
+use std::io::prelude::*;
+use std::{io, fs, thread};
+use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
+use std::sync::Arc;
+
+
+// Before we come to the actual code, we define a data-structure `Options` to store all the information we need
+// to complete the job: Which files to work on, which pattern to look for, and how to output. <br/>
+// Besides just printing all the matching lines, we will also offer to count them, or alternatively to sort them.
+#[derive(Clone,Copy)]
+enum OutputMode {
+    Print,
+    SortAndPrint,
+    Count,
+}
+use self::OutputMode::*;
+
+struct Options {
+    files: Vec<String>,
+    pattern: String,
+    output_mode: OutputMode,
+}
+
+
+// The first functions reads the files, and sends every line over the `out_channel`.
+fn read_files(options: Arc<Options>, out_channel: SyncSender<String>) {
+    for file in options.files.iter() {
+        // First, we open the file, ignoring any errors.
+        let file = fs::File::open(file).unwrap();
+        // Then we obtain a `BufReader` for it, which provides the `lines` function.
+        let file = io::BufReader::new(file);
+        for line in file.lines() {
+            let line = line.unwrap();
+            // Now we send the line over the channel, ignoring the possibility of `send` failing.
+            out_channel.send(line).unwrap();
+        }
+    }
+    // When we drop the `out_channel`, it will be closed, which the other end can notice.
+}
+
+// The second function filters the lines it receives through `in_channel` with the pattern, and sends
+// matches via `out_channel`.
+fn filter_lines(options: Arc<Options>, in_channel: Receiver<String>, out_channel: SyncSender<String>) {
+    // We can simply iterate over the channel, which will stop when the channel is closed.
+    for line in in_channel.iter() {
+        // `contains` works on lots of types of patterns, but in particular, we can use it to test whether
+        // one string is contained in another.
+        if line.contains(&options.pattern) {
+            unimplemented!()
+        }
+    }
+}
+
+// The third function performs the output operations, receiving the relevant lines on its `in_channel`.
+fn output_lines(options: Arc<Options>, in_channel: Receiver<String>) {
+    match options.output_mode {
+        Print => {
+            // Here, we just print every line we see.
+            for line in in_channel.iter() {
+                unimplemented!()
+            }
+        },
+        Count => {
+            // We are supposed to count the number of matching lines. There's a convenient iterator adapter that
+            // we can use for this job.
+            unimplemented!()
+        },
+        SortAndPrint => {
+            // We are asked to sort the matching lines before printing. So let's collect them all in a local vector...
+            let data: Vec<String> = in_channel.iter().collect();
+            // ...and implement the actual sorting later.
+            unimplemented!()
+        }
+    }
+}
+
+// With the operations of the three threads defined, we can now implement a function that performs grepping according
+// to some given options.
+fn run(options: Options) {
+    // We move the `options` into an `Arc`, as that's what the thread workers expect.
+    let options = Arc::new(options);
+
+    // Set up the channels. Use `sync_channel` with buffer-size of 16 to avoid needlessly filling RAM.
+    let (line_sender, line_receiver) = sync_channel(16);
+    let (filtered_sender, filtered_receiver) = sync_channel(16);
+
+    // Spawn the read thread: `thread::spawn` takes a closure that is run in a new thread.
+    let options1 = options.clone();
+    let handle1 = thread::spawn(move || read_files(options1, line_sender));
+
+    // Same with the filter thread.
+    let options2 = options.clone();
+    let handle2 = thread::spawn(move || filter_lines(options2, line_receiver, filtered_sender));
+
+    // And the output thread.
+    let options3 = options.clone();
+    let handle3 = thread::spawn(move || output_lines(options3, filtered_receiver));
+
+    // Finally, wait until all three threads did their job.
+    handle1.join().unwrap();
+    handle2.join().unwrap();
+    handle3.join().unwrap();
+}
+
+// Now we have all the pieces together for testing our `rgrep` with some hard-coded options.
+pub fn main() {
+    let options = Options {
+        files: vec!["src/part10.rs".to_string(), "src/part11.rs".to_string(), "src/part12.rs".to_string()],
+        pattern: "let".to_string(),
+        output_mode: Print
+    };
+    run(options);
+}
+
+// **Exercise 12.1**: Change `rgrep` such that it prints now only the matching lines, but also the name of the file
+// and the number of the line in the file. You will have to change the type of the channels from `String` to something
+// that records this extra information.
+
+
+