sourcetip

애처롭게 느린 Azure 테이블 배치 작업 삽입 및 삭제

fileupload 2023. 5. 3. 21:39
반응형

애처롭게 느린 Azure 테이블 배치 작업 삽입 및 삭제

Azure 테이블 스토리지를 사용할 때 엄청난 성능 병목 현상이 발생하고 있습니다.저는 테이블을 일종의 캐시로 사용하기를 원하기 때문에 프로세스가 길어지면 수백에서 수천 행의 데이터가 생성될 수 있습니다.그런 다음 파티션 및 행 키로 데이터를 빠르게 쿼리할 수 있습니다.

쿼리는 매우 빠르게 작동합니다(파티션 및 행 키만 사용하는 경우 매우 빠르며, 약간 느리지만 특정 일치 항목에 대한 속성을 검색하는 경우에도 허용됩니다).

그러나 행을 삽입하고 삭제하는 작업은 매우 느립니다.

명확화

저는 100개의 아이템을 한 번에 삽입하는 데도 몇 초가 걸린다는 것을 분명히 하고 싶습니다.이는 단순히 수천 개 행의 총 처리량의 문제가 아닙니다.100개만 넣었을 때 영향을 받고 있습니다.

다음은 테이블에 배치 삽입을 수행하는 코드의 예입니다.

static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        while ( rowOffset < entities.Count )
        {
            Stopwatch sw = Stopwatch.StartNew();

            var batch = new TableBatchOperation();

            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            foreach ( var row in rows )
                batch.Insert( row );

            // submit
            await table.ExecuteBatchAsync( batch );

            rowOffset += rows.Count;

            Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "g" ) );
        }
    }

배치 작업을 사용하고 있으며 다음은 디버그 출력 예제입니다.

Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : b08a07da-fceb-4bec-af34-3beaa340239b: StringToSign = POST..multipart/mixed; boundary=batch_6d86d34c-5e0e-4c0c-8135-f9788ae41748.Tue, 30 Jul 2013 18:48:38 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch insert 100 rows: 0:00:00.9351871

보시다시피 이 예제는 100개의 행을 삽입하는 데 거의 1초가 걸립니다.제 개발 기계(3.4 Ghz 쿼드 코어)의 평균은 약 0.8초인 것 같습니다.

이건 말도 안 되는 소리야.

다음은 배치 삭제 작업의 예입니다.

Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: StringToSign = POST..multipart/mixed; boundary=batch_7e3d229f-f8ac-4aa0-8ce9-ed00cb0ba321.Tue, 30 Jul 2013 18:47:41 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch delete 100 rows: 0:00:00.6524402

0.5초 이상 지속됩니다.

이를 Azure(작은 인스턴스)에도 배포하여 실행한 결과, 28000개의 행을 삽입하는 데 20분이 소요되었습니다.

현재 Storage Client Library의 2.1 RC 버전을 사용하고 있습니다. MSDN Blog

제가 뭔가 잘못하고 있는 게 분명해요.무슨 생각 있어요?

갱신하다

전반적인 속도 향상(및 최대 8개의 논리 프로세서)의 순 효과와 병렬화를 시도했지만 여전히 개발 기계에서 초당 150개의 행 삽입에 불과합니다.

전체적으로 제가 알 수 있는 것보다 더 나은 것은 없으며, Azure(작은 예)에 배포할 때 더 나쁠 수도 있습니다.

조언에 따라 스레드 풀을 늘리고 웹 역할에 대한 최대 HTTP 연결 수를 늘렸습니다.

저는 여전히 제 삽입/삭제를 150 ROP로 제한하는 근본적인 무언가를 놓치고 있다고 느낍니다.

업데이트 2

(2.1 RC Storage Client에 내장된 새로운 로깅을 사용하여) Azure에 구축된 소규모 인스턴스의 일부 진단 로그를 분석한 후에 약간의 정보를 얻었습니다.

배치 삽입에 대한 첫 번째 스토리지 클라이언트 로그는 다음과 같습니다.635109046781264034표시된 눈금:

