How to: Iterate File Directories with the Parallel Class

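In many cases, file iteration is an operation that can be easily parallelized. The topic How to: Iterate File Directories with PLINQ shows the easiest way to perform this task for many scenarios. However, things can get complicated when your code has to deal with the many types of exceptions that can be raised when accessing the file system. The following example shows one approach to the problem. It uses a stack-based iteration to traverse all files and folders under a specified directory, and it lets your code catch and handle the various exceptions. How you handle those exceptions is, of course, up to you.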
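For comparison, the simpler PLINQ approach can be sketched as follows. This is a minimal sketch, not the code from that topic; the helper name `CountFilesWithPlinq` and the throwaway directory tree are assumptions for illustration. Because a single `Directory.EnumerateFiles` call with `SearchOption.AllDirectories` walks the whole tree, an exception raised for one unreadable folder typically ends the entire query. That is the limitation the stack-based example works around.

```csharp
using System;
using System.IO;
using System.Linq;

// Hypothetical helper: counts every file under 'root' with a single PLINQ query.
// One enumerator walks the whole tree, so an exception for one unreadable folder
// (for example UnauthorizedAccessException) typically ends the entire query.
static int CountFilesWithPlinq(string root) =>
    Directory.EnumerateFiles(root, "*", SearchOption.AllDirectories)
             .AsParallel()
             .Count();

// Throwaway tree: demoRoot/a.txt and demoRoot/sub/b.txt
string demoRoot = Path.Combine(Path.GetTempPath(), "plinq-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(Path.Combine(demoRoot, "sub"));
File.WriteAllText(Path.Combine(demoRoot, "a.txt"), "a");
File.WriteAllText(Path.Combine(demoRoot, "sub", "b.txt"), "b");

int plinqCount = CountFilesWithPlinq(demoRoot);
Directory.Delete(demoRoot, recursive: true);
Console.WriteLine($"PLINQ found {plinqCount} files"); // prints "PLINQ found 2 files"
```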

Example

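The following example iterates over the directories sequentially but processes the files in parallel. This is probably the best approach when you have a large file-to-directory ratio. It is also possible to parallelize the directory iteration and access each file sequentially. Parallelizing both loops is probably not efficient unless you are specifically targeting a computer with a large number of processors. However, as in all cases, you should test your application thoroughly to determine the best approach.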

Imports System
Imports System.Collections.Generic
Imports System.Diagnostics
Imports System.IO
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks

Module Parallel_File
    Sub Main(ByVal args() As String)
        TraverseTreeParallelForEach("C:\Program Files", Sub(f)

                                                            ' For this demo we don't do anything with the data
                                                            ' except to read it.
                                                            Dim data() As Byte = File.ReadAllBytes(f)

                                                            ' For user interest, although it slows down the operation.
                                                            Console.WriteLine(f)
                                                        End Sub)

        ' Keep the console window open.
        Console.ReadKey()
    End Sub



    Public Sub TraverseTreeParallelForEach(ByVal root As String, ByVal action As Action(Of String))


        'Count of files traversed and timer for diagnostic output
        Dim fileCount As Integer = 0
        Dim sw As Stopwatch = Stopwatch.StartNew()

        ' Use this value to determine whether to parallelize
        ' file processing on each folder.
        Dim procCount As Integer = System.Environment.ProcessorCount

        ' Data structure to hold names of subfolders to be
        ' examined for files.
        Dim dirs As Stack(Of String) = New Stack(Of String)

        If Not System.IO.Directory.Exists(root) Then
            Throw New ArgumentException("The given root directory doesn't exist.", NameOf(root))
        End If
        dirs.Push(root)

        While (dirs.Count > 0)

            Dim currentDir As String = dirs.Pop()
            Dim subDirs() As String = Nothing
            Dim files() As String = Nothing

            Try
                subDirs = System.IO.Directory.GetDirectories(currentDir)
                ' An UnauthorizedAccessException exception will be thrown if we do not have
                ' discovery permission on a folder or file. It may or may not be acceptable 
                ' to ignore the exception and continue enumerating the remaining files and 
                ' folders. It is also possible (but unlikely) that a DirectoryNotFound exception 
                ' will be raised. This will happen if currentDir has been deleted by
                ' another application or thread after our call to Directory.Exists. The 
                ' choice of which exceptions to catch depends entirely on the specific task 
                ' you are intending to perform and also on how much you know with certainty 
                ' about the systems on which this code will run.
            Catch e As UnauthorizedAccessException

                Console.WriteLine(e.Message)
                Continue While

            Catch e As System.IO.DirectoryNotFoundException

                Console.WriteLine(e.Message)
                Continue While
            End Try

            Try
                files = System.IO.Directory.GetFiles(currentDir)
            Catch e As UnauthorizedAccessException

                Console.WriteLine(e.Message)
                Continue While


            Catch e As System.IO.DirectoryNotFoundException

                Console.WriteLine(e.Message)
                Continue While
            End Try

            ' Perform the required action on each file here in parallel
            ' if there are a sufficient number of files in the directory
            ' or else sequentially if not. Files are opened and processed
            ' synchronously but this could be modified to perform async I/O.
            Try

                If files.Length < procCount Then

                    For Each file In files

                        action(file)
                        fileCount = fileCount + 1
                    Next
                Else
                    Parallel.ForEach(files, Function() 0,
                                     Function(file, loopState, localCount)
                                         action(file)
                                         localCount = localCount + 1
                                         Return CType(localCount, Integer)
                                     End Function,
                                     Sub(c)
                                         ' Merge each task's local count into the shared total atomically.
                                         Interlocked.Add(fileCount, c)
                                     End Sub)
                End If
            Catch ae As AggregateException
                ae.Handle(Function(ex)
                              If TypeOf ex Is UnauthorizedAccessException Then
                                  ' Here we just output a message and go on.
                                  Console.WriteLine(ex.Message)
                                  Return True
                              End If
                              ' Handle other exceptions here if necessary...

                              Return False
                          End Function)
            Catch e As UnauthorizedAccessException
                ' Thrown directly (not wrapped) by the sequential branch above.
                Console.WriteLine(e.Message)
            End Try
            ' Push the subdirectories onto the stack for traversal.
            ' This could also be done before handling the files.
            For Each str As String In subDirs
                dirs.Push(str)
            Next
        End While

        ' For diagnostic purposes.
        Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds)
    End Sub
End Module

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Parallel_File
{
    class Program
    {

        static void Main(string[] args)
        {            

            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // For this demo we don't do anything with the data
                // except to read it.
                byte[] data = File.ReadAllBytes(f);

                // For user interest, although it slows down the operation.
                Console.WriteLine(f);
            });

            // Keep the console window open.
            Console.ReadKey();
        }



        public static void TraverseTreeParallelForEach(string root, Action<string> action)
        {

            //Count of files traversed and timer for diagnostic output
            int fileCount = 0;
            var sw = Stopwatch.StartNew();

            // Use this value to determine whether to parallelize
            // file processing on each folder.
            int procCount = System.Environment.ProcessorCount;

            // Data structure to hold names of subfolders to be
            // examined for files.
            Stack<string> dirs = new Stack<string>();

            if (!System.IO.Directory.Exists(root))
            {
                throw new ArgumentException("The given root directory doesn't exist.", nameof(root));
            }
            dirs.Push(root);

            while (dirs.Count > 0)
            {
                string currentDir = dirs.Pop();
                string[] subDirs = null;
                string[] files = null;

                try
                {
                    subDirs = System.IO.Directory.GetDirectories(currentDir);
                }
                // An UnauthorizedAccessException exception will be thrown if we do not have
                // discovery permission on a folder or file. It may or may not be acceptable 
                // to ignore the exception and continue enumerating the remaining files and 
                // folders. It is also possible (but unlikely) that a DirectoryNotFound exception 
                // will be raised. This will happen if currentDir has been deleted by
                // another application or thread after our call to Directory.Exists. The 
                // choice of which exceptions to catch depends entirely on the specific task 
                // you are intending to perform and also on how much you know with certainty 
                // about the systems on which this code will run.
                catch (UnauthorizedAccessException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }
                catch (System.IO.DirectoryNotFoundException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }

                try
                {
                    files = System.IO.Directory.GetFiles(currentDir);
                }

                catch (UnauthorizedAccessException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }

                catch (System.IO.DirectoryNotFoundException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }

                // Perform the required action on each file here in parallel
                // if there are a sufficient number of files in the directory
                // or else sequentially if not. Files are opened and processed
                // synchronously but this could be modified to perform async I/O.
                try
                {
                    if (files.Length < procCount)
                    {
                        foreach (var file in files)
                        {
                            action(file);
                            fileCount++;                            
                        }
                    }
                    else
                    {

                        Parallel.ForEach(files, () => 0, (file, loopState, localCount) =>
                        {
                            action(file);
                            return ++localCount;
                        },
                        (c) =>
                        {
                            // Merge each task's local count into the shared total atomically.
                            Interlocked.Add(ref fileCount, c);
                        });
                    }
                }
                catch (AggregateException ae)
                {
                    ae.Handle((ex) =>
                    {
                        if (ex is UnauthorizedAccessException)
                        {
                            // Here we just output a message and go on.
                            Console.WriteLine(ex.Message);
                            return true;
                        }
                        // Handle other exceptions here if necessary...

                        return false;
                    });
                }
                catch (UnauthorizedAccessException e)
                {
                    // Thrown directly (not wrapped) by the sequential branch above.
                    Console.WriteLine(e.Message);
                }

                // Push the subdirectories onto the stack for traversal.
                // This could also be done before handling the files.
                foreach (string str in subDirs)
                    dirs.Push(str);
            }

            // For diagnostic purposes.
            Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds);
        }
    }
}
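The text above also mentions the reverse arrangement: parallelizing the directory iteration and processing the files in each folder sequentially. A minimal sketch of that variant follows. It swaps the explicit stack for recursion, handling each folder's files in a plain `foreach` and walking its subfolders with a nested `Parallel.ForEach`. The name `WalkDirectoriesInParallel`, the `StrongBox<int>` counter, and the throwaway directory tree are assumptions for illustration. Whether it beats the file-parallel version depends on your file-to-directory ratio, so measure both.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant: files in one folder are handled sequentially,
// while the subfolders of that folder are walked in parallel.
static void WalkDirectoriesInParallel(string dir, Action<string> action, StrongBox<int> fileCount)
{
    string[] subDirs;
    string[] files;
    try
    {
        subDirs = Directory.GetDirectories(dir);
        files = Directory.GetFiles(dir);
    }
    catch (UnauthorizedAccessException e) { Console.WriteLine(e.Message); return; }
    catch (DirectoryNotFoundException e) { Console.WriteLine(e.Message); return; }

    // Sequential pass over the files of this folder.
    foreach (string file in files)
    {
        action(file);
        Interlocked.Increment(ref fileCount.Value);
    }

    // Parallel pass over the subfolders; each call repeats the same pattern.
    Parallel.ForEach(subDirs, sub => WalkDirectoriesInParallel(sub, action, fileCount));
}

// Throwaway tree: a.txt, x/b.txt, x/y/c.txt, z/d.txt
string demoRoot = Path.Combine(Path.GetTempPath(), "dirpar-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(Path.Combine(demoRoot, "x", "y"));
Directory.CreateDirectory(Path.Combine(demoRoot, "z"));
File.WriteAllText(Path.Combine(demoRoot, "a.txt"), "a");
File.WriteAllText(Path.Combine(demoRoot, "x", "b.txt"), "b");
File.WriteAllText(Path.Combine(demoRoot, "x", "y", "c.txt"), "c");
File.WriteAllText(Path.Combine(demoRoot, "z", "d.txt"), "d");

var seen = new ConcurrentBag<string>();
var total = new StrongBox<int>(0);
WalkDirectoriesInParallel(demoRoot, f => seen.Add(Path.GetFileName(f)), total);
Directory.Delete(demoRoot, recursive: true);
Console.WriteLine($"Processed {total.Value} files"); // prints "Processed 4 files"
```

Recursion keeps the sketch short; for very deep trees the stack-based loop of the main example avoids deep call stacks.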

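In this example, the file I/O is performed synchronously. When dealing with large files or slow network connections, it might be preferable to access the files asynchronously. You can combine asynchronous I/O techniques with parallel iteration. For more information, see TPL and Traditional .NET Asynchronous Programming.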
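As one possible modern variant, assuming .NET 6 or later, `Parallel.ForEachAsync` can be combined with `File.ReadAllBytesAsync` so that no thread is blocked while a read is pending. This swaps in a newer API than the traditional asynchronous patterns that topic covers; the helper `ReadAllAsync`, the degree-of-parallelism value, and the sample files are assumptions for illustration.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (requires .NET 6+): reads each file asynchronously with at
// most 'maxParallel' reads in flight and returns the total number of bytes read.
static async Task<long> ReadAllAsync(string[] files, int maxParallel)
{
    long totalBytes = 0;
    var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

    await Parallel.ForEachAsync(files, options, async (path, token) =>
    {
        try
        {
            byte[] data = await File.ReadAllBytesAsync(path, token);
            Interlocked.Add(ref totalBytes, data.Length);
        }
        catch (UnauthorizedAccessException e)
        {
            // Same policy as the synchronous example: report and go on.
            Console.WriteLine(e.Message);
        }
    });

    return totalBytes;
}

// Three throwaway files of 10, 20, and 30 bytes.
string dir = Path.Combine(Path.GetTempPath(), "asyncio-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);
string[] paths = new string[3];
for (int i = 0; i < paths.Length; i++)
{
    paths[i] = Path.Combine(dir, $"f{i}.bin");
    File.WriteAllBytes(paths[i], new byte[(i + 1) * 10]);
}

long bytesRead = await ReadAllAsync(paths, maxParallel: Environment.ProcessorCount);
Directory.Delete(dir, recursive: true);
Console.WriteLine($"Read {bytesRead} bytes"); // prints "Read 60 bytes"
```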

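Note that if an exception is thrown on the main thread, the threads started by the ForEach method might continue to run. To stop these threads, you can set a Boolean variable in your exception handlers and check its value on each iteration of the parallel loop. If the value indicates that an exception has been thrown, use the ParallelLoopState variable to stop or break from the loop. For more information, see How to: Stop or Break from a Parallel.For Loop.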
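The Boolean-flag technique described above can be sketched as follows. A parallel loop runs on a background task; the caller's exception handler sets `exceptionThrown`, and every iteration reads that flag with `Volatile.Read` and calls `ParallelLoopState.Stop` once it is set. The function `RunWithStopFlag`, the simulated failure, and the `Thread.Sleep` stand-in for real work are assumptions for illustration. `Stop` is used here; `Break` would instead let iterations with lower indices finish.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: runs a parallel loop in the background, then simulates a
// failure on the calling thread whose handler sets a flag the loop checks.
static (int Processed, bool Completed) RunWithStopFlag(int[] items)
{
    bool exceptionThrown = false; // written by the exception handler, read by the loop
    int processed = 0;
    bool completed = true;

    Task loop = Task.Run(() =>
    {
        ParallelLoopResult result = Parallel.ForEach(items, (item, loopState) =>
        {
            // Check the flag on every iteration and stop the loop once it is set.
            if (Volatile.Read(ref exceptionThrown))
            {
                loopState.Stop();
                return;
            }
            Thread.Sleep(1); // stand-in for real per-item work
            Interlocked.Increment(ref processed);
        });
        completed = result.IsCompleted; // false when Stop was called
    });

    try
    {
        // Stand-in for work on the calling thread that fails.
        throw new InvalidOperationException("Simulated failure on the calling thread.");
    }
    catch (InvalidOperationException e)
    {
        Console.WriteLine(e.Message);
        Volatile.Write(ref exceptionThrown, true); // tell the loop to stop
    }

    loop.Wait(); // iterations already running are allowed to finish
    return (processed, completed);
}

var (processedCount, loopCompleted) = RunWithStopFlag(Enumerable.Range(0, 10_000).ToArray());
Console.WriteLine($"Processed {processedCount} items; loop completed: {loopCompleted}");
```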

See Also

Concepts

Data Parallelism (Task Parallel Library)