C#: Read lines from a text file in batches and process in parallel
I've had numerous requests from readers asking if I could provide some sample C# code that reads lines from a text file in batches and processes them in parallel, so here it is!
As benchmarked in my article, “The fastest way to read and process text files”, here is some sample code that does the following:
- reads an array of CSV files line by line
- when a “batch” threshold (e.g., a set number of lines) is hit, processes all the lines read so far in parallel
- repeats until the entirety of each file has been read (see the distilled sketch after this list).
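To make the core pattern clear before the full listing, here's a minimal sketch of just the batch-then-parallelize loop, with no database work. The file path, batch size, and per-line work are placeholder assumptions you'd replace with your own:

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

class BatchSketch
{
    static void Main()
    {
        const int BatchSize = 50000;    //lines to accumulate before processing
        List<string> batch = new List<string>(BatchSize);

        using (StreamReader reader = new StreamReader(@"C:\path\to\your\file.csv")) //placeholder path
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                batch.Add(line);
                if (batch.Count == BatchSize)
                {
                    ProcessBatch(batch);    //batch is full, so process it in parallel
                    batch.Clear();
                }
            }

            if (batch.Count > 0)
                ProcessBatch(batch);        //flush the final partial batch
        }
    }

    static void ProcessBatch(List<string> batch)
    {
        //Each line is independent of the others, so they can all be handled in parallel.
        Parallel.For(0, batch.Count, i =>
        {
            string[] fields = batch[i].Split(','); //naive split; placeholder for your real per-line work
        });
    }
}

The full version below follows the same shape, but reads with TextFieldParser, accumulates rows into a DataTable under a lock, and flushes each batch to SQL Server with SqlBulkCopy.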
This code is not meant to be production-ready; it's simply a quick guide and starting point to help you with your own production-ready code.
Also included are the database and DataTable snippets you'll need if you're uploading into a database. Obviously if you don't want or need this functionality, just remove or comment out those parts.
This sample code, as it stands, assumes:
- the text files are CSV (i.e., simple delimited) files
- the processed data is uploaded into a database
- the column names in the files and database table match (if yours don't, see the mapping sketch below)
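If the column names in your file and your database table don't line up, SqlBulkCopy can map them explicitly instead. Here's a minimal, hedged sketch; every table and column name in it is a hypothetical placeholder:

using System.Data;
using System.Data.SqlClient;

static void BulkCopyWithMappings(SqlConnection connectionDest, DataTable CurrentRecords)
{
    using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
    {
        sbc.DestinationTableName = "YourDatabaseTable";

        //Map each DataTable column to its differently named database column.
        sbc.ColumnMappings.Add("FileColumn1", "DbColumn1");
        sbc.ColumnMappings.Add("FileColumn2", "DbColumn2");

        sbc.WriteToServer(CurrentRecords);
    }
}

Note that once you add any mappings, SqlBulkCopy only copies the mapped columns, so be sure to map every column you want uploaded.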
Hope this helps! If you have any questions, feel free to ask in the comments!
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Data.SqlClient;
using System.Data;

//For the TextFieldParser class.
//Be sure to add Microsoft.VisualBasic as a reference for your project!
using Microsoft.VisualBasic.FileIO;

namespace ConsoleApplication1
{
    /// <summary>
    /// Simple code snippets to read a file, process the lines in parallel,
    /// and upload the results into a database table.
    ///
    /// Obviously if you don't need to copy the data into a database, just modify the code
    /// as appropriate.
    ///
    /// NOTE that this will have compile and runtime errors as it's just a
    /// rough code snippet to help you get started with your own implementation.
    ///
    /// Think of it like a guide.
    ///
    /// Code source from http://cc.davelozinski.com
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            //Basic usage to help you get started:
            ProcessFileTaskItem(
                new string[] {
                    "\\\\the\\full\\path\\to\\the\\first\\file.txt",
                    "\\\\full\\path\\to\\second\\file.txt" }
                , "Data Source=YourDatabaseServerName;Initial Catalog=YourDatabaseName;Integrated Security=SSPI;"
                , "YourDatabaseTableToUploadTheFileDataInto");
        }

        /// <summary>
        /// Process File - Rough code outline.
        /// This will read an array of input files, process the lines in parallel, and upload
        /// everything into the specified destination database table.
        ///
        /// NOTE that this will have compile and runtime errors as it's just a
        /// rough code snippet to help you get started with your own implementation.
        ///
        /// Think of it like a guide.
        ///
        /// Code source from http://cc.davelozinski.com
        /// </summary>
        /// <param name="SourceFiles">Array containing the files to be read, including the full path.</param>
        /// <param name="DatabaseConnectionString">Connection string to your database if you're going to upload data to it</param>
        /// <param name="DestinationTable">The database table to upload the file information into</param>
        public static void ProcessFileTaskItem(string[] SourceFiles, string DatabaseConnectionString, string DestinationTable)
        {
            //Make sure there's files to read
            if (SourceFiles != null && SourceFiles.Length > 0)
            {
                //Loop through the file array.
                //Depending on your system's set up, if it's fast enough you can
                //implement this in a Parallel.For loop as well to read multiple
                //files simultaneously.
                //Parallel.For(0, SourceFiles.Length, x =>
                for (int x = 0; x < SourceFiles.Length; x++)
                {
                    //Make sure the file exists and if so open it for reading.
                    if (File.Exists(SourceFiles[x]))
                    {
                        //Use Microsoft's built in TextFieldParser class for parsing CSV files.
                        using (TextFieldParser tfp = new TextFieldParser(SourceFiles[x]))
                        {
                            //If you're going to upload the data in your files to a database, this is a good
                            //place to open the connection so it's only opened and used once instead of
                            //opening/closing/opening/closing.
                            //
                            //If you're not uploading to a database, then obviously comment out or delete the
                            //relevant lines of code.
                            using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))
                            {
                                connectionDest.Open();

                                //Configure everything to upload to the database via bulk copy.
                                using (SqlBulkCopy sbc = new SqlBulkCopy(connectionDest, SqlBulkCopyOptions.TableLock, null))
                                {
                                    //Configure the bulk copy settings
                                    sbc.DestinationTableName = DestinationTable;
                                    sbc.BulkCopyTimeout = 28800; //8 hours

                                    //Now read and process the file
                                    ProcessAllLinesInInputFile(tfp, SourceFiles[x], connectionDest, sbc);
                                }

                                connectionDest.Close();
                            } //using (SqlConnection connectionDest = new SqlConnection(DatabaseConnectionString))

                            //If you're not uploading to a database then you can just uncomment and use this:
                            //ProcessAllLinesInInputFile(tfp, SourceFiles[x]);

                            tfp.Close();
                        } //using (TextFieldParser tfp = new TextFieldParser(SourceFiles[x]))
                    } //if File.Exists(SourceFiles[x])
                    else
                    {
                        //The file doesn't exist
                        //Do whatever you need to here
                    }
                } //for
                //); //End Parallel reading of files

                //Explicitly clean up before exiting
                Array.Clear(SourceFiles, 0, SourceFiles.Length);
            }
            else
            {
                //Do whatever you need to here if SourceFiles == null || SourceFiles.Length == 0
            }
        } //ProcessFileTaskItem

        /// <summary>
        /// Processes every line in the source input file.
        ///
        /// NOTE that this will have compile and runtime errors as it's just a
        /// rough code snippet to help you get started with your own implementation.
        ///
        /// Think of it like a guide.
        ///
        /// Code source from http://cc.davelozinski.com
        /// </summary>
        /// <param name="tfp">The open TextFieldParser used to read each line of the file</param>
        /// <param name="SourceFile">The full path of the source file</param>
        /// <param name="connectionDest">The open SQL Server connection to the destination SQL server for bulk copying</param>
        /// <param name="sbc">The open SQL Server Bulk Copy object</param>
        private static void ProcessAllLinesInInputFile(TextFieldParser tfp, string SourceFile, SqlConnection connectionDest, SqlBulkCopy sbc)
        {
            //Put these here so new objects aren't created with each loop iteration

            //The number of lines to read before processing all of them in parallel.
            //You obviously could make this a parameter too.
            int BatchSize = 50000;

            //Will hold each line and each column of each line read
            string[][] CurrentLines = new string[BatchSize][];

            //Create a local data table. Should be the same name as the table
            //in the database you'll be uploading everything to.
            //Obviously this could be a parameter too.
            DataTable CurrentRecords = new DataTable("YourDestinationTableNameInYourDatabase");

            //The column names. They should match what's in the database table.
            //If the column names are in the text file, you'll have to write your own routine
            //to read the first line of your file and extract/parse the column names to include here
            //as I'm not doing everything for you unless you give me a good rate. ;-)
            string[] ColumnNames = new string[] { "Column1", "Column2", "etc" };

            //The number of records currently processed for SQL bulk copy
            int BatchCount = 0;

            //The total number of records processed. Could be used for logging purposes.
            int RecordCount = 0;

            //More lines to process in the file?
            bool blnFileHasMoreLines = true;

            //The number of lines read thus far
            int intLineReadCounter = 0;

            //used for thread locking purposes
            object oSyncLock = new object();

            //Could be used for logging and stat keeping purposes.
            DateTime batchStartTime = DateTime.Now;
            DateTime batchEndTime = DateTime.Now;
            TimeSpan batchTimeSpan = batchEndTime - batchStartTime;

            //Uncomment the next line if using a CSV that has data enclosed in quotes.
            //tfp.HasFieldsEnclosedInQuotes = true;

            //If the file is Delimited, set these values
            tfp.TextFieldType = FieldType.Delimited;
            tfp.Delimiters = new string[] { "," };

            //If the file is FixedWidth, you'll have to write your own code.
            //tfp.TextFieldType = FieldType.FixedWidth;
            //tfp.SetFieldWidths( );

            //Create the datatable with the column names.
            for (int x = 0; x < ColumnNames.Length; x++)
                CurrentRecords.Columns.Add(ColumnNames[x], typeof(string));

            //Of note: it's faster to read all the lines we are going to act on and
            //then process them in parallel instead of reading and processing line by line.
            while (blnFileHasMoreLines)
            {
                batchStartTime = DateTime.Now; //Reset the timer

                //Read in all the lines up to the BatchSize or
                //until there's no more lines in the file
                while (intLineReadCounter < BatchSize && !tfp.EndOfData)
                {
                    CurrentLines[intLineReadCounter] = tfp.ReadFields();
                    intLineReadCounter += 1;
                    BatchCount += 1;
                    RecordCount += 1;
                }

                batchEndTime = DateTime.Now; //record the end time of the current batch
                batchTimeSpan = batchEndTime - batchStartTime; //get the timespan for stats

                //Now process each line in parallel.
                Parallel.For(0, intLineReadCounter, x =>
                //for (int x = 0; x < intLineReadCounter; x++) //Or the slower single threaded version for debugging
                {
                    List<object> values = null; //so each thread gets its own copy.

                    if (tfp.TextFieldType == FieldType.Delimited)
                    {
                        if (CurrentLines[x].Length != CurrentRecords.Columns.Count)
                        {
                            //Do what you need to if the number of columns in the current line
                            //don't match the number of expected columns
                            return; //stop now and don't add this record to the current collection of valid records.
                        }

                        //Number of columns match so copy over the values into the datatable
                        //for later upload into a database
                        values = new List<object>(CurrentRecords.Columns.Count);
                        for (int i = 0; i < CurrentLines[x].Length; i++)
                            values.Add(CurrentLines[x][i].ToString());

                        //OR do your own custom processing here if not using a database.
                    }
                    else if (tfp.TextFieldType == FieldType.FixedWidth)
                    {
                        //Implement your own processing if the file columns are fixed width.
                    }

                    //Now lock the data table before saving the results so there's no thread bashing on the datatable
                    if (values != null)
                    {
                        lock (oSyncLock)
                        {
                            CurrentRecords.LoadDataRow(values.ToArray(), true);
                        }

                        values.Clear();
                    }
                }); //Parallel.For

                //If you're not using a database, you obviously won't need this next piece of code.
                if (BatchCount >= BatchSize)
                {
                    //Do the SQL bulk copy and save the info into the database
                    sbc.BatchSize = CurrentRecords.Rows.Count;
                    sbc.WriteToServer(CurrentRecords);

                    BatchCount = 0; //Reset these values
                    CurrentRecords.Clear(); // "
                }

                if (tfp.EndOfData)
                    blnFileHasMoreLines = false; //we're all done, so signal the while loop to stop

                intLineReadCounter = 0; //reset for next pass
                Array.Clear(CurrentLines, 0, CurrentLines.Length);
            } //while blnFileHasMoreLines

            //Write out the last of the good records to the database
            if (CurrentRecords.Rows.Count > 0)
            {
                sbc.BatchSize = CurrentRecords.Rows.Count;
                sbc.WriteToServer(CurrentRecords);
            }

            //Clean up
            if (CurrentRecords != null)
                CurrentRecords.Clear();
            if (CurrentLines != null)
                Array.Clear(CurrentLines, 0, CurrentLines.Length);
            oSyncLock = null;
        }
    }
}