caf06fca-1857-4875-9923-98979d850df3: Starting synchronous request to https://?.table.core.windows.net/.; TraceSource 'Microsoft.WindowsAzure.Storage' event

에 저는 이를 고초거의봅 3나는이로다니를에서 .635109046810104314표시된 눈금:

caf06fca-1857-4875-9923-98979d850df3: Preparing to write request data.; TraceSource 'Microsoft.WindowsAzure.Storage' event

되는 로그 몇 개가 더 , 이 는 0.15초에 종료됩니다.635109046811645418삽입물을 감싸는 눈금:

caf06fca-1857-4875-9923-98979d850df3: Operation completed successfully.; TraceSource 'Microsoft.WindowsAzure.Storage' event

어떻게 해야 할지는 모르겠지만, 제가 조사한 배치 삽입 로그 전체에 걸쳐 상당히 일치합니다.

업데이트 3

병렬로 배치 삽입하는 데 사용되는 코드입니다.이 코드에서는 테스트를 위해 100개씩의 배치를 고유 파티션에 삽입하고 있습니다.

static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
                {
                    Stopwatch sw = Stopwatch.StartNew();

                    var batch = new TableBatchOperation();

                    foreach ( var row in rows )
                    {
                        row.PartitionKey = row.PartitionKey + partition;
                        batch.InsertOrReplace( row );
                    }

                    // submit
                    table.ExecuteBatch( batch );

                    Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "F2" ) );
                } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

위에서 설명한 바와 같이, 이는 수천 개의 행을 삽입하는 데 걸리는 전체 시간을 개선하는 데 도움이 되지만, 100개의 각 배치는 여전히 몇 초가 걸립니다.

업데이트 4

그래서 VS2012.2를 사용하여 웹 역할을 단일 페이지 템플릿(TODO 샘플이 포함된 새로운 템플릿)으로 사용하여 완전히 새로운 Azure Cloud Service 프로젝트를 만들었습니다.

이것은 새로운 NuGet 패키지나 그 어떤 것도 없이 바로 사용할 수 있습니다.기본적으로 스토리지 클라이언트 라이브러리 v2와 EDM 및 관련 라이브러리 v5.2를 사용합니다.

홈 컨트롤러 코드를 다음과 같이 수정했습니다(일부 랜덤 데이터를 사용하여 실제 앱에 저장할 열을 시뮬레이션).

public ActionResult Index( string returnUrl )
    {
        ViewBag.ReturnUrl = returnUrl;

        Task.Factory.StartNew( () =>
            {
                TableTest();
            } );

        return View();
    }

    static Random random = new Random();
    static double RandomDouble( double maxValue )
    {
        // the Random class is not thread safe!
        lock ( random ) return random.NextDouble() * maxValue;
    }

    void TableTest()
    {
        // Retrieve storage account from connection-string
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
            CloudConfigurationManager.GetSetting( "CloudStorageConnectionString" ) );

        // create the table client
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

        // retrieve the table
        CloudTable table = tableClient.GetTableReference( "test" );

        // create it if it doesn't already exist
        if ( table.CreateIfNotExists() )
        {
            // the container is new and was just created
            Trace.TraceInformation( "Created table named " + "test" );
        }


        Stopwatch sw = Stopwatch.StartNew();

        // create a bunch of objects
        int count = 28000;
        List<DynamicTableEntity> entities = new List<DynamicTableEntity>( count );

        for ( int i = 0; i < count; i++ )
        {
            var row = new DynamicTableEntity()
            {
                PartitionKey = "filename.txt",
                RowKey = string.Format( "$item{0:D10}", i ),
            };

            row.Properties.Add( "Name", EntityProperty.GeneratePropertyForString( i.ToString() ) );
            row.Properties.Add( "Data", EntityProperty.GeneratePropertyForString( string.Format( "data{0}", i ) ) );
            row.Properties.Add( "Value1", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value2", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value3", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );
            row.Properties.Add( "Value4", EntityProperty.GeneratePropertyForDouble( RandomDouble( 90 ) ) );
            row.Properties.Add( "Value5", EntityProperty.GeneratePropertyForDouble( RandomDouble( 180 ) ) );
            row.Properties.Add( "Value6", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );

            entities.Add( row );
        }

        Trace.TraceInformation( "Elapsed time to create record rows: " + sw.Elapsed.ToString() );

        sw = Stopwatch.StartNew();

        Trace.TraceInformation( "Inserting rows" );

        // batch our inserts (100 max)
        BatchInsert( table, entities ).Wait();

        Trace.TraceInformation( "Successfully inserted " + entities.Count + " rows into table " + table.Name );
        Trace.TraceInformation( "Elapsed time: " + sw.Elapsed.ToString() );

        Trace.TraceInformation( "Done" );
    }


            static async Task BatchInsert( CloudTable table, List<DynamicTableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
            {
                var batch = new TableBatchOperation();

                foreach ( var row in rows )
                {
                    row.PartitionKey = row.PartitionKey + partition;
                    batch.InsertOrReplace( row );
                }

                // submit
                table.ExecuteBatch( batch );

                Trace.TraceInformation( "Inserted batch for partition " + partition );
            } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

다음과 같은 결과를 얻을 수 있습니다.

iisexpress.exe Information: 0 : Elapsed time to create record rows: 00:00:00.0719448
iisexpress.exe Information: 0 : Inserting rows
iisexpress.exe Information: 0 : Inserted batch for partition $100
...
iisexpress.exe Information: 0 : Successfully inserted 28000 rows into table test
iisexpress.exe Information: 0 : Elapsed time: 00:01:07.1398928

이것은 제 다른 앱보다 조금 더 빠릅니다. 460 ROP 이상입니다.이것은 여전히 받아들일 수 없습니다.이 테스트에서도 CPU(논리 프로세서 8개)가 거의 한계에 달했고 디스크 액세스도 거의 유휴 상태였습니다.

저는 무엇이 잘못되었는지 막막합니다.

업데이트 5

조정과 조정을 한 바퀴 돌고 돌아보니 몇 가지 개선이 이루어졌지만, 배치 삽입 또는 교체 작업(100개 배치)을 수행하는 500-700(ish) ROP보다 훨씬 빨리 수행할 수 없습니다.

이 테스트는 Azure 클라우드에서 작은 인스턴스(또는 두 개)를 사용하여 수행됩니다.아래의 의견에 따르면, 저는 현지 테스트가 기껏해야 느릴 것이라는 사실을 인정합니다.

여기 몇 가지 예가 있습니다.각 예는 매우 고유한 파티션 키입니다.

Successfully inserted 904 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:01.3401031; TraceSource 'w3wp.exe' event

Successfully inserted 4130 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:07.3522871; TraceSource 'w3wp.exe' event

Successfully inserted 28020 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:51.9319217; TraceSource 'w3wp.exe' event

성능 상한선이 있는 것은 제 MSDN Azure 계정일 수도 있습니다.몰라.

이 시점에서 저는 이것을 다 끝냈다고 생각합니다.제 목적에 맞게 사용할 수 있을 정도로 빠를 수도 있고, 다른 길을 걸을 수도 있습니다.

결론

아래의 모든 답은 좋습니다!

제 구체적인 질문을 위해, 저는 작은 Azure 인스턴스에서 최대 2k ROPS, 더 일반적으로 약 1k의 속도를 볼 수 있었습니다.비용을 낮게 유지해야 하기 때문에(따라서 인스턴스 크기를 낮게 유지해야 하기 때문에) 테이블을 사용할 수 있는 용도가 정의됩니다.

모든 도움에 감사드립니다.

기본 개념 - 병렬을 사용하여 속도를 높입니다.

1단계 - 스레드 풀에서 이 작업을 수행할 수 있는 충분한 스레드를 제공합니다. - 스레드 풀.최소 스레드 설정(1024, 256);

2단계 - 파티션을 사용합니다.나는 가이드를 ID로 사용하고, 나는 256개의 고유 파티션으로 분할하기 위해 last to 문자를 사용합니다(실제로 나의 경우 48개의 파티션에서 N개의 하위 집합으로 그룹화합니다).

3단계 - 작업을 사용하여 삽입, 테이블 참조에 개체 풀링 사용

public List<T> InsertOrUpdate(List<T> items)
        {
            var subLists = SplitIntoPartitionedSublists(items);

            var tasks = new List<Task>();

            foreach (var subList in subLists)
            {
                List<T> list = subList;
                var task = Task.Factory.StartNew(() =>
                    {
                        var batchOp = new TableBatchOperation();
                        var tableRef = GetTableRef();

                        foreach (var item in list)
                        {
                            batchOp.Add(TableOperation.InsertOrReplace(item));
                        }

                        tableRef.ExecuteBatch(batchOp);
                        ReleaseTableRef(tableRef);
                    });
                tasks.Add(task);
            }

            Task.WaitAll(tasks.ToArray());

            return items;
        }

private IEnumerable<List<T>> SplitIntoPartitionedSublists(IEnumerable<T> items)
        {
            var itemsByPartion = new Dictionary<string, List<T>>();

            //split items into partitions
            foreach (var item in items)
            {
                var partition = GetPartition(item);
                if (itemsByPartion.ContainsKey(partition) == false)
                {
                    itemsByPartion[partition] = new List<T>();
                }
                item.PartitionKey = partition;
                item.ETag = "*";
                itemsByPartion[partition].Add(item);
            }

            //split into subsets
            var subLists = new List<List<T>>();
            foreach (var partition in itemsByPartion.Keys)
            {
                var partitionItems = itemsByPartion[partition];
                for (int i = 0; i < partitionItems.Count; i += MaxBatch)
                {
                    subLists.Add(partitionItems.Skip(i).Take(MaxBatch).ToList());
                }
            }

            return subLists;
        }

        private void BuildPartitionIndentifiers(int partitonCount)
        {
            var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
            var keys = new List<string>();

            for (int i = 0; i < chars.Count; i++)
            {
                var keyA = chars[i];
                for (int j = 0; j < chars.Count; j++)
                {
                    var keyB = chars[j];
                    keys.Add(string.Concat(keyA, keyB));
                }
            }


            var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
            var keySets = new List<List<string>>();

            if (partitonCount > keys.Count)
            {
                partitonCount = keys.Count;
            }

            //Build the key sets
            var index = 0;
            while (index < keys.Count)
            {
                var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
                keySets.Add(keysSet);
                index += keySetMaxSize;
            }

            //build the lookups and datatable for each key set
            _partitions = new List<string>();
            for (int i = 0; i < keySets.Count; i++)
            {
                var partitionName = String.Concat("subSet_", i);
                foreach (var key in keySets[i])
                {
                    _partitionByKey[key] = partitionName;
                }
                _partitions.Add(partitionName);
            }

        }

        private string GetPartition(T item)
        {
            var partKey = item.Id.ToString().Substring(34,2);
            return _partitionByKey[partKey];
        }

        private string GetPartition(Guid id)
        {
            var partKey = id.ToString().Substring(34, 2);
            return _partitionByKey[partKey];
        }

        private CloudTable GetTableRef()
        {
            CloudTable tableRef = null;
            //try to pop a table ref out of the stack
            var foundTableRefInStack = _tableRefs.TryPop(out tableRef);
            if (foundTableRefInStack == false)
            {
                //no table ref available must create a new one
                var client = _account.CreateCloudTableClient();
                client.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(1), 4);
                tableRef = client.GetTableReference(_sTableName);
            }

            //ensure table is created
            if (_bTableCreated != true)
            {
                tableRef.CreateIfNotExists();
                _bTableCreated = true;
            }

            return tableRef;
        }

결과 - 19-22kops 스토리지 계정 최대값

만약 당신이 전체 소스에 관심이 있다면 저에게 연락하세요.

계류장이 필요합니까?여러 스토리지 계정 사용!

이건 몇 달간의 시행착오, 테스트, 책상에 머리를 부딪힌 결과입니다.정말 도움이 되었으면 좋겠습니다.

좋아요, 3등이 매력에 답하나요?

http://blogs.msdn.com/b/windowsazurestorage/archive/2010/11/06/how-to-get-most-out-of-windows-azure-tables.aspx

스토리지 에뮬레이터에 대해 진지하게 연구한 친구가 몇 가지 이야기를 했습니다.

"모든 것이 단일 데이터베이스의 단일 테이블에 도달하고 있습니다(더 많은 파티션은 아무 영향도 주지 않습니다).각 테이블 삽입 작업은 3개 이상의 SQL 작업입니다.모든 배치는 트랜잭션 내부에 있습니다.트랜잭션 격리 수준에 따라 이러한 배치는 병렬로 실행할 수 있는 기능이 제한됩니다.

sql server 동작으로 인해 직렬 배치는 개별 삽입보다 빨라야 합니다. (개별 삽입은 기본적으로 각 디스크로 플러시되는 작은 트랜잭션이지만 실제 트랜잭션은 그룹으로 Disk로 플러시됩니다.)"

여러 파티션을 사용하는 IE는 실제 스토리지에 대해 영향을 미치는 반면 에뮬레이터의 성능에는 영향을 미치지 않습니다.

또한 로깅을 사용하도록 설정하고 로그를 약간 확인합니다. - c:\users\username\appdata\local\development storage

100의 배치 크기는 최고의 실제 성능을 제공하는 것으로 보이며, 잔글을 끄고, 기대 100을 끄고, 연결 제한을 강화합니다.

또한 실수로 중복 항목을 삽입하지 않도록 하십시오. 실수로 인해 오류가 발생하고 모든 것이 느려집니다.

실제 스토리지에 대한 테스트를 수행합니다.당신을 위해 이것의 대부분을 처리하는 꽤 괜찮은 라이브러리가 있습니다. http://www.nuget.org/packages/WindowsAzure.StorageExtensions/, 은 실제로 추가된 목록에 있는 ToList를 호출하는지 확인하고 열거되기 전까지는 실제로 실행되지 않습니다.또한 라이브러리는 동적 테이블 엔티티를 사용하므로 직렬화에 대한 성능은 작지만 TableEntity 항목 없이 순수 POCO 개체를 사용할 수 있습니다.

JT

많은 어려움과 실험을 거친 후, Azure Table 스토리지를 사용하여 단일 테이블 파티션에 대한 최적의 처리량(초당 2,000개 이상의 배치 쓰기 작업)과 스토리지 계정에 대한 훨씬 더 나은 처리량(초당 3,500개 이상의 배치 쓰기 작업)을 얻을 수 있었습니다.모든 다른 방법을 시도했지만 아래와 같이 .net 연결 제한을 프로그래밍 방식으로 설정(구성 샘플을 시도했지만 작동하지 않음)하여 문제가 해결되었습니다(Microsoft에서 제공한 백서를 기반으로 함).

ServicePoint tableServicePoint = ServicePointManager
    .FindServicePoint(_StorageAccount.TableEndpoint);

//This is a notorious issue that has affected many developers. By default, the value 
//for the number of .NET HTTP connections is 2.
//This implies that only 2 concurrent connections can be maintained. This manifests itself
//as "underlying connection was closed..." when the number of concurrent requests is
//greater than 2.

tableServicePoint.ConnectionLimit = 1000;

스토리지 계정당 20K 이상의 일괄 쓰기 작업을 수행한 다른 사용자는 경험을 공유하십시오.

좀 더 재미있게 말하자면, 여기 새로운 해답이 있습니다. 즉, 독립적인 테스트를 통해 운영 환경에서 쓰기 성능에 대해 놀라운 수치를 얻을 수 있으며 IO 차단 및 연결 관리를 훨씬 더 효과적으로 방지할 수 있습니다.저는 우리가 터무니없는 쓰기 속도(> 7kps)를 얻고 있기 때문에 이것이 당신에게 어떻게 적용되는지 매우 흥미롭습니다.

