From Tasks to WaitHandles



As previously mentioned, the Task class implements IAsyncResult, and its IAsyncResult implementation exposes an AsyncWaitHandle property which returns a WaitHandle that will be set when the Task completes. As such, getting a WaitHandle for a Task is accomplished as follows:

WaitHandle wh = ((IAsyncResult)task).AsyncWaitHandle;
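As an illustration of where such a handle is useful, the following minimal sketch waits on a Task through its WaitHandle with a timeout. The class and method names (TaskWaitHandleDemo, WaitViaHandle) are hypothetical and exist only for this example, and Task.Run assumes .NET 4.5 or later. When no WaitHandle-based API is involved, Task.Wait is usually the simpler choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskWaitHandleDemo
{
    // Hypothetical helper: returns true if the task completes within the
    // timeout, using only the WaitHandle exposed through IAsyncResult.
    public static bool WaitViaHandle(Task task, TimeSpan timeout)
    {
        WaitHandle wh = ((IAsyncResult)task).AsyncWaitHandle;
        return wh.WaitOne(timeout);
    }

    public static void Main()
    {
        // Task.Run is assumed to be available (.NET 4.5 or later).
        Task<int> task = Task.Run(() => { Thread.Sleep(50); return 42; });

        bool completed = WaitViaHandle(task, TimeSpan.FromSeconds(5));
        Console.WriteLine(completed ? "completed: " + task.Result : "timed out");
    }
}
```

Because the result is an ordinary WaitHandle, it can also be passed to APIs such as WaitHandle.WaitAny together with handles that do not come from tasks.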

 

Case Study: CopyToAsync

The ability to copy one stream to another is a useful and common operation. The Stream.CopyTo instance method was added in .NET 4 to accommodate scenarios that require this functionality, such as downloading the data at a specified URL:

public static byte[] DownloadData(string url)
{
    // WebRequest does not implement IDisposable, so it is not wrapped in a using statement.
    var request = WebRequest.Create(url);
    using(var response = request.GetResponse())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        responseStream.CopyTo(result);
        return result.ToArray();
    }
}

 

We would like to be able to implement a method like the above with the Task-based Asynchronous Pattern so as to improve responsiveness and scalability. We might attempt to do so as follows:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    // WebRequest does not implement IDisposable, so it is not wrapped in a using statement.
    var request = WebRequest.Create(url);
    return await Task.Run(() =>
    {
        using(var response = request.GetResponse())
        using(var responseStream = response.GetResponseStream())
        using(var result = new MemoryStream())
        {
            responseStream.CopyTo(result);
            return result.ToArray();
        }
    });
}

 

This implementation would improve responsiveness if called, for example, from a UI thread, because it moves the work of downloading the data from the network stream and copying it to the memory stream off the calling thread. However, it does not help with scalability: it still performs synchronous I/O and blocks a ThreadPool thread while waiting for data to arrive. Instead, we would like to be able to write the following function:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    // WebRequest does not implement IDisposable, so it is not wrapped in a using statement.
    var request = WebRequest.Create(url);
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(result);
        return result.ToArray();
    }
}

 

Unfortunately, while Stream has a synchronous CopyTo method, in .NET 4 it lacks an asynchronous CopyToAsync method. We will now walk through providing such an implementation.

A synchronous CopyTo method could be implemented as follows:

public static void CopyTo(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        destination.Write(buffer, 0, bytesRead);
    }
}

 

To provide an asynchronous implementation of CopyTo, utilizing the compiler’s ability to implement the TAP, we can modify this implementation slightly:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
    }
}

 

Here, we changed the return type from void to Task, we used ReadAsync instead of Read and WriteAsync instead of Write, and we prefixed the calls to ReadAsync and WriteAsync with the await contextual keyword. Following the pattern, we also renamed our method by appending “Async” as a suffix. The ReadAsync and WriteAsync methods don’t exist in .NET 4, but each could be implemented with one statement based on Task.Factory.FromAsync, as described in the “Tasks and the Asynchronous Programming Model” section of this document:

public static Task<int> ReadAsync(
    this Stream source, byte[] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(source.BeginRead, source.EndRead,
        buffer, offset, count, null);
}

public static Task WriteAsync(
    this Stream destination, byte[] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(
        destination.BeginWrite, destination.EndWrite,
        buffer, offset, count, null);
}

 

With these methods in hand, we can now successfully implement the CopyToAsync method. We can also optionally support cancellation in the method by adding a CancellationToken that will, for example, be monitored during the copy after every read and write pair (if ReadAsync and/or WriteAsync supported cancellation, the CancellationToken could also be threaded into those calls):

public static async Task CopyToAsync(
    this Stream source, Stream destination,
    CancellationToken cancellationToken)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
    }
}

 

(Note that such cancellation could also be useful in a synchronous implementation of CopyTo, and the ability to pass in a CancellationToken enables this. Approaches that would rely on a cancelable object being returned from the method would receive that object too late, since by the time the synchronous call completed, there would be nothing left to cancel.)
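To make the point about synchronous cancellation concrete, here is a minimal sketch of a cancelable synchronous copy. The name CopyToWithCancellation is hypothetical (it is not a Stream member) and was chosen only to avoid any confusion with the built-in CopyTo overloads. The token is checked after each read and write pair, mirroring the asynchronous version above.

```csharp
using System;
using System.IO;
using System.Threading;

public static class CancelableCopy
{
    // Hypothetical extension method: a synchronous copy that observes a
    // CancellationToken after every read/write pair.
    public static void CopyToWithCancellation(
        this Stream source, Stream destination,
        CancellationToken cancellationToken)
    {
        var buffer = new byte[0x1000];
        int bytesRead;
        while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            destination.Write(buffer, 0, bytesRead);
            cancellationToken.ThrowIfCancellationRequested();
        }
    }

    public static void Main()
    {
        var source = new MemoryStream(new byte[10000]);
        var destination = new MemoryStream();

        // The token is canceled up front, so the copy stops after one chunk.
        var cts = new CancellationTokenSource();
        cts.Cancel();

        try
        {
            source.CopyToWithCancellation(destination, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // prints "canceled after 4096 bytes"
            Console.WriteLine("canceled after " + destination.Length + " bytes");
        }
    }
}
```

In a real application the token would typically be canceled from another thread while the copy is running, since the calling thread is blocked inside the method until it returns or throws.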

We could also add support for progress notification, including how much data has thus far been copied:

public static async Task CopyToAsync(
    this Stream source, Stream destination,
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    long totalRead = 0;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
        totalRead += bytesRead;
        progress.Report(totalRead);
    }
}
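The following sketch shows what a caller of such a progress-reporting copy might observe. It makes several assumptions: it targets .NET 4.5 or later, where Stream.ReadAsync and Stream.WriteAsync are built in; the method is named CopyWithProgressAsync (hypothetical) to avoid any confusion with the built-in CopyToAsync overloads; and ListProgress is a hypothetical IProgress<long> implementation that records values synchronously, so the output order is deterministic. A null check on progress is added so that callers may omit it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: records each reported value on the reporting thread.
public sealed class ListProgress : IProgress<long>
{
    public readonly List<long> Values = new List<long>();
    public void Report(long value) { Values.Add(value); }
}

public static class ProgressCopyDemo
{
    // Hypothetical name; same logic as the progress-reporting copy above,
    // plus a null check so that progress is optional.
    public static async Task CopyWithProgressAsync(
        this Stream source, Stream destination,
        CancellationToken cancellationToken,
        IProgress<long> progress)
    {
        var buffer = new byte[0x1000];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead);
            cancellationToken.ThrowIfCancellationRequested();
            totalRead += bytesRead;
            if (progress != null) progress.Report(totalRead);
        }
    }

    public static void Main()
    {
        var source = new MemoryStream(new byte[10000]);
        var destination = new MemoryStream();
        var progress = new ListProgress();

        // Blocking here is acceptable only because this is a console demo.
        source.CopyWithProgressAsync(destination, CancellationToken.None, progress)
              .GetAwaiter().GetResult();

        // prints "4096, 8192, 10000"
        Console.WriteLine(string.Join(", ", progress.Values));
    }
}
```

In a UI application, the built-in Progress<T> class would usually be the better choice, since it delivers reports on the SynchronizationContext captured when it was constructed.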

 

With this method in hand, we can now fully implement our DownloadDataAsync method, including now adding in cancellation and progress support:

public static async Task<byte[]> DownloadDataAsync(
    string url,
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    // WebRequest does not implement IDisposable, so it is not wrapped in a using statement.
    var request = WebRequest.Create(url);
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(
            result, cancellationToken, progress);
        return result.ToArray();
    }
}

 

Further optimizations are also possible for our CopyToAsync method. For example, if we were to use two buffers instead of one, we could be writing the previously read data while reading in the next piece of data, thus overlapping latencies if both the read and the write are utilizing asynchronous I/O:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    int i = 0;
    var buffers = new [] { new byte[0x1000], new byte[0x1000] };
    Task writeTask = null;
    while(true)
    {
        // Start the next read while the previous write may still be in flight.
        var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length);
        if (writeTask != null) await Task.WhenAll(readTask, writeTask);
        int bytesRead = await readTask;
        if (bytesRead == 0) break;
        writeTask = destination.WriteAsync(buffers[i], 0, bytesRead);
        i ^= 1; // swap buffers
    }
}

 

Another optimization is to eliminate unnecessary context switches. As mentioned earlier in this document, by default awaiting on a Task will transition back to the SynchronizationContext that was current when the await began. In the case of the CopyToAsync implementation, there’s no need to employ such transitions, since we’re not manipulating any UI state. We can take advantage of the Task.ConfigureAwait method to disable this automatic switch. For simplicity, changes are shown on the original asynchronous implementation from above:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await
        source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead)
            .ConfigureAwait(false);
    }
}

 

 

