A Simplistic TCP Finite State Machine (FSM) in Python

The challenge

Automatons, or Finite State Machines (FSM), are extremely useful to programmers when it comes to software design. You will be given a simplistic version of an FSM to code for a basic TCP session.

The outcome of this exercise will be to return the correct state of the TCP FSM based on the array of events given.

The input array of events will consist of one or more of the following strings:

APP_PASSIVE_OPEN, APP_ACTIVE_OPEN, APP_SEND, APP_CLOSE, APP_TIMEOUT, RCV_SYN, RCV_ACK, RCV_SYN_ACK, RCV_FIN, RCV_FIN_ACK

The states are as follows and should be returned in all capital letters as shown:

CLOSED, LISTEN, SYN_SENT, SYN_RCVD, ESTABLISHED, CLOSE_WAIT, LAST_ACK, FIN_WAIT_1, FIN_WAIT_2, CLOSING, TIME_WAIT

The input will be an array of events. Your job is to traverse the FSM as determined by the events, and return the proper state as a string, all caps, as shown above.

If an event is not applicable to the current state, your code will return "ERROR".

Action of each event upon each state:

(the format is INITIAL_STATE: EVENT -> NEW_STATE)

CLOSED: APP_PASSIVE_OPEN -> LISTEN CLOSED: APP_ACTIVE_OPEN -> SYN_SENT LISTEN: RCV_SYN -> SYN_RCVD LISTEN: APP_SEND -> SYN_SENT LISTEN: APP_CLOSE -> CLOSED SYN_RCVD: APP_CLOSE -> FIN_WAIT_1 SYN_RCVD: RCV_ACK -> ESTABLISHED SYN_SENT: RCV_SYN -> SYN_RCVD SYN_SENT: RCV_SYN_ACK -> ESTABLISHED SYN_SENT: APP_CLOSE -> CLOSED ESTABLISHED: APP_CLOSE -> FIN_WAIT_1 ESTABLISHED: RCV_FIN -> CLOSE_WAIT FIN_WAIT_1: RCV_FIN -> CLOSING FIN_WAIT_1: RCV_FIN_ACK -> TIME_WAIT FIN_WAIT_1: RCV_ACK -> FIN_WAIT_2 CLOSING: RCV_ACK -> TIME_WAIT FIN_WAIT_2: RCV_FIN -> TIME_WAIT TIME_WAIT: APP_TIMEOUT -> CLOSED CLOSE_WAIT: APP_CLOSE -> LAST_ACK LAST_ACK: RCV_ACK -> CLOSED
Code language: HTTP (http)

Examples

["APP_PASSIVE_OPEN", "APP_SEND", "RCV_SYN_ACK"] => "ESTABLISHED" ["APP_ACTIVE_OPEN"] => "SYN_SENT" ["APP_ACTIVE_OPEN", "RCV_SYN_ACK", "APP_CLOSE", "RCV_FIN_ACK", "RCV_ACK"] => "ERROR"
Code language: Python (python)

See wikipedia page Transmission Control Protocol for further details.

The solution in Python code

Option 1:

STATE_TO_COMMANDS = { 'CLOSED': { 'APP_PASSIVE_OPEN': 'LISTEN', 'APP_ACTIVE_OPEN': 'SYN_SENT' }, 'LISTEN': { 'RCV_SYN': 'SYN_RCVD', 'APP_SEND': 'SYN_SENT', 'APP_CLOSE': 'CLOSED' }, 'SYN_RCVD': { 'APP_CLOSE': 'FIN_WAIT_1', 'RCV_ACK': 'ESTABLISHED' }, 'SYN_SENT': { 'RCV_SYN': 'SYN_RCVD', 'RCV_SYN_ACK': 'ESTABLISHED', 'APP_CLOSE': 'CLOSED' }, 'ESTABLISHED': { 'APP_CLOSE': 'FIN_WAIT_1', 'RCV_FIN': 'CLOSE_WAIT' }, 'FIN_WAIT_1': { 'RCV_FIN': 'CLOSING', 'RCV_FIN_ACK': 'TIME_WAIT', 'RCV_ACK': 'FIN_WAIT_2' }, 'CLOSING': { 'RCV_ACK': 'TIME_WAIT' }, 'FIN_WAIT_2': { 'RCV_FIN': 'TIME_WAIT' }, 'TIME_WAIT': { 'APP_TIMEOUT': 'CLOSED' }, 'CLOSE_WAIT': { 'APP_CLOSE': 'LAST_ACK' }, 'LAST_ACK': { 'RCV_ACK': 'CLOSED' } } def traverse_TCP_states(events): state = "CLOSED" # initial state, always for event in events: if event not in STATE_TO_COMMANDS[state]: return 'ERROR' state = STATE_TO_COMMANDS[state][event] return state
Code language: Python (python)

Option 2:

STATES = {"CLOSED": {"APP_PASSIVE_OPEN": "LISTEN", "APP_ACTIVE_OPEN": "SYN_SENT"}, "LISTEN": {"RCV_SYN": "SYN_RCVD", "APP_SEND": "SYN_SENT", "APP_CLOSE": "CLOSED"}, "SYN_RCVD": {"APP_CLOSE": "FIN_WAIT_1", "RCV_ACK": "ESTABLISHED"}, "SYN_SENT": {"RCV_SYN": "SYN_RCVD", "RCV_SYN_ACK": "ESTABLISHED", "APP_CLOSE": "CLOSED"}, "ESTABLISHED":{"APP_CLOSE": "FIN_WAIT_1", "RCV_FIN": "CLOSE_WAIT"}, "FIN_WAIT_1": {"RCV_FIN": "CLOSING", "RCV_FIN_ACK": "TIME_WAIT", "RCV_ACK": "FIN_WAIT_2"}, "CLOSING": {"RCV_ACK": "TIME_WAIT"}, "FIN_WAIT_2": {"RCV_FIN": "TIME_WAIT"}, "TIME_WAIT": {"APP_TIMEOUT": "CLOSED"}, "CLOSE_WAIT": {"APP_CLOSE": "LAST_ACK"}, "LAST_ACK": {"RCV_ACK": "CLOSED"}, } def traverse_TCP_states(events): state = "CLOSED" try: for e in events: state = STATES[state][e] return state except KeyError: return "ERROR"
Code language: Python (python)

Option 3:

def traverse_TCP_states(events, state='CLOSED'): for event in events: state = {('CLOSED', 'APP_PASSIVE_OPEN'):'LISTEN', ('CLOSED', 'APP_ACTIVE_OPEN'): 'SYN_SENT', ('LISTEN', 'RCV_SYN'):'SYN_RCVD', ('LISTEN', 'APP_SEND'):'SYN_SENT', ('LISTEN', 'APP_CLOSE'):'CLOSED', ('SYN_RCVD', 'APP_CLOSE'):'FIN_WAIT_1', ('SYN_RCVD', 'RCV_ACK'):'ESTABLISHED', ('SYN_SENT', 'RCV_SYN'):'SYN_RCVD', ('SYN_SENT', 'RCV_SYN_ACK'):'ESTABLISHED', ('SYN_SENT', 'APP_CLOSE'):'CLOSED', ('ESTABLISHED', 'APP_CLOSE'):'FIN_WAIT_1', ('ESTABLISHED', 'RCV_FIN'):'CLOSE_WAIT', ('FIN_WAIT_1', 'RCV_FIN'):'CLOSING', ('FIN_WAIT_1', 'RCV_FIN_ACK'):'TIME_WAIT', ('FIN_WAIT_1', 'RCV_ACK'):'FIN_WAIT_2', ('CLOSING', 'RCV_ACK'):'TIME_WAIT', ('FIN_WAIT_2', 'RCV_FIN'):'TIME_WAIT', ('TIME_WAIT', 'APP_TIMEOUT'):'CLOSED', ('CLOSE_WAIT', 'APP_CLOSE'):'LAST_ACK', ('LAST_ACK', 'RCV_ACK'):'CLOSED'}.get((state, event), 'ERROR') return state
Code language: Python (python)

Test cases to validate our solution

test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN"]), "CLOSE_WAIT") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN", "RCV_SYN","RCV_ACK"]), "ESTABLISHED") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN","APP_CLOSE"]), "LAST_ACK") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN"]), "SYN_SENT") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","APP_SEND"]), "ERROR")
Code language: Python (python)

Additional test cases

@test.describe("Fixed tests") def fixed_tests(): test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN", "RCV_SYN","RCV_ACK", "APP_CLOSE"]),"FIN_WAIT_1") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN", "RCV_SYN","RCV_ACK"]), "ESTABLISHED") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN", "RCV_SYN"]), "SYN_RCVD") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN"]), "LISTEN") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","APP_CLOSE"]), "CLOSED") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK"]), "TIME_WAIT") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK","APP_TIMEOUT"]), "CLOSED") test.assert_equals(traverse_TCP_states(["RCV_SYN","RCV_ACK","APP_CLOSE"]),"ERROR") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_ACK"]), "FIN_WAIT_2") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN"]), "CLOSE_WAIT") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN","APP_CLOSE"]), "LAST_ACK") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN"]), "SYN_SENT") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","APP_CLOSE"]), "CLOSED") test.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","APP_CLOSE"]), "FIN_WAIT_1") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_PASSIVE_OPEN"]), "ERROR") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_FIN_ACK","APP_TIMEOUT","APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK"]), "TIME_WAIT") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_SYN"]), "ERROR") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","APP_CLOSE","RCV_SYN"]), "ERROR") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE"]), "FIN_WAIT_1") test.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_FIN"]), "CLOSING") @test.describe("Random tests") def random_tests(): from random import randint as rand, choice STATES = {"CLOSED": {"APP_PASSIVE_OPEN": "LISTEN", "APP_ACTIVE_OPEN": "SYN_SENT"}, "LISTEN": {"RCV_SYN": "SYN_RCVD", "APP_SEND": "SYN_SENT", "APP_CLOSE": "CLOSED"}, "SYN_RCVD": {"APP_CLOSE": "FIN_WAIT_1", "RCV_ACK": "ESTABLISHED"}, "SYN_SENT": {"RCV_SYN": "SYN_RCVD", "RCV_SYN_ACK": "ESTABLISHED", "APP_CLOSE": "CLOSED"}, "ESTABLISHED":{"APP_CLOSE": "FIN_WAIT_1", "RCV_FIN": "CLOSE_WAIT"}, "FIN_WAIT_1": {"RCV_FIN": "CLOSING", "RCV_FIN_ACK": "TIME_WAIT", "RCV_ACK": "FIN_WAIT_2"}, "CLOSING": {"RCV_ACK": "TIME_WAIT"}, "FIN_WAIT_2": {"RCV_FIN": "TIME_WAIT"}, "TIME_WAIT": {"APP_TIMEOUT": "CLOSED"}, "CLOSE_WAIT": {"APP_CLOSE": "LAST_ACK"}, "LAST_ACK": {"RCV_ACK": "CLOSED"}, } START, ERROR = 'CLOSED', 'ERROR' ALL_CMDS = "APP_PASSIVE_OPEN, APP_ACTIVE_OPEN, APP_SEND, APP_CLOSE, APP_TIMEOUT, RCV_SYN, RCV_ACK, RCV_SYN_ACK, RCV_FIN, RCV_FIN_ACK".split(", ") def doTest(nTest, cmdMin, cmdMax, endProba): for _ in range(nTest): state, cmds = START, [] for _ in range(rand(cmdMin,cmdMax)): endIt = rand(0,100) > endProba last = choice(ALL_CMDS if endIt else list(STATES[state].keys()) ) state = STATES[state].get(last, ERROR) cmds.append(last) if endIt: break test.assert_equals(traverse_TCP_states(cmds), state) doTest(100,2,4,80) doTest(100,10,50,98)
Code language: Python (python)
Tags:
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments