

AsyncProducerConsumerCollection



Tasks can also be used to build data structures for coordinating between asynchronous activities. Consider one of the classic parallel design patterns: producer/consumer. In producer/consumer, producers generate data that consumers then consume, and the producers and consumers may run in parallel (for example, a consumer may be processing item 1 while the producer that generated it is already producing item 2). For producer/consumer, we invariably need some data structure to store the work created by producers, so that consumers can be notified of new data and find it when it is available. Here’s a simple data structure built on top of tasks that enables asynchronous methods to be used as producers and consumers:

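using System.Collections.Generic;
using System.Threading.Tasks;

public class AsyncProducerConsumerCollection<T>
{
    // Items that have been produced but not yet taken.
    private readonly Queue<T> m_collection = new Queue<T>();

    // Consumers that called Take() while no item was available.
    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            // Hand the item to a waiting consumer if there is one; otherwise buffer it.
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        // Complete the consumer's task outside the lock.
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0)
            {
                // An item is already buffered: return a completed task.
                return Task.FromResult(m_collection.Dequeue());
            }
            else
            {
                // Nothing buffered: queue a pending task that Add() will complete.
                var tcs = new TaskCompletionSource<T>();
                m_waiting.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }
}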

 

With that in place, we can now write code like the following:

private static AsyncProducerConsumerCollection<int> m_data = …;

private static async Task ConsumerAsync()
{
    while (true)
    {
        int nextItem = await m_data.Take();
        ProcessNextItem(nextItem);
    }
}

private static void Produce(int data)
{
    m_data.Add(data);
}
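
The two paths through Take (an item is already buffered, or the consumer has to wait) can be observed directly. The following is a minimal sketch rather than part of the original design: it repeats a compact copy of the collection so that it compiles on its own, and the TakeBeforeAddDemo class and its Run method are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Compact copy of the collection from the text, repeated so the sketch is self-contained.
public class AsyncProducerConsumerCollection<T>
{
    private readonly Queue<T> m_collection = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> m_waiting =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> tcs = null;
        lock (m_collection)
        {
            if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
            else m_collection.Enqueue(item);
        }
        if (tcs != null) tcs.TrySetResult(item);
    }

    public Task<T> Take()
    {
        lock (m_collection)
        {
            if (m_collection.Count > 0) return Task.FromResult(m_collection.Dequeue());
            var tcs = new TaskCompletionSource<T>();
            m_waiting.Enqueue(tcs);
            return tcs.Task;
        }
    }
}

// Hypothetical demo type, not part of the original text.
public static class TakeBeforeAddDemo
{
    // Returns (wasPending, firstItem, secondAlreadyCompleted, secondItem).
    public static (bool, int, bool, int) Run()
    {
        var data = new AsyncProducerConsumerCollection<int>();

        Task<int> first = data.Take();        // nothing buffered: the consumer is queued
        bool wasPending = !first.IsCompleted;

        data.Add(1);                          // completes the queued consumer's task
        data.Add(2);                          // no one is waiting: the item is buffered

        Task<int> second = data.Take();       // served from the buffer, already completed
        return (wasPending, first.Result, second.IsCompleted, second.Result);
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Because Add completes the waiting TaskCompletionSource outside the lock, any continuation attached to the consumer's task does not run while the collection's lock is held.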

 

Introduced alongside .NET 4.5 is the System.Threading.Tasks.Dataflow.dll assembly, which is distributed as a NuGet package rather than as part of the framework itself. This assembly includes the BufferBlock<T> type, which may be used in a similar manner without having to build a custom collection type:

private static BufferBlock<int> m_data = …;

private static async Task ConsumerAsync()
{
    while (true)
    {
        int nextItem = await m_data.ReceiveAsync();
        ProcessNextItem(nextItem);
    }
}

private static void Produce(int data)
{
    m_data.Post(data);
}
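
The consumer loop above never terminates. One possible way to let the consumer finish once the producer is done is sketched below, assuming the System.Threading.Tasks.Dataflow library is referenced; the BufferBlockDemo class and its method names are hypothetical. The producer calls Complete on the block, and the consumer uses OutputAvailableAsync together with TryReceive instead of ReceiveAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo type, not part of the original text.
public static class BufferBlockDemo
{
    // Drains the block until it has been completed and its buffer is empty.
    private static async Task<List<int>> ConsumeAllAsync(BufferBlock<int> source)
    {
        var seen = new List<int>();
        // OutputAvailableAsync returns false once the block is completed and empty.
        while (await source.OutputAvailableAsync())
        {
            while (source.TryReceive(out int item)) seen.Add(item);
        }
        return seen;
    }

    public static async Task<List<int>> RunAsync()
    {
        var data = new BufferBlock<int>();
        Task<List<int>> consumer = ConsumeAllAsync(data);

        for (int i = 1; i <= 3; i++) data.Post(i);   // produce three items
        data.Complete();                             // signal that no more items will arrive

        return await consumer;
    }

    public static async Task Main()
    {
        List<int> items = await RunAsync();
        Console.WriteLine(string.Join(", ", items));
    }
}
```

BufferBlock<T> preserves insertion order, so the consumer in this sketch sees the items in the order in which they were posted.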



Interop with Other .NET Asynchronous Patterns and Types

The .NET Framework 1.0 saw the introduction of the IAsyncResult pattern, otherwise known as the Asynchronous Programming Model (APM) pattern, or the Begin/End pattern. The .NET Framework 2.0 then brought with it the event-based asynchronous pattern (EAP). The new TAP supersedes both of its predecessors, while also making it easy to build migration routines from the APM and EAP to the TAP.

Tasks and the Asynchronous Programming Model (APM)

From APM to Tasks

The APM pattern relies on two corresponding methods to represent an asynchronous operation: BeginMethodName and EndMethodName. At a high level, the begin method accepts the same parameters that would be supplied to its synchronous counterpart, MethodName, plus an AsyncCallback delegate and an object state. It returns an IAsyncResult whose AsyncState property returns the object state passed to the begin method. When the asynchronous operation completes, the IAsyncResult’s IsCompleted property starts returning true and its AsyncWaitHandle is set. Additionally, if the AsyncCallback argument passed to the begin method was non-null, the callback is invoked and passed the same IAsyncResult that the begin method returned. The EndMethodName method is then used to join with the operation, retrieving any result or propagating any exception that occurred. There are further details around the IAsyncResult’s CompletedSynchronously property that are beyond the scope of this document; for more information, see MSDN.
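
To make the moving parts concrete, here is a minimal sketch of consuming an APM operation directly, using Stream.BeginRead and Stream.EndRead on an in-memory stream. The ApmReadDemo class, the "my-state" value, and the use of ManualResetEventSlim to wait for the callback are illustrative assumptions, not something the pattern prescribes.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical demo type, not part of the original text.
public static class ApmReadDemo
{
    // Returns the number of bytes read, the state seen by the callback,
    // and whether the returned IAsyncResult reports completion afterwards.
    public static (int bytesRead, object state, bool completed) Run()
    {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var buffer = new byte[16];
        var done = new ManualResetEventSlim();
        int bytesRead = -1;
        object seenState = null;

        // Same arguments as Read, plus a callback and a caller-defined state object.
        IAsyncResult iar = stream.BeginRead(buffer, 0, buffer.Length, ar =>
        {
            seenState = ar.AsyncState;        // the state object passed to BeginRead
            bytesRead = stream.EndRead(ar);   // join with the operation, get the result
            done.Set();
        }, "my-state");

        done.Wait();                          // block until the callback has run
        return (bytesRead, seenState, iar.IsCompleted);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(result.bytesRead + " bytes, state = " + result.state);
    }
}
```

Had the read failed, the exception would have surfaced from the EndRead call inside the callback, which is the point at which APM propagates errors.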

Given the very structured nature of the APM pattern, it is quite easy to build a wrapper for an APM implementation to expose it as a TAP implementation. In fact, the .NET Framework 4 includes helper routines in the form of TaskFactory.FromAsync to provide this translation.

Consider the .NET Stream class and its BeginRead/EndRead methods, which represent the APM counterpart to the synchronous Read method:

public int Read(
    byte[] buffer, int offset, int count);

public IAsyncResult BeginRead(
    byte[] buffer, int offset, int count,
    AsyncCallback callback, object state);

public int EndRead(IAsyncResult asyncResult);

 

Utilizing FromAsync, we can implement a TAP wrapper for this method as follows:

public static Task<int> ReadAsync(
    this Stream stream, byte[] buffer, int offset, int count)
{
    if (stream == null) throw new ArgumentNullException("stream");
    return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
        buffer, offset, count, null);
}

 

This implementation that utilizes FromAsync is effectively equivalent to the following:

public static Task<int> ReadAsync(
    this Stream stream, byte[] buffer, int offset, int count)
{
    if (stream == null) throw new ArgumentNullException("stream");
    var tcs = new TaskCompletionSource<int>();
    stream.BeginRead(buffer, offset, count, iar =>
    {
        // EndRead returns the result or rethrows the operation's exception;
        // map each outcome onto the corresponding final state of the task.
        try { tcs.TrySetResult(stream.EndRead(iar)); }
        catch (OperationCanceledException) { tcs.TrySetCanceled(); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}
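
A practical caveat when trying this out: since .NET 4.5, Stream has its own instance method ReadAsync(byte[], int, int), and instance methods take precedence over extension methods, so an extension named ReadAsync with this signature would never be selected. The sketch below therefore uses the hypothetical name ReadViaApmAsync; the FromAsyncDemo class is likewise only illustrative.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamApmExtensions
{
    // Same body as the FromAsync wrapper in the text, under a hypothetical
    // name that does not collide with Stream's built-in ReadAsync.
    public static Task<int> ReadViaApmAsync(
        this Stream stream, byte[] buffer, int offset, int count)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, offset, count, null);
    }
}

// Hypothetical demo type, not part of the original text.
public static class FromAsyncDemo
{
    public static async Task<int> RunAsync()
    {
        using (var stream = new MemoryStream(new byte[] { 10, 20, 30 }))
        {
            var buffer = new byte[8];
            // Awaiting the wrapper yields what EndRead would have returned.
            return await stream.ReadViaApmAsync(buffer, 0, buffer.Length);
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```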

 

From Tasks to APM

For cases where existing infrastructure expects code to implement the APM pattern, it is also important to be able to take a TAP implementation and use it where an APM implementation is expected. Thanks to the composability of tasks, and the fact that Task itself implements IAsyncResult, this is achievable with a straightforward helper function (shown here as an extension for Task<TResult>, but an almost identical function may be used for the non-generic Task):

public static IAsyncResult AsApm<T>(
    this Task<T> task, AsyncCallback callback, object state)
{
    if (task == null) throw new ArgumentNullException("task");

    // The state passed here is what the returned task exposes as AsyncState.
    var tcs = new TaskCompletionSource<T>(state);
    task.ContinueWith(t =>
    {
        // Mirror the final state of the original task onto the new one.
        if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
        else if (t.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(t.Result);

        if (callback != null) callback(tcs.Task);
    }, TaskScheduler.Default);
    return tcs.Task;
}

 

Now, consider a case where we have a TAP implementation:

public static Task<string> DownloadStringAsync(Uri url);

 

and we need to provide an APM implementation:

public IAsyncResult BeginDownloadString(
    Uri url, AsyncCallback callback, object state);

public string EndDownloadString(IAsyncResult asyncResult);

 

This is achievable with the following code:

public IAsyncResult BeginDownloadString(
    Uri url, AsyncCallback callback, object state)
{
    return DownloadStringAsync(url).AsApm(callback, state);
}

public string EndDownloadString(IAsyncResult asyncResult)
{
    return ((Task<string>)asyncResult).Result;
}
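
The following sketch exercises the same Begin/End shape end to end without touching the network. It repeats the AsApm helper so that it compiles on its own; GetLengthAsync is a hypothetical stand-in for DownloadStringAsync, and the AsApmDemo class, the "ctx" state value, and the ManualResetEventSlim used for waiting are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskApmExtensions
{
    // Copy of the AsApm helper from the text, repeated so the sketch is self-contained.
    public static IAsyncResult AsApm<T>(
        this Task<T> task, AsyncCallback callback, object state)
    {
        if (task == null) throw new ArgumentNullException(nameof(task));
        var tcs = new TaskCompletionSource<T>(state);
        task.ContinueWith(t =>
        {
            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(t.Result);

            if (callback != null) callback(tcs.Task);
        }, TaskScheduler.Default);
        return tcs.Task;
    }
}

// Hypothetical demo type, not part of the original text.
public static class AsApmDemo
{
    // Hypothetical TAP method standing in for DownloadStringAsync.
    private static async Task<int> GetLengthAsync(string text)
    {
        await Task.Delay(10);
        return text.Length;
    }

    public static IAsyncResult BeginGetLength(
        string text, AsyncCallback callback, object state)
    {
        return GetLengthAsync(text).AsApm(callback, state);
    }

    public static int EndGetLength(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).Result;
    }

    public static (int length, object state) Run()
    {
        var done = new ManualResetEventSlim();
        IAsyncResult iar = BeginGetLength("hello", _ => done.Set(), "ctx");
        done.Wait();                                   // wait for the APM callback
        return (EndGetLength(iar), iar.AsyncState);    // AsyncState is the state passed to Begin
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(result.length + ", state = " + result.state);
    }
}
```

Note that an End method written as a read of Task<T>.Result surfaces failures wrapped in an AggregateException, which callers used to classic APM implementations may not expect.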


Page last modified: 2019-06-09

