Disclaimer: I am a C++ guy, so my C# is not even close. Therefore some review might be helpful.
Here is my 4th attempt at a client using asynchronous sockets. Please give feedback.
The hardest part of this endeavour is synchronizing what can happen and when it can happen. Some examples I had to tackle in the debugger:
-
If the client cannot connect, how does Startup alert the caller while the state machine loop is still trying to reconnect? The two of them can't both fight over the state.
-
When we disconnect and there is an outstanding call to receive, how can it safely finish, and how do we control whether another call to receive should occur or not?
-
What happens if we pull the plug after calling send but before it completes?
etc. etc.
I found that making a looping thread with a state machine that included an error state took care of a lot, but startup and shutdown had to be handled a little differently. I think I took care of all the problems. Tests seemed to work OK.
I wonder how others would go about it. A lot of the example code I found on the web simply doesn't handle errors or seem to care about cases like those I mentioned, but that is the hardest part.
Anyway, feedback is appreciated.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IntegrationTests
{
    /// <summary>
    /// TCP client built on the asynchronous (Begin/End) socket API.
    /// A dedicated processing thread runs a small state machine that connects, logs in,
    /// drains received messages and reconnects after errors. Asynchronous callbacks never
    /// touch the socket lifetime themselves; they only flag ERROR and let the processing
    /// thread tear the connection down.
    /// </summary>
    public class Client : IDisposable
    {
        // Incremented atomically because clients are constructed from several tasks at once.
        private static long _lastId;
        private ulong _id;
        private bool _disposed;

        /// <summary>
        /// State carried from one asynchronous receive to the next on the same connection.
        /// </summary>
        protected class ReceiveContext
        {
            public const int _bufferSize = 1024;
            public byte[] _buffer = new byte[_bufferSize]; // Contains bytes from one receive
            public StringBuilder _stringBuilder = new StringBuilder(); // Accumulates bytes across receives until a '#' delimiter arrives
            public Socket _socket; // The socket this receive was issued on, so the callback never depends on the _socket field
        }

        protected enum ConnectionState
        {
            DISCONNECTED,
            LOGGING_IN,
            CONNECTED,
            ERROR,
            SHUTTING_DOWN,
            SHUTDOWN
        }

        protected string _host;
        protected int _port;
        protected TimeSpan _timeout;
        protected TimeSpan _reconnectAttemptInterval;
        protected string _sessionId;
        protected Socket _socket;

        /// <summary>
        /// Guards <see cref="_socket"/> and <see cref="_numOutstandingAsyncOps"/>.
        /// </summary>
        protected object _lockNumOutstandingAsyncOps;
        protected int _numOutstandingAsyncOps;
        protected int _connectionState;

        /// <summary>
        /// Manages the state machine that is our connection state.
        /// Issues Connect, Disconnect, and Receive calls.
        /// </summary>
        protected Thread _processingThread;

        /// <summary>
        /// Startup should not return until we are connected and logged in,
        /// or it has failed in the attempt for the duration of the timeout.
        /// </summary>
        protected ManualResetEvent _connectedEvent;

        /// <summary>
        /// Signalling mechanism outside of the connection state, so that Startup, Shutdown
        /// and the processing thread do not compete to set the state. It is set from the thread
        /// calling Startup/Shutdown, while the connection state is set from the processing
        /// thread and from the asynchronous send/receive callbacks.
        /// </summary>
        protected ManualResetEvent _shutdownEvent;

        BlockingCollection<string> _messageQueue;

        /// <summary>
        /// Creates a client; no network activity happens until <see cref="Startup"/> is called.
        /// </summary>
        /// <param name="host">Host name or address of the server.</param>
        /// <param name="port">TCP port of the server.</param>
        /// <param name="sessionId">Identifier appended to the login request.</param>
        /// <param name="timeoutMilliseconds">Timeout used for startup, queue operations and socket timeouts.</param>
        /// <param name="reconnectAttemptInterval">Pause between failed connection attempts.</param>
        public Client(string host, int port, string sessionId, TimeSpan timeoutMilliseconds, TimeSpan reconnectAttemptInterval)
        {
            _id = (ulong)Interlocked.Increment(ref _lastId);
            _disposed = false;
            _host = host;
            _port = port;
            _sessionId = sessionId;
            _timeout = timeoutMilliseconds;
            _reconnectAttemptInterval = reconnectAttemptInterval;
            _socket = null;
            _lockNumOutstandingAsyncOps = new object();
            _numOutstandingAsyncOps = 0;
            _connectionState = (int)ConnectionState.DISCONNECTED;
            _processingThread = new Thread(ProcessingThreadProc);
            _connectedEvent = new ManualResetEvent(false);
            _shutdownEvent = new ManualResetEvent(false);
            _messageQueue = new BlockingCollection<string>();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Shuts the client down and releases the queue and both wait handles.
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                // Shutdown joins the processing thread, so nothing uses the handles afterwards.
                Shutdown();
                _messageQueue.Dispose();
                _connectedEvent.Dispose();
                _shutdownEvent.Dispose();
            }

            _disposed = true;
        }

        /// <summary>
        /// Current connection state. Reads and writes go through Interlocked so that
        /// every thread observes the latest value.
        /// </summary>
        protected ConnectionState ConnectionStateProperty
        {
            get { return (ConnectionState)Interlocked.CompareExchange(ref _connectionState, 0, 0); }
            set { Interlocked.Exchange(ref _connectionState, (int)value); }
        }

        /// <summary>
        /// Atomically moves from one state to another; fails if some other thread
        /// (typically a callback flagging ERROR) changed the state in the meantime.
        /// </summary>
        private bool TryTransition(ConnectionState from, ConnectionState to)
        {
            return Interlocked.CompareExchange(ref _connectionState, (int)to, (int)from) == (int)from;
        }

        /// <summary>
        /// Flags ERROR, but only when the failing socket is still the active one.
        /// Failures caused by our own Disconnect (which clears _socket first) are ignored.
        /// </summary>
        private void SignalError(Socket source)
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                if (source != null && ReferenceEquals(source, _socket))
                {
                    ConnectionStateProperty = ConnectionState.ERROR;
                }
            }
        }

        //--------------------------------------------------------------------------------------------------
        /// <summary>
        /// Starts the processing thread and blocks until the client is connected and the login
        /// request has been issued, or until the timeout elapses.
        /// </summary>
        /// <exception cref="ApplicationException">The client is already started, or startup timed out.</exception>
        /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
        public void Startup()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            if (_processingThread.IsAlive)
            {
                // Error
                throw new ApplicationException(string.Format("Client #{0} Call to Startup, but processing thread is already started", _id));
            }

            Debug.WriteLine(string.Format("Client #{0} Starting up.", _id));

            // We want this call to be synchronous, so wait until we are actually connected to return.
            // That way, the caller will not start trying to send data until we connect.
            _connectedEvent.Reset();
            _processingThread.Start();

            if (_connectedEvent.WaitOne(_timeout))
            {
                Debug.WriteLine(string.Format("Client #{0} Started.", _id));
            }
            else
            {
                _shutdownEvent.Set();
                throw new ApplicationException(string.Format("Client #{0} Startup timed out.", _id));
            }
        }

        //--------------------------------------------------------------------------------------------------
        /// <summary>
        /// Signals the processing thread to stop and waits for it to finish. Safe to call repeatedly.
        /// </summary>
        public void Shutdown()
        {
            if (_disposed)
            {
                // The wait handles are gone; there is nothing left to shut down.
                return;
            }

            Debug.WriteLine(string.Format("Client #{0} Shutting Down...", _id));

            // Signal our processing thread that we want to shutdown
            _shutdownEvent.Set();

            // Wait for the thread to complete
            if (_processingThread.IsAlive)
            {
                _processingThread.Join();
            }

            // Done
            Debug.WriteLine(string.Format("Client #{0} Shutdown Complete...", _id));
        }

        //--------------------------------------------------------------------------------------------------
        /// <summary>
        /// Body of the processing thread: runs the connection state machine until shutdown is signalled.
        /// </summary>
        protected void ProcessingThreadProc()
        {
            try
            {
                System.Threading.Thread.CurrentThread.Name = "Processing Thread";

                while (!_shutdownEvent.WaitOne(0))
                {
                    switch (ConnectionStateProperty)
                    {
                        case ConnectionState.DISCONNECTED:
                        {
                            if (!Connect())
                            {
                                // Try again after a little wait, but wake up at once if shutdown is signalled
                                _shutdownEvent.WaitOne(_reconnectAttemptInterval);
                            }
                            else
                            {
                                // Publish the new state BEFORE the first receive is issued, so an error
                                // flagged by its callback can never be overwritten by this assignment.
                                ConnectionStateProperty = ConnectionState.LOGGING_IN;
                                StartReceiving();
                            }
                            break;
                        }

                        case ConnectionState.LOGGING_IN:
                        {
                            // Send the Login request
                            string loginRequest = string.Format("loginstuff{0}", _sessionId);
                            Debug.WriteLine(string.Format("Client #{0} Sending Login Request: {1}", _id, loginRequest));
                            try
                            {
                                BeginSendMessage(loginRequest);

                                // Only become CONNECTED if nobody flagged an error meanwhile
                                if (TryTransition(ConnectionState.LOGGING_IN, ConnectionState.CONNECTED))
                                {
                                    // Signal Startup(), if it happened to be waiting
                                    _connectedEvent.Set();
                                }
                            }
                            catch (Exception e)
                            {
                                Debug.WriteLine(string.Format("Client #{0} Exception caught while sending Login Request. Exception: {1}", _id, e.ToString()));
                                ConnectionStateProperty = ConnectionState.ERROR;
                            }
                            break;
                        }

                        case ConnectionState.ERROR:
                        {
                            // If anything went wrong we go to the disconnected state, which causes a
                            // reconnection on the next loop iteration. After Disconnect returns no callback
                            // is in flight, so this assignment cannot race with one.
                            Disconnect();
                            ConnectionStateProperty = ConnectionState.DISCONNECTED;
                            break;
                        }

                        case ConnectionState.CONNECTED:
                        {
                            // TODO - We should wait until either an item is queued or the connection
                            // state changes. Currently only the queue is waited on, and the state is
                            // polled once per timeout.
                            string message;
                            if (_messageQueue.TryTake(out message, _timeout) && !string.IsNullOrEmpty(message))
                            {
                                Debug.WriteLine(string.Format("Client #{0} Processed Received Data: {1}", _id, message));
                            }
                            break;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine(string.Format("Client #{0} Unhandled exception caught in processing thread. Exception: {1}", _id, e.ToString()));
            }
            finally
            {
                // Whether we were signalled or failed unexpectedly, never leave the socket open
                ConnectionStateProperty = ConnectionState.SHUTTING_DOWN;
                Disconnect();
                ConnectionStateProperty = ConnectionState.SHUTDOWN;
            }

            Debug.WriteLine(string.Format("Client #{0} Processing Thread exiting.", _id));
        }

        /// <summary>
        /// Sends a request to the server asynchronously.
        /// </summary>
        /// <param name="thingy">Payload appended to the request prefix.</param>
        /// <exception cref="ApplicationException">The client is not connected.</exception>
        public void MakeRequest(string thingy)
        {
            if (ConnectionStateProperty != ConnectionState.CONNECTED)
            {
                // Error - Cannot make a request while not connected
                throw new ApplicationException(string.Format("Client #{0} MakeRequest was called for thingy{1}, but client is not connected.", _id, thingy));
            }

            string message = string.Format("requeststuff{0}", thingy);
            Debug.WriteLine(string.Format("Client #{0} Sending Request: {1}", _id, message));
            BeginSendMessage(message);
        }

        /// <summary>
        /// Starts an asynchronous send on the current socket and accounts for it in the
        /// outstanding-operation counter. If the send cannot be started the counter is restored,
        /// ERROR is flagged and the exception is rethrown.
        /// </summary>
        private void BeginSendMessage(string message)
        {
            byte[] data = Encoding.ASCII.GetBytes(message);

            lock (_lockNumOutstandingAsyncOps)
            {
                Socket socket = _socket;
                if (socket == null)
                {
                    // The connection was torn down between the caller's state check and this point
                    throw new ApplicationException(string.Format("Client #{0} cannot send, the client is not connected.", _id));
                }

                ++_numOutstandingAsyncOps;
                try
                {
                    socket.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback(OnSend), socket);
                }
                catch
                {
                    // No callback will ever run for this operation, so undo the accounting here;
                    // otherwise Disconnect would wait forever.
                    --_numOutstandingAsyncOps;
                    ConnectionStateProperty = ConnectionState.ERROR;
                    throw;
                }
            }
        }

        /// <summary>
        /// Issues the first asynchronous receive on a freshly connected socket.
        /// </summary>
        private void StartReceiving()
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                ReceiveContext context = new ReceiveContext();
                context._socket = _socket;

                ++_numOutstandingAsyncOps;
                try
                {
                    context._socket.BeginReceive(context._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
                }
                catch (Exception e)
                {
                    --_numOutstandingAsyncOps;
                    Debug.WriteLine(string.Format("Client #{0} Exception caught while starting to receive. Exception: {1}", _id, e.ToString()));
                    ConnectionStateProperty = ConnectionState.ERROR;
                }
            }
        }

        //--------------------------------------------------------------------------------------------------
        /// <summary>
        /// Drops any existing connection, resolves the host and connects synchronously.
        /// </summary>
        /// <returns>true when connected; false on any failure (which is logged).</returns>
        protected bool Connect()
        {
            Disconnect();

            Socket socket = null;
            try
            {
                // Resolution is inside the try block: a DNS failure is just another failed attempt
                IPAddress address = Dns.GetHostEntry(_host).AddressList
                    .FirstOrDefault(x => x.AddressFamily == AddressFamily.InterNetwork);
                if (address == null)
                {
                    Debug.WriteLine(string.Format("Client #{0} No IPv4 address found for host: {1}", _id, _host));
                    return false;
                }

                socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

                // TotalMilliseconds is the whole duration; Milliseconds is only the 0-999 component
                int timeoutMilliseconds = (int)Math.Min(_timeout.TotalMilliseconds, int.MaxValue);
                socket.ReceiveTimeout = timeoutMilliseconds;
                socket.SendTimeout = timeoutMilliseconds;

                socket.Connect(new IPEndPoint(address, _port));
                Debug.WriteLine(string.Format("Client #{0} connected to: {1}", _id, socket.RemoteEndPoint.ToString()));

                lock (_lockNumOutstandingAsyncOps)
                {
                    _socket = socket;
                }
                return true;
            }
            catch (Exception e)
            {
                // Error
                Debug.WriteLine(string.Format("Client #{0} Exception caught while attempting to Connect. Exception: {1}", _id, e.ToString()));
                if (socket != null)
                {
                    socket.Close();
                }
                return false;
            }
        }

        /// <summary>
        /// Closes the current socket (if any) and waits until every outstanding asynchronous
        /// operation has run its callback.
        /// </summary>
        protected void Disconnect()
        {
            Debug.WriteLine(string.Format("Client #{0} Disconnecting...", _id));

            lock (_lockNumOutstandingAsyncOps)
            {
                // Clearing the field first stops new operations from being started and lets
                // callbacks recognise that their failure was caused by this teardown.
                Socket socket = _socket;
                _socket = null;

                if (socket != null)
                {
                    try
                    {
                        if (socket.Connected)
                        {
                            socket.Shutdown(SocketShutdown.Both);
                        }
                    }
                    catch (SocketException)
                    {
                        // The peer is already gone; closing below is all that is left to do
                    }
                    catch (ObjectDisposedException)
                    {
                    }

                    // Closing aborts pending operations. A pending receive on a quiet connection would
                    // otherwise never complete and the wait below would block forever. Callbacks use the
                    // socket captured when the operation was issued, so they fail cleanly in End*.
                    socket.Close();
                }

                // Monitor.Wait releases the lock so the callbacks can decrement the counter
                while (_numOutstandingAsyncOps > 0)
                {
                    Monitor.Wait(_lockNumOutstandingAsyncOps);
                }
            }

            Debug.WriteLine(string.Format("Client #{0} Disconnected...", _id));
        }

        /// <summary>
        /// Completion callback for an asynchronous receive: queues every complete message and
        /// re-issues the receive while the connection is healthy.
        /// </summary>
        protected void OnReceive(IAsyncResult asyncResult)
        {
            ReceiveContext context = (ReceiveContext)asyncResult.AsyncState;
            Socket socket = context._socket ?? _socket;
            bool connectionAlive = false;

            try
            {
                int bytesReceived = socket.EndReceive(asyncResult);
                if (bytesReceived > 0)
                {
                    connectionAlive = true;
                    context._stringBuilder.Append(Encoding.ASCII.GetString(context._buffer, 0, bytesReceived));
                    QueueCompleteMessages(context._stringBuilder);
                }
                else
                {
                    // Zero bytes means the remote host closed the connection. Receiving again would
                    // complete immediately with zero bytes forever, so treat it as an error.
                    Debug.WriteLine(string.Format("Client #{0} Connection closed by remote host.", _id));
                    SignalError(socket);
                }
            }
            catch (Exception e)
            {
                // Error
                connectionAlive = false;
                Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}", _id, e.ToString()));
                SignalError(socket);
            }

            lock (_lockNumOutstandingAsyncOps)
            {
                bool reissued = false;
                ConnectionState state = ConnectionStateProperty;
                bool wantsData = state == ConnectionState.LOGGING_IN || state == ConnectionState.CONNECTED;

                if (connectionAlive && wantsData && ReferenceEquals(socket, _socket))
                {
                    try
                    {
                        // Reuse the context so a partially received message is not lost. The new
                        // receive takes over the slot of the one that just finished, so the counter
                        // is left unchanged.
                        socket.BeginReceive(context._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
                        reissued = true;
                    }
                    catch (Exception e)
                    {
                        // Error
                        Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}", _id, e.ToString()));
                        ConnectionStateProperty = ConnectionState.ERROR;
                    }
                }

                if (!reissued)
                {
                    --_numOutstandingAsyncOps;
                    Monitor.PulseAll(_lockNumOutstandingAsyncOps);
                }
            }
        }

        /// <summary>
        /// Moves every complete '#'-terminated message from the buffer to the message queue,
        /// leaving any trailing partial message in the buffer.
        /// </summary>
        private void QueueCompleteMessages(StringBuilder buffer)
        {
            string buffered = buffer.ToString();
            int start = 0;
            int end;

            while ((end = buffered.IndexOf('#', start)) != -1)
            {
                string message = buffered.Substring(start, end - start + 1);
                start = end + 1;

                if (!_messageQueue.TryAdd(message, _timeout))
                {
                    Debug.WriteLine(string.Format("Client #{0} Timed out while attempting to queue Received Data: {1}", _id, message));
                }
            }

            buffer.Remove(0, start);
        }

        /// <summary>
        /// Completion callback for an asynchronous send.
        /// </summary>
        protected void OnSend(IAsyncResult asyncResult)
        {
            // The socket the send was issued on travels in AsyncState
            Socket socket = (asyncResult.AsyncState as Socket) ?? _socket;

            try
            {
                socket.EndSend(asyncResult);
            }
            catch (Exception e)
            {
                Debug.WriteLine(string.Format("Client #{0} Exception caught OnSend. Exception: {1}", _id, e.ToString()));
                SignalError(socket);
            }
            finally
            {
                lock (_lockNumOutstandingAsyncOps)
                {
                    --_numOutstandingAsyncOps;
                    Monitor.PulseAll(_lockNumOutstandingAsyncOps);
                }
            }
        }
    }
}
Main:
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegrationTests
{
    /// <summary>
    /// Test driver: starts the configured number of clients in parallel, lets each one issue
    /// a request and live for the configured lifetime, then shuts them all down.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Reads the settings from the application configuration and runs one task per client.
        /// </summary>
        static void Main(string[] args)
        {
            // Configuration values are machine-readable, so parse them culture-independently
            string server = ConfigurationManager.AppSettings["server"];
            int port = int.Parse(ConfigurationManager.AppSettings["port"], CultureInfo.InvariantCulture);
            int numClients = int.Parse(ConfigurationManager.AppSettings["numberOfClients"], CultureInfo.InvariantCulture);
            TimeSpan clientLifetime = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"], CultureInfo.InvariantCulture);
            TimeSpan timeout = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"], CultureInfo.InvariantCulture);
            TimeSpan reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"], CultureInfo.InvariantCulture);
            List<string> clientIds = ConfigurationManager.GetSection("clientIds") as List<string>;

            try
            {
                // Fail once with a clear message instead of a NullReferenceException or
                // IndexOutOfRangeException inside every task
                if (clientIds == null || clientIds.Count < numClients)
                {
                    throw new ConfigurationErrorsException(string.Format(
                        "The clientIds section must provide at least {0} client ids.", numClients));
                }

                Task[] tasks = new Task[numClients];
                for (int count = 0; count < numClients; ++count)
                {
                    // Copy the loop variable so each lambda captures its own value
                    int clientIndex = count;

                    // LongRunning: every task blocks for the whole client lifetime, so it should
                    // not occupy an ordinary thread-pool worker
                    tasks[count] = Task.Factory.StartNew(() =>
                    {
                        try
                        {
                            // The using block guarantees the client is released even when
                            // Startup or MakeRequest throws
                            using (Client client = new Client(server, port, clientIds[clientIndex], timeout, reconnectInterval))
                            {
                                client.Startup();
                                client.MakeRequest("Request");
                                System.Threading.Thread.Sleep(clientLifetime);
                                client.Shutdown();
                            }
                        }
                        catch (Exception e)
                        {
                            Debug.WriteLine(string.Format("Caught an exception in task procedure. Exception: {0}", e.ToString()));
                        }
                    }, TaskCreationOptions.LongRunning);
                }

                Task.WaitAll(tasks);
            }
            catch (Exception e)
            {
                Debug.WriteLine(string.Format("Caught an exception in main. Exception: {0}", e.ToString()));
            }
        }
    }
}