My current project makes heavy use of DataTables and parallel processing via the TPL. DataTables are safe for concurrent reads but not for concurrent writes, and they really bite the dust when asked to make more than one change at once.

My project breaks up the operations that need to be performed on a DataTable into groups whose members can be executed concurrently. In total there are about 15 groups, each with 2-5 concurrent processes. Each process works with its own columns of a DataRow, but the DataTable still can’t handle more than one change request at a time; it just has a stroke. It’s very annoying. I could easily get around this by using POCOs instead of DataTables, but that isn’t an option. So how do I do it? With what I call the Change Request pattern.

The concept is simple. Operations read each row in the DataTable and work out new values (usually data correction, number crunching, etc.). Instead of writing the new value back to the DataColumn, each operation queues up the change in a thread-safe collection, using a POCO to describe what has to change. When all the operations in the group have finished, the queued changes are applied one by one on a single thread, so the DataTable only ever sees one writer.

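using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

// Runs one group: both operations only read the table and queue their writes.
private void DoWork(DataTable table)
{
    var chgcrd = new ChangeCoordinator(table);

    Task t1 = Task.Run(() => UpdateColumn1(table, chgcrd));
    Task t2 = Task.Run(() => UpdateColumn2(table, chgcrd));

    // Wait for every operation in the group before touching the table.
    Task.WaitAll(t1, t2);

    // Apply the queued changes on this thread only.
    chgcrd.ProcessChanges();
}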

// Each operation reads in parallel (safe for a DataTable) and only schedules its write.
private void UpdateColumn2(DataTable table, ChangeCoordinator changes)
{
    Parallel.For(0, table.Rows.Count, i =>
    {
        int oldValue = table.Rows[i].Field<int>("column2");
        int newValue = oldValue * 5;
        long rowId = table.Rows[i].Field<long>("id");

        changes.ScheduleChange(new ChangeRequest(rowId, "column2", newValue));
    });
}

private void UpdateColumn1(DataTable table, ChangeCoordinator changes)
{
    Parallel.For(0, table.Rows.Count, i =>
    {
        int oldValue = table.Rows[i].Field<int>("column1");
        int newValue = oldValue * 2;
        long rowId = table.Rows[i].Field<long>("id");

        changes.ScheduleChange(new ChangeRequest(rowId, "column1", newValue));
    });
}

public class ChangeRequest
{
    public long ID { get; set; }
    public string Column { get; set; }
    public object Value { get; set; }

    public ChangeRequest(long id, string column, object value)
    {
        this.ID = id;
        this.Column = column;
        this.Value = value;
    }
}
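
public class ChangeCoordinator
{
    // Rows.Find below requires table.PrimaryKey to be set (here, on the "id" column);
    // without it, Find throws MissingPrimaryKeyException.
    public ChangeCoordinator(DataTable table)
    {
        _table = table;
    }

    // ConcurrentBag is safe for concurrent Add calls but unordered. That is fine as long as
    // no two operations in the same group write to the same column of the same row.
    private readonly ConcurrentBag<ChangeRequest> _changes = new ConcurrentBag<ChangeRequest>();
    private readonly DataTable _table;

    // Called from the worker threads: queues the change, never touches the table.
    public void ScheduleChange(ChangeRequest request)
    {
        _changes.Add(request);
    }

    // Called once, from a single thread, after every task in the group has finished.
    public void ProcessChanges()
    {
        ChangeRequest cr;

        while (_changes.TryTake(out cr))
        {
            var row = _table.Rows.Find(cr.ID);
            if (row == null) { continue; }   // the row no longer exists; skip the change

            row[cr.Column] = cr.Value;
        }

        // Resets every row to Unchanged, so a later DataAdapter.Update would see no modified rows.
        _table.AcceptChanges();
    }
}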

The code is a dumbed-down version of what I’ve implemented, just to show the basic concept. The pattern applies to pretty much any structure that isn’t thread safe but has to be updated by parallel workers; I chose the DataTable example because it’s a thorn in my side at the moment.
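The example above runs a single group. With around 15 groups, a later group may need the values an earlier group computed, so the queued changes have to be applied between groups, not just once at the end. Here is a minimal sketch of that, assuming a table keyed on an "id" column. `GroupedChangeRunner`, `GroupedChangeExample` and `BuildAndRun` are hypothetical names, and to stay self-contained it swaps the ChangeRequest/ConcurrentBag pair for a ConcurrentQueue of tuples. It is an illustration, not the implementation this post is based on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: the change-request idea applied group by group.
// Operations inside one group run concurrently and only READ the table;
// their writes are queued (as tuples, not ChangeRequest objects) and applied
// on one thread before the next group starts.
public sealed class GroupedChangeRunner
{
    private readonly DataTable _table;
    private readonly ConcurrentQueue<(object Key, string Column, object Value)> _pending =
        new ConcurrentQueue<(object Key, string Column, object Value)>();

    public GroupedChangeRunner(DataTable table)
    {
        // Rows.Find only works when the table has a primary key, so check up front.
        if (table.PrimaryKey.Length == 0)
            throw new ArgumentException("DataTable.PrimaryKey must be set.", nameof(table));
        _table = table;
    }

    // Safe to call from any thread: it only enqueues, it never writes to the table.
    public void Schedule(object key, string column, object value) =>
        _pending.Enqueue((key, column, value));

    public void Run(IEnumerable<Action<DataTable, GroupedChangeRunner>[]> groups)
    {
        foreach (var group in groups)
        {
            // Concurrent phase: start every operation in the group and wait for all of them.
            Task[] tasks = group.Select(op => Task.Run(() => op(_table, this))).ToArray();
            Task.WaitAll(tasks);

            // Sequential phase: apply the queued writes on the calling thread only.
            while (_pending.TryDequeue(out var change))
            {
                DataRow row = _table.Rows.Find(change.Key);
                if (row != null)
                    row[change.Column] = change.Value;
            }
        }
        _table.AcceptChanges();
    }
}

public static class GroupedChangeExample
{
    // Builds a small table keyed on "id" and runs two groups; group 2 reads group 1's results.
    public static DataTable BuildAndRun(int rowCount)
    {
        var table = new DataTable();
        table.Columns.Add("id", typeof(long));
        table.Columns.Add("column1", typeof(int));
        table.Columns.Add("column2", typeof(int));
        table.PrimaryKey = new[] { table.Columns["id"] };
        for (long id = 1; id <= rowCount; id++)
            table.Rows.Add(id, (int)id, (int)id);

        var runner = new GroupedChangeRunner(table);
        runner.Run(new[]
        {
            // Group 1: two independent operations, each touching its own column.
            new Action<DataTable, GroupedChangeRunner>[]
            {
                (t, r) => Parallel.For(0, t.Rows.Count, i =>
                    r.Schedule(t.Rows[i]["id"], "column1", (int)t.Rows[i]["column1"] * 2)),
                (t, r) => Parallel.For(0, t.Rows.Count, i =>
                    r.Schedule(t.Rows[i]["id"], "column2", (int)t.Rows[i]["column2"] * 5)),
            },
            // Group 2: depends on group 1, so it must see the already-applied values.
            new Action<DataTable, GroupedChangeRunner>[]
            {
                (t, r) => Parallel.For(0, t.Rows.Count, i =>
                    r.Schedule(t.Rows[i]["id"], "column1",
                        (int)t.Rows[i]["column1"] + (int)t.Rows[i]["column2"])),
            },
        });
        return table;
    }
}
```

In `BuildAndRun`, the row with id 3 starts with column1 = 3 and column2 = 3. Group 1 turns those into 6 and 15, and group 2 then sets column1 to 6 + 15 = 21. Without the flush between groups, group 2 would still read the original values.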

Advertisements