blob: c46b21c49c42f43d2feecd24f7c15c2bc98dbf11 [file] [log] [blame]
Huaihong Leia40d4ba2023-04-14 15:13:15 +08001#!/usr/bin/python3 -B
2
3import argparse
4import binascii
5import os
6import pathlib
7import struct
8import sys
9
10from cryptography.hazmat.primitives import hashes
11from cryptography.hazmat.primitives import serialization
12from cryptography.hazmat.primitives.asymmetric import padding
13from cryptography.hazmat.primitives.asymmetric import utils
14from cryptography.hazmat.backends import default_backend
15
16#
17# Variables
18#
19
20#
21# Functions
22#
23
24def stderr_print(*arg, **kwargs):
25 print(*arg, file=sys.stderr, *kwargs);
26 return
27
28#
29# Classes
30#
31
32class DemoSign(object):
33
34 #
35 # Static variables
36 #
37
38 __FMT_HEADER__ = '<QIHH'
39 __MAGIC64__ = int.from_bytes(bytes.fromhex('58a848a4843ff07d'), byteorder='little')
40
41 #
42 # Initialization
43 #
44
45 def __init__(self, autorun=False):
46 self._args = None
47
48 if autorun:
49 self._parseArgs()
50 self._run()
51 return
52
53 #
54 # Methods (non-public)
55 #
56
57 def _parseArgs(self):
58 parser = argparse.ArgumentParser(description="Demo Sign")
59 parser.add_argument('INFILE_PRIVKEY', help='Input file of private RSA key')
60
61 self._args = parser.parse_args()
62 # print(self._args)
63 # print(vars(self._args))
64 # print(vars(self._args)['INFILE_PRIVKEY'])
65 # print(self._args.INFILE_PRIVKEY)
66
67 def _run(self):
68 self._recv_header()
69 self._recv_payload()
70
71 self._process()
72
73 self._send_header()
74 self._send_payload()
75 return
76
77 def _recv_header(self):
78 sz_hdr = struct.calcsize(type(self).__FMT_HEADER__)
79 buf = sys.stdin.buffer.read(sz_hdr)
80 assert len(buf) == sz_hdr
81
82 (
83 magic,
84 self._sz_payload,
85 self._seq_no,
86 self._flags
87 ) = struct.unpack(type(self).__FMT_HEADER__, buf)
88 # stderr_print('magic: 0x{:x}'.format(magic))
89 assert magic == type(self).__MAGIC64__
90 assert (magic % 2) == 0
91 assert (self._seq_no % 2) == 0
92
93 return
94
95 def _recv_payload(self):
96 self._buf_digest = sys.stdin.buffer.read(self._sz_payload)
97 assert len(self._buf_digest) == self._sz_payload
98 return
99
100 def _process(self):
101 # Load private key
102 with open(self._args.INFILE_PRIVKEY, "rb") as key_file:
103 stderr_print('RSA key file: {}'.format(self._args.INFILE_PRIVKEY))
104 private_key = serialization.load_pem_private_key(
105 key_file.read(), backend=default_backend(),
106 password=None,
107 )
108 del key_file
109
110 # Signing
111 assert len(self._buf_digest) == 32
112 # stderr_print(self._buf_digest.hex())
113 sig = private_key.sign(
114 self._buf_digest,
115 padding.PSS(
116 mgf=padding.MGF1(hashes.SHA256()),
117 salt_length=32
118 ),
119 utils.Prehashed(hashes.SHA256())
120 )
121 self._buf_sig = sig
122 return
123
124 def _send_header(self):
125 sz_hdr = struct.calcsize(type(self).__FMT_HEADER__)
126
127 self._seq_no += 1
128 sz_payload = len(self._buf_sig)
129 buf = struct.pack(type(self).__FMT_HEADER__,
130 type(self).__MAGIC64__ + 1,
131 sz_payload,
132 self._seq_no,
133 0
134 )
135 assert len(buf) == sz_hdr
136
137 sz_out = sys.stdout.buffer.write(buf)
138 assert sz_out == sz_hdr
139
140 return
141
142 def _send_payload(self):
143 # stderr_print(self._buf_sig.hex())
144 sz_out = sys.stdout.buffer.write(self._buf_sig)
145 with open('test.sig', 'wb') as file2:
146 file2.write(self._buf_sig)
147 file2.close()
148
149 assert sz_out == len(self._buf_sig)
150 return
151
152 #
153 # Methods (public)
154 #
155
156#
157# Main
158#
159
160if __name__ == '__main__':
161 DemoSign(True)
162
163# vim: set expandtab ai tabstop=4 shiftwidth=4 softtabstop=-1: