2020年8月22日星期六

C#中异步多线程的常见用法

  此篇文章是我一个小白对异步多线程的理解和总结,请高手多多批评指教。废话不说了,先来看几个基本概念(纯属个人见解,可能不准确):

进程:程序运行时,占用的全部运行资源的总和。

线程:线程是隶属于操作系统管理的,也可以有自己的计算资源,是程序执行流的最小单位。任何的操作都是由线程来完成的。

多线程:多核cpu协同工作,多个执行流同时运行,是用资源换时间。(单核cpu,不存在所谓的多线程)。

Thread

  Thread的对象是非线程池中的线程,有自己的生命周期(有创建和销毁的过程),所以不可以被重复利用(一个操作中,不会出现二个相同Id的线程)。

Thread的常见用法:

private void button5_Click(object sender, EventArgs e) {  
Console.WriteLine($"===============Method start time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}==================="); //开启一个线程,构造方法可重载两种委托,一个是无参无返回值,一个是带参无返回值 Thread thread = new Thread(a => DoSomeThing("Thread")); //当前线程状态 Console.WriteLine($"thread's state is {thread.ThreadState},thread's priority is {thread.Priority} ,thread is alived :{thread.IsAlive},thread is background:{thread.IsBackground},thread is pool threads: {thread.IsThreadPoolThread}"); //告知操作系统,当前线程可以被执行了。 thread.Start(); //阻塞当前执行线程,等待此thread线程实例执行完成。 thread.Join(); //最大等待的时间是5秒(不管是否执行完成,不再等待) thread.Join(5000); Console.WriteLine($"===============Method end time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},,Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}==================="); }
private void DoSomeThing(string name) { Console.WriteLine($"do some thing start time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}"); long result = 0; for (long i = 0; i < 10000 * 10000; i++) { result += i; } Console.WriteLine($"do some thing end time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}"); }

注意 :thread 默认是前台线程,启动后一定要完成任务的,即使程序关掉(进程退出)也要执行完。可以把thread 指定为后台线程,随着进程的退出而终止。

//false,默认是前台线程,启动后一定要完成任务的,即使程序关掉(进程退出)也要执行完。Console.WriteLine(thread.IsBackground); thread.IsBackground = true;//指定为后台线程。(随着进程的退出而退出)

Thread的回调用法:

Thread没有像Framework中的delegate的回调用法,如果需要回调得自动动手改造:

private void CallBack(Action action, Action calback){ Thread thread = new Thread(() => { action(); calback(); }); thread.Start();}//无参无返回值CallBack(() => Console.WriteLine("好吗?"), () => Console.WriteLine("好的!"));
private Func<T> CallBackReturn<T>(Func<T> func){ T t = default(T); Thread thread = new Thread(() => {  t = func(); }); thread.Start(); return () => {  thread.Join();  return t; };}//带返回值得用法Func<int> func = CallBackReturn<int>(() => DateTime.Now.Second);Console.WriteLine("线程未阻塞");int result = func.Invoke();Console.WriteLine("result:" + result);

ThreadPool 线程池

Thread的功能太过强大,像我这样的小白是用不好的(之前在项目中大量使用Thread的API,出现了许多意想不到的bug)。线程池中的线程在同一操作中可以被重复利用。

 //开启多线程 ThreadPool.QueueUserWorkItem(n => DoSomeThing("ThreadPool"));

 个人觉得尽量不要阻塞线程池的线程,因为线程池里的线程数量是有限的,当线程池中没有线程可用时,会出现死锁。如果非要等待,用法如下:

ManualResetEvent manualResetEvent = new ManualResetEvent(false);ThreadPool.QueueUserWorkItem(n =>{ DoSomethingLong("ThreadPool"); manualResetEvent.Set();});//等待线程完成manualResetEvent.WaitOne();

 Task

Task是基于ThreadPool的基础上做的封装,属于线程池中的线程。

Task启动多线程的方式:

方式一:指定任务的开始时机

/// <summary>/// 使用Task或Task<T>创建任务,需指定任务的开始时机(任务调度)。/// </summary>public static void Demo1(){ Task task = new Task(() => {  Thread.Sleep(3000); 
Console.WriteLine($"Current thread id is {Thread.CurrentThread.ManagedThreadId}"); }); task.Start();//任务调度(开始任务) Console.WriteLine($"Current thread name is {Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"当前任务状态:{task.Status}"); task.Wait(); //等待任务执行完成 Console.WriteLine($"当前任务状态:{task.Status}");}

 

 

方式二:一步完成多线程的创建和启动

/// <summary>/// 使用Task.Run()方法一步完成多线程的创建和启动(当前线程立即准备启动任务)。/// <remark>/// 如果不需要对任务的创建和调度做更多操作,Task.Run()方法是创建和启动任务的首选方式。/// </remark>/// </summary>public static void Demo2(){ Task task = Task.Run(() => { Thread.Sleep(3000); Console.WriteLine($"Current thread id is {Thread.CurrentThread.ManagedThreadId}"); }); task.Wait(); //等待,直到任务完成}

 

方式三:需要想多线程任务传递状态参数

/// <summary>/// Task和Task<TResult>都有静态属性Factory,它返回默认的实例TaskFactory./// 使用Task.Factory.StartNew()方法也可以一步完成任务的创建和启动。/// 当前需要向任务传递一个状态(参数)。可以使用此方法。/// </summary>public static void Demo3(){ Task[] tasks = new Task[10]; for (int i = 0; i < tasks.Length; i++) {  tasks[i] = Task.Factory.StartNew((obj) =>  {   CustomData data = obj as CustomData;   data.ThreadId = Thread.CurrentThread.ManagedThreadId;  }, new CustomData { CreationTime = DateTime.Now.Ticks, Index = i}); }
//以阻塞当前线程的方式,等待所以子线程的完成 Task.WaitAll(tasks); foreach (var task in tasks) { //通过任务的AsyncState属性,可以获取任务状态(提供给任务的参数). var data = task.AsyncState as CustomData; Console.WriteLine(JsonConvert.SerializeObject(data)); }}//Task.Factory.StartNew() 调用无返回值的任务//Task<TResult>.Factory.StartNew() 调用有返回值的任务

 

Task<TResult>

public static void Demo4(){ Task<Double>[] tasks = {Task<Double>.Factory.StartNew(() => DoComputation(1.0)),Task<Double>.Factory.StartNew(() => DoComputation(100.0)),Task<Double>.Factory.StartNew(() => DoComputation(1000.0)) }; var results = new Double[tasks.Length]; Double sum = 0; for (int i = 0; i < tasks.Length; i++) {  //Task<TResult>.Result属性包含任务的计算结果,如果在任务完成之前调用,则会阻塞线程直到任务完成  results[i] = tasks[i].Result;   Console.Write("{0:N1} {1}", results[i],       i == tasks.Length - 1 ? "= " : "+ ");  sum += results[i]; } Console.WriteLine("{0:N1}", sum);}private static Double DoComputation(Double start){ Double sum = 0; for (var value = start; value <= start + 10; value += .1)  sum += value; return sum;}

 

Task的常用API

WaitAny和WaitAll,会阻塞当前线程(主线程)的执行:

List<Task> tasks = new List<Task>();tasks.Add(Task.Run(() => DoSomeThing("Task1")));tasks.Add(Task.Run(() => DoSomeThing("Task2")));tasks.Add(Task.Run(() => DoSomeThing("Task3")));//阻塞当前线程的执行,等待任意一个子线程任务完成后继续往下执行Task.WaitAny(tasks.ToArray());//阻塞当前线程的执行,等待所有子线程任务完成后继续往下执行Task.WaitAll(tasks.ToArray());

 

WhenAll和WhenAny,是通过返回一个Task 对象的方式,来达到非阻塞式的等待

//不阻塞当前线程的执行,等待所有子线程任务完成后,异步执行后续的操作Task.WhenAll(tasks).ContinueWith(t =>{ Console.WriteLine($"不阻塞,{Thread.CurrentThread.ManagedThreadId}");});//工厂模式的实现Task.Factory.ContinueWhenAll(tasks.ToArray(), s =>{Console.WriteLine("不阻塞" + s.Length);});

ContinueWith,是一个实例方式,并且返回Task实例,所以可以使用这种链式结构来完成按顺序执行。

public static void Demo8(){ var task = Task.Factory  .StartNew(() => { Console.WriteLine("1"); return 10; })  .ContinueWith(i => { Console.WriteLine("2"); return i.Result + 1; })  .ContinueWith(i => { Console.WriteLine("3"); return i.Result + 1; }); Console.WriteLine(task.Result);}

控制线程数量的使用,(核心思想来自别人,我感觉控制的很好):

/// <summary>/// 线程数量的控制/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void Test(object sender, EventArgs e){ //完成10000个任务,但只要11个线程。 List<int> intList = new List<int>(); for (int i = 0; i < 10000; i++) {  intList.Add(i); } Action<int> action = i => {  Console.WriteLine(Thread.CurrentThread.ManagedThreadId);  Thread.Sleep(new Random(i).Next(100, 300)); }; List<Task> tasks = new List<Task>(); foreach (var item in intList) {  int i = item;  tasks.Add(Task.Run(() => action(i)));  //当已使用了11个线程的时候,即时释放已完成的线程。  if (tasks.Count > 10)  {   Task.WaitAny(tasks.ToArray());   tasks = tasks.Where(n => n.Status != TaskStatus.RanToCompletion).ToList();  } } Task.WaitAll(tasks.ToArray());}

 注意:应当避免在子线程委托的内部直接使用主线程变量(闭包的弊端问题)

 public static void Demo5() {  Task[] taskArray = new Task[10];  for (int i = 0; i < taskArray.Length; i++)  {   taskArray[i] = Task.Factory.StartNew(() =>   {    //当您使用lambda表达式创建委托时,虽然可以访问变量范围内可见的所有变量。    //但是在某些情况下(最明显的是在循环中),lambda不能像预期的那样捕获变量    //(本例中,它只能捕获最后一个值,而不每次迭代的值)。    //因为任务的运行时机不确定。可以通过传递参数的方式,避免此问题的发生。    Console.WriteLine(i);//输出10个10   });  }  Task.WaitAll(taskArray); }
创建分离的子任务

  在父任务中创建子任务,如果未指定AttachedToParent选项时,子任务不会与父任务同步。

public static void Demo9(){ //创建父任务 var outer = Task.Run(() => {  Console.WriteLine("父任务开始启动!");  //创建子任务  var child = Task.Run(() =>  {   Thread.SpinWait(5000000);   Console.WriteLine("分离的任务完成");  }); }); outer.Wait(); //父任务不会等待子任务的完成 Console.WriteLine("父任务完成.");}

当在任务中运行的代码使用AttachedToParent选项创建新任务时,新任务称为父任务的附加子任务。可以使用AttachedToParent选项来表达结构化任务并行性,因为父任务隐式等待所有附加的子任务完成。

 public static void Demo10() {  var parent = Task.Factory.StartNew(() => {   Console.WriteLine("Parent task beginning.");   for (int i = 0; i < 10; i++)   {    Task.Factory.StartNew((x) => {     Thread.SpinWait(5000000);     Console.WriteLine("Attached child #{0} completed.",x);    }, i, TaskCreationOptions.AttachedToParent);   }  });  parent.Wait();  Console.WriteLine("Parent task completed."); }

注意:如果父任务启动DenyChildAttach选项,子任务即时启用AttachedToParent选项也不会附加到父任务。

Parallelparallel为并行计算,主线程也参与计算
 //Parallel.For:public static void Main(string[] args) {  //计算目录的大小  long totalSize = 0;  String[] files =Directory.GetFiles(@"C:\Users\Administrator\Desktop");  Parallel.For(0, files.Length,     index => {      FileInfo fi = new FileInfo(files[index]);      long size = fi.Length;      Interlocked.Add(ref totalSize, size); //将两个64位整数相加,并用和替换第一个整数,作为     });  Console.WriteLine("{0:N0} files, {1:N0} bytes", files.Length, totalSize); }
//旋转图片 static void Main(string[] args) {  // A simple source for demonstration purposes. Modify this path as necessary.  string[] files = Directory.GetFiles(@"C:\Users\Administrator\Desktop\test");  string newDir = @"C:\Users\Administrator\Desktop\test\Modified";  if (!Directory.Exists(newDir))   Directory.CreateDirectory(newDir);  // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)  Parallel.ForEach(files, (currentFile) =>  {   // The more computational work you do here, the greater   // the speedup compared to a sequential foreach loop.   string filename = Path.GetFileName(currentFile);   var bitmap = new Bitmap(currentFile);   bitmap.RotateFlip(RotateFlipType.Rotate180FlipNone);   bitmap.Save(Path.Combine(newDir, filename));   // Peek behind the scenes to see how work is parallelized.   // But be aware: Thread contention for the Console slows down parallel loops!!!   Console.WriteLine($"Processing {filename} on thread {Thread.CurrentThread.ManagedThreadId}");   //close lambda expression and method invocation  }); }

 

 分区局部变量

 static void TestParallForeach() {  int[] array = Enumerable.Range(1, 100).ToArray();  long totalNum = 0;  //int 为集合元素类型  //long 为分区局部变量类型  Parallel.ForEach<int, long>(array, //源集合   () => 0, //初始化局部分区变量,每个分区执行一次   (index, state, subtotal) => //每次迭代的时候执行   {    subtotal += index; //修改分区局部变量    return subtotal; //传递给当前分区的下一次迭代   },   //每个分区结束的时间执行,并将该分区最后一次迭代的局部分区变量传递过来。   (finalTotal) => Interlocked.Add(ref totalNum, finalTotal)   );  /*重载方式:public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,  Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);  TSource:源数据类型。 source:源数据,必须实现 IEnumerable<T>接口。  TLocal:局部分区变量类型。localInit:初始化局部分区变量的函数。每个分区都是执行此函数一次。  body:并行循环的每次迭代都是调用此方法。  body.TSource:当前元素。body.ParallelLoopState:ParallelLoopState类型的变量,可用来检索循环的状态。  body.TLocal.1:局部分区变量。  body.TLocal.2:返回值。将其传递给特定分区循环的下一个迭代。  localFinally:每个分区的循环完成时调用此委托。*/  Console.WriteLine(totalNum);

未完待续......

 

C#中异步多线程的常见用法订单处理物流入门快速致胜法国电商市场 法国乐天priceminister旺季来临!如何突破亚马逊站内流量?国际电商业务同比增长23%,疫情加速阿里全球扩张干货分享|亚马逊品牌备案2017中秋去广州百万葵园好玩吗?有什么花?惠州哪里好玩?惠州有什么好玩的地方?从深圳怎么坐高铁去武汉,在广州哪个站转,怎么转?

没有评论:

发表评论