aboutsummaryrefslogtreecommitdiff
path: root/debug/rbb_daisychain.py
blob: 4d9ea8a7d5a47d458118d6ca2e2b5322e230dcdf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/python3

import argparse
import sys
import socket

# https://github.com/ntfreak/openocd/blob/master/doc/manual/jtag/drivers/remote_bitbang.txt

class Tap:
    """Client connection to one remote_bitbang TAP server on localhost.

    The TCP connection is opened on construction and kept in ``self.socket``.
    """

    def __init__(self, port):
        """Connect to the server listening on ``localhost:port``."""
        self.port = port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect(("localhost", port))

    def execute(self, commands):
        """Send *commands* and return the bytes the TAP answers with.

        Args:
            commands: bytes-like sequence of single-byte commands.

        Returns:
            A ``bytes`` object holding exactly one byte for every ``'R'``
            found in *commands* (empty when there is none).

        Raises:
            ConnectionError: the peer closed the connection before all
                expected reply bytes arrived.
            OSError: any socket error raised while sending or receiving.
        """
        # sendall() loops until every byte is written; a plain send() may
        # legally transmit only a prefix of the buffer.
        self.socket.sendall(commands)

        read_count = sum(1 for command in commands if command == ord('R'))

        chunks = []
        remaining = read_count
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                # recv() returning b"" means the peer closed; without this
                # check the loop would spin forever.
                raise ConnectionError(
                    f"TAP on port {self.port} closed the connection with "
                    f"{remaining} of {read_count} replies outstanding")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

class Chain:
    """An ordered list of TAPs driven as if they were one scan chain.

    Each TAP object only needs an ``execute(commands) -> bytes`` method that
    returns one byte per ``'R'`` command it was given.
    """

    def __init__(self, debug=False):
        """Create an empty chain; *debug* enables per-TAP trace output."""
        self.debug = debug
        self.taps = []

    def append(self, tap):
        """Add *tap* to the end of the chain."""
        self.taps.append(tap)

    def execute(self, commands):
        """Run *commands* through every TAP in order.

        For every TAP except the last, the caller's ``'R'`` commands are
        dropped and an ``'R'`` is inserted after each ``'0'``..``'7'``
        command, so that one reply byte is collected per such command. For
        every TAP after the first, the low bit (TDI) of each ``'0'``..``'7'``
        command is replaced by the matching reply byte of the previous TAP.
        The last TAP keeps the caller's ``'R'`` commands as they are.

        Args:
            commands: bytes-like sequence of single-byte commands.

        Returns:
            The reply bytes of the last TAP, or ``b""`` if the chain is
            empty.

        Raises:
            IndexError: the previous TAP returned fewer replies than there
                are ``'0'``..``'7'`` commands.
            AssertionError: the previous TAP returned more replies than were
                consumed.
        """
        read = ord('R')
        last = len(self.taps) - 1
        values = b""
        for i, tap in enumerate(self.taps):
            tmp_commands = []
            # Cursor into the previous TAP's replies; indexing keeps each
            # lookup O(1) instead of shifting a list with pop(0).
            consumed = 0
            for command in commands:
                # Rewriting TDI only touches bit 0, so this stays valid for
                # the rewritten command too.
                is_write = ord('0') <= command <= ord('7')
                if i > 0 and is_write:
                    # Replace TDI with the value from the previous TAP.
                    v = values[consumed]
                    consumed += 1
                    command &= 0xfe
                    if v == ord('1'):
                        command |= 1

                if i < last:
                    if command != read:
                        tmp_commands.append(command)
                    if is_write:
                        # Read TDO after every write so the next TAP can be
                        # fed this TAP's output.
                        tmp_commands.append(read)
                else:
                    tmp_commands.append(command)
            assert consumed == len(values), "unconsumed replies from previous TAP"
            values = bytes(tap.execute(bytes(tmp_commands)))
            if self.debug:
                sys.stdout.write(f"    {i} {bytes(tmp_commands)!r} -> "
                                 f"{bytes(values)!r}\n")
        return bytes(values)

def main():
    """Run the daisy-chain proxy.

    Parses the command line, connects to every TAP port given, then listens
    on ``localhost:listen_port`` and serves one client at a time. Each chunk
    received from the client is passed to ``Chain.execute`` and the bytes it
    returns are sent back. When a client disconnects the server goes back to
    accepting the next one; the accept loop never ends on its own.
    """
    parser = argparse.ArgumentParser(
            description='Combine multiple remote_bitbang processes into a '
            'single scan-chain.')
    parser.add_argument("listen_port", type=int,
            help="port to listen on")
    parser.add_argument("tap_port", nargs="+", type=int,
            help="port of a remote_bitbang TAP to connect to")
    parser.add_argument("--debug", action='store_true',
                        help="Print out debug messages.")
    args = parser.parse_args()

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", args.listen_port))
    server.listen(1)

    chain = Chain(args.debug)
    for port in args.tap_port:
        chain.append(Tap(port))

    # Report the port actually bound (differs from the argument when 0 was
    # requested) and flush so a parent process reading stdout sees it now.
    sys.stdout.write(f"Listening on port {server.getsockname()[1]}.\n")
    sys.stdout.flush()

    while True:
        (client, _) = server.accept()

        # The context manager closes the client socket on every exit path,
        # including an exception raised while executing the chain.
        with client:
            while True:
                try:
                    commands = client.recv(4096)
                except OSError:
                    # ConnectionResetError is a subclass of OSError.
                    sys.stdout.write("Client disconnected due to exception.\n")
                    break

                if not commands:
                    sys.stdout.write("Client disconnected.\n")
                    break

                if args.debug:
                    sys.stdout.write(f"{commands!r}\n")
                result = chain.execute(commands)
                if args.debug:
                    sys.stdout.write(f"   -> {result!r}\n")

                try:
                    # sendall() guarantees the whole reply is written; a
                    # client that vanished mid-reply must not kill the proxy.
                    client.sendall(result)
                except OSError:
                    sys.stdout.write("Client disconnected due to exception.\n")
                    break

        sys.stdout.flush()

# Start the proxy only when run as a script, not when imported as a module.
if __name__ == '__main__':
    sys.exit(main())