webconfig

 <system.net>
    <connectionManagement>
      <add address="*" maxconnection="48"/>
    </connectionManagement>
  </system.net>

테스트에서는 볼륨에 따른 파라미터를 사용했기 때문에 25000개의 아이템, 24개의 파티션, 100개의 배치 크기, 20개의 refecount가 항상 최선인 것 같습니다.이는 TPL 데이터 흐름(http://www.nuget.org/packages/Microsoft.Tpl.Dataflow/) for BufflerBlock)을 사용하여 대기 가능한 스레드 안전 테이블 참조 풀링을 제공합니다.

public class DyanmicBulkInsertTestPooledRefsAndAsynch : WebTest, IDynamicWebTest
{
    private int _itemCount;
    private int _partitionCount;
    private int _batchSize;
    private List<TestTableEntity> _items;
    private GuidIdPartitionSplitter<TestTableEntity> _partitionSplitter;
    private string _tableName;
    private CloudStorageAccount _account;
    private CloudTableClient _tableClient;
    private Dictionary<string, List<TestTableEntity>> _itemsByParition;
    private int _maxRefCount;
    private BufferBlock<CloudTable> _tableRefs;


    public DyanmicBulkInsertTestPooledRefsAndAsynch()
    {
        Properties = new List<ItemProp>();    
        Properties.Add(new ItemProp("ItemCount", typeof(int)));
        Properties.Add(new ItemProp("PartitionCount", typeof(int)));
        Properties.Add(new ItemProp("BatchSize", typeof(int)));
        Properties.Add(new ItemProp("MaxRefs", typeof(int)));


    }

    public List<ItemProp> Properties { get; set; }

    public void SetProps(Dictionary<string, object> propValuesByPropName)
    {
        _itemCount = (int)propValuesByPropName["ItemCount"];
        _partitionCount = (int)propValuesByPropName["PartitionCount"];
        _batchSize = (int)propValuesByPropName["BatchSize"];
        _maxRefCount = (int)propValuesByPropName["MaxRefs"];
    }

    protected override void SetupTest()
    {
        base.SetupTest();

        ThreadPool.SetMinThreads(1024, 256);
        ServicePointManager.DefaultConnectionLimit = 256;
        ServicePointManager.UseNagleAlgorithm = false;
        ServicePointManager.Expect100Continue = false;


        _account = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("DataConnectionString"));
        _tableClient = _account.CreateCloudTableClient();
        _tableName = "testtable" + new Random().Next(100000);

        //create the refs
        _tableRefs = new BufferBlock<CloudTable>();
        for (int i = 0; i < _maxRefCount; i++)
        {
            _tableRefs.Post(_tableClient.GetTableReference(_tableName));
        }

        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;

        tableRef.CreateIfNotExists();
        ReleaseRef(tableRef);

        _items = TestUtils.GenerateTableItems(_itemCount);
        _partitionSplitter = new GuidIdPartitionSplitter<TestTableEntity>();
        _partitionSplitter.BuildPartitions(_partitionCount);

        _items.ForEach(o =>
            {
                o.ETag = "*";
                o.Timestamp = DateTime.Now;
                o.PartitionKey = _partitionSplitter.GetPartition(o);
            });

        _itemsByParition = _partitionSplitter.SplitIntoPartitionedSublists(_items);
    }

    private async Task<CloudTable> GetTableRef()
    {
        return await _tableRefs.ReceiveAsync();            
    }

    private void ReleaseRef(CloudTable tableRef)
    {
        _tableRefs.Post(tableRef);
    }

    protected override void ExecuteTest()
    {
        Task.WaitAll(_itemsByParition.Keys.Select(parition => Task.Factory.StartNew(() => InsertParitionItems(_itemsByParition[parition]))).ToArray());
    }

    private void InsertParitionItems(List<TestTableEntity> items)
    {

        var tasks = new List<Task>();

        for (int i = 0; i < items.Count; i += _batchSize)
        {
            int i1 = i;

            var task = Task.Factory.StartNew(async () =>
            {
                var batchItems = items.Skip(i1).Take(_batchSize).ToList();

                if (batchItems.Select(o => o.PartitionKey).Distinct().Count() > 1)
                {
                    throw new Exception("Multiple partitions batch");
                }

                var batchOp = new TableBatchOperation();
                batchItems.ForEach(batchOp.InsertOrReplace);   

                var tableRef = GetTableRef.Result();
                tableRef.ExecuteBatch(batchOp);
                ReleaseRef(tableRef);
            });

            tasks.Add(task);

        }

        Task.WaitAll(tasks.ToArray());


    }

    protected override void CleanupTest()
    {
        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;
        tableRef.DeleteIfExists();
        ReleaseRef(tableRef);
    }

현재 여러 스토리지 계정을 처리할 수 있는 버전을 개발하고 있습니다. 이 버전을 사용하여 속도를 높일 수 있기를 바랍니다.또한 대규모 데이터셋을 위해 8개의 코어 가상 머신에서 이러한 가상 머신을 실행하고 있지만, 새로운 비블로킹 IO를 사용하면 제한된 VM에서 원활하게 실행될 수 있습니다.행운을 빕니다.

 public class SimpleGuidIdPartitionSplitter<T> where T : IUniqueId
{
    private ConcurrentDictionary<string, string> _partitionByKey = new ConcurrentDictionary<string, string>();
    private List<string> _partitions;
    private bool _bPartitionsBuilt;

    public SimpleGuidIdPartitionSplitter()
    {

    }

    public void BuildPartitions(int iPartCount)
    {
        BuildPartitionIndentifiers(iPartCount);
    }

    public string GetPartition(T item)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = item.Id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    public string GetPartition(Guid id)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    #region Helpers
    private void BuildPartitionIndentifiers(int partitonCount)
    {
        var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
        var keys = new List<string>();

        for (int i = 0; i < chars.Count; i++)
        {
            var keyA = chars[i];
            for (int j = 0; j < chars.Count; j++)
            {
                var keyB = chars[j];
                keys.Add(string.Concat(keyA, keyB));
            }
        }


        var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
        var keySets = new List<List<string>>();

        if (partitonCount > keys.Count)
        {
            partitonCount = keys.Count;
        }

        //Build the key sets
        var index = 0;
        while (index < keys.Count)
        {
            var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
            keySets.Add(keysSet);
            index += keySetMaxSize;
        }

        //build the lookups and datatable for each key set
        _partitions = new List<string>();
        for (int i = 0; i < keySets.Count; i++)
        {
            var partitionName = String.Concat("subSet_", i);
            foreach (var key in keySets[i])
            {
                _partitionByKey[key] = partitionName;
            }
            _partitions.Add(partitionName);
        }

        _bPartitionsBuilt = true;
    }
    #endregion
}



internal static List<TestTableEntity> GenerateTableItems(int count)
        {
            var items = new List<TestTableEntity>();
            var random = new Random();

            for (int i = 0; i < count; i++)
            {
                var itemId = Guid.NewGuid();

                items.Add(new TestTableEntity()
                {
                    Id = itemId,
                    TestGuid = Guid.NewGuid(),
                    RowKey = itemId.ToString(),
                    TestBool = true,
                    TestDateTime = DateTime.Now,
                    TestDouble = random.Next() * 1000000,
                    TestInt = random.Next(10000),
                    TestString = Guid.NewGuid().ToString(),
                });
            }

            var dupRowKeys = items.GroupBy(o => o.RowKey).Where(o => o.Count() > 1).Select(o => o.Key).ToList();
            if (dupRowKeys.Count > 0)
            {
                throw  new Exception("Dupicate Row Keys");
            }

            return items;
        }

그리고 한 가지 더 - 당신의 시기와 프레임워크가 영향을 받은 방식은 이 http://blogs.msdn.com/b/windowsazurestorage/archive/2013/08/08/net-clients-encountering-port-exhaustion-after-installing-kb2750149-or-kb2805227.aspx 의 포인트입니다.

언급URL : https://stackoverflow.com/questions/17955557/painfully-slow-azure-table-insert-and-delete-batch-operations

반응형