diff --git a/.ci/certs/ca_certificate.pem b/.ci/certs/ca_certificate.pem new file mode 100644 index 0000000..f1ae0b2 --- /dev/null +++ b/.ci/certs/ca_certificate.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDhjCCAm6gAwIBAgIUJ2lTbiccSFtA9+8eGPQD5yGJ7w8wDQYJKoZIhvcNAQEL +BQAwTDE7MDkGA1UEAwwyVExTR2VuU2VsZlNpZ25lZHRSb290Q0EgMjAyMy0xMC0w +OFQwODoxNjowMy41OTA0NTQxDTALBgNVBAcMBCQkJCQwHhcNMjMxMDA4MTUxNjAz +WhcNMzMxMDA1MTUxNjAzWjBMMTswOQYDVQQDDDJUTFNHZW5TZWxmU2lnbmVkdFJv +b3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDENMAsGA1UEBwwEJCQkJDCC +ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANdiiGj37094gAHfVpbIQHfu +ccBVozpexrYjDCbjw4IyJJOajJRNGbYZwEt3Jt5NaDc+zyoBZpKaZWDEjOxbNYkd +MtIHyFW4V4ooA6pySR9pzMI91dXoCkzL9Ex23Zrj0KF70qBQuPTbF5bnAbMELFuv +quFnfMw2ALsFrWh2DOwnMlt1hbdj6Iapl2yRGhVSgsr72SK+67b+b7WH02VGDrfm +Y3qqx3xAI6woKSE2Ot14Csak/iR1xit68X5GhzvSdOos0Yo3I4v8mlFEO+kpKWB0 +7y3Hb5AU/hqvSOwLRA+CV09bxN4N5rOfFHkPVuVMXQzX9mLCxzxroZn/sQzkrtMC +AwEAAaNgMF4wDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYE +FNSsn21DVr1XhhqmU+wMnLWFZc55MB8GA1UdIwQYMBaAFNSsn21DVr1XhhqmU+wM +nLWFZc55MA0GCSqGSIb3DQEBCwUAA4IBAQDRc1mAERvR1VPOaMevcB2pGaQfRLkN +fYgiO7rxd77FIQLieCbNNZ6z/fDRkBjgZ9xzAWl0vFJ0xZ0FSdUtAXEa9ES7r7eq +XOSW/5CRPeib4nMDeQvTSiCx5QqlIz4oUwW9bbpPcBQXM0IVZwo1Jbye/BGrjAmQ +Z3a5ph0f85Shjy2Yt9JB9BDCWOK8EU294CiKMUvdtQwSaQpl8GQfmvzWKAL4encu +ryEAPTDT9zuQi2bOCDY5QMwVNS6mDAsqbvMjOaHD/Cdzl26rgv+8QLVNDUvGfGtD +58bWugHyxCdnDToCtIEaJaoi7izKd0bILbuQXS7oKfryJpHwO+9U8ZjT +-----END CERTIFICATE----- diff --git a/.ci/certs/ca_key.pem b/.ci/certs/ca_key.pem new file mode 100644 index 0000000..75349f7 --- /dev/null +++ b/.ci/certs/ca_key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIm6kLjkvzznECAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECGjVddOBQ1QOBIIEyOAafHUtExxT +tY2ONQkUkXZG4/fIDvuNwt8IIkNUIGVp9WEDd4Mh5Ofa52uUKmlhj+FyRZ6u2mGT +VHU65e4kBYB10n0oybRPvRU1tFxgr8qI0T7Fqnx7WJAP3m0Bo/tWfqE0GHRrspZV +gABLVTOFvHE8oOsEh/ndMe+Y2qGaLsl+MF3jkfYAxSK2QwEK9HDa16Xsit7hqVbz +JUyvBmQVfTZzanIall+EpUntv/vlILKIlAFOZUXIZ/iL8LTQCmpycfGLknr4/9KP +gCYZmWFS18X9KVAwgV2kSdUebWH9phDosSw6fZh843l1SQvjG65PgrnWYb6Fw7B4 +s3Nk6bXjHYtvLT19EUrQOdeOegynaQQBs5WIcp9LbKT3LJVQpaVGV9thi+LPz1Bu +Lep583ayXTecA7Dbfa6S9R97TgRoMdDWaz1kTBReQTUhrL5736A38gpwJeBZDqel +39sRULCKARz2ZX0YpeZCmfVhVVSguO5gCfACsqHoOiTxYOA97GR128BcpEVJ1lst +sZZNwT3m6xIcXbS37EImhUMGiQ5fyGZ+8FIozTL9xNopIR97b3ceA9CoLc7EVcFC +RxHvh1HwtpyBDyopJp2wYu31nqcSDsJh+lmjo5R7bqvDDmflfkfu1G45JkXKr3Vz +M89S/y6Uo8W/EYT2MPYTsqcobtjx6oM1RYkVuYTR6cyUgQkHGtptkzGKxYE8dYwQ +4EIm87czYvCW0Mrp6yy0NGKzqBb+19Kuqc0HO+YezEQ8RjOVb8+D+cuCp2ZSItJ/ +S9m7BDTOzTS2lBotrFVkSbzaQafAmxQiaSP7gd0M9dnC0AOB2ILbyRAyIDQ0Y2dm +kMbiewQwNFiY9moRtgzHuHRfFZu4w996Q20cYZyMbxDfY17QoZQzfKWQH1BD7nq5 +G4RFpInt6q4q0F94nQWCif195VZF64+8ETMteJqtBFhUSQbq7PzKdpuf8NFxczLt +MDEWg2l6qNLP+zswulcVbFcC/HxAu4UtYf2m5MAtaurXZZ/+xPW5c/0caWMycQ7g +fbkYvC4j0OT7aqqMd1SYzEx7l75Vqn1sr2BsXZFoaqK2c/1LIb6U1kAhyhDQ46rV +0v6q4GUk4fdnE4N+9MXWBvlKSnqEVYlE54IuSUrYRuuBhO4LQpPMOAafQPR6QCTI +ikqWVmLAj50n7uba0Ao9lRKR7bFpdOQob/nYMTKT6YQaohYhbCv4zIK9fDgWWiXE +a2ecIP3KiZzw7oLMKXLcDt1RkkzE1FQxLfOeZ5EP4RwBGPDvR88ELO+lGQQt3VnS +FIZoXBUFUf7bEUzTwM4240zkjDYQPxD1j769Zq/JZfKyOEXXOJT8xHiwMg3ARWuE +hGlNKApbJGMn9myC61KaGMyCKRvMVxI25w3LfI4OAWt+67BB5OuAG11nmn9Kja73 +bhMFDIMZ8kE0p9IWfpiUJlDB9odGEc4z3Jl5CqBVDkMCDxq9BQDM0hSDk+ov8FO9 +g03PqMxvsxd2c56vkMtNY4hSGkYfN0RsM3vTXXLtPwRwRZURCmKK76BmsT4oBd+W +orqH4SABIAbYTwNOb7k/wOc4EfucawBqMG4g+29qewD67+EXjB0GadqOXRoQyhRq +hd74uUK5gzJOqStqiowQ0A== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/.ci/certs/client_localhost.p12 b/.ci/certs/client_localhost.p12 new file mode 100644 index 0000000..1d688c5 Binary files /dev/null and b/.ci/certs/client_localhost.p12 differ diff --git a/.ci/certs/client_localhost_certificate.pem b/.ci/certs/client_localhost_certificate.pem new file mode 100644 index 0000000..1af5a9f --- /dev/null +++ b/.ci/certs/client_localhost_certificate.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDvDCCAqSgAwIBAgIBAjANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH +ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDEN +MAsGA1UEBwwEJCQkJDAeFw0yMzEwMDgxNTE2MDNaFw0zMzEwMDUxNTE2MDNaMCUx +EjAQBgNVBAMMCWxvY2FsaG9zdDEPMA0GA1UECgwGY2xpZW50MIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxoOGcKsURRZG0D89J8rGcolZVqX56rDgA0Ma +cn4AosMQTZ86XAq+Ygn6QVcFV3NjuHxb29vsZfjSYbBpgQNLfpXN9EfeswVvaJND +wblKdRo10RTPslFewI4Aac88GXva+3DBMCwv3viI2S69apcuZgGw0+EKDh+JmbcM +sdH81hZhYjmrS529qSOIji8vJYFTCQPMbGN17elnA7pZaHEmPKj5mzm0veSBvCwU +OZORr4eFE7Nct5RmhLm8DWT0EBRUWT8D6/b6+0ln32Yv30YNpKrua5wkn+kxsvKJ +tQRRKYRyfegSj6mo6L4za1ZvwV/JMN5mDLQUajvtOCsD4NpKcQIDAQABo4HPMIHM +MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMCMCoG +A1UdEQQjMCGCCWxvY2FsaG9zdIIJUFJPS09GSUVWgglsb2NhbGhvc3QwMQYDVR0f +BCowKDAmoCSgIoYgaHR0cDovL2NybC1zZXJ2ZXI6ODAwMC9iYXNpYy5jcmwwHQYD +VR0OBBYEFLPquWS+kT4+JE+cssrriRkL9UADMB8GA1UdIwQYMBaAFNSsn21DVr1X +hhqmU+wMnLWFZc55MA0GCSqGSIb3DQEBCwUAA4IBAQC1Pz8SahCsQyiyuu6dz391 +KENabMpCwb/2wxljN5lfkOvvUrVmupld8/5nIdN2drL9jCrfbBz5ZRz+9Ryb8yrc +sioH8Y9RNU5Gc3UJo7aAoMx4sIib6uJ+UO4fVlVvD4cN2h2sLHxtkI173Oo7lnMf +4c+75iyZYdkEDXaOk+UbR8dncCj84y1Sbt0FYfCMT688O4HYkIGA3xGmqyX7PYV/ +CP8CNKwJEuZpQRaGdClkmAmoEPyuFW9ec+A9gOrgCpuFJBI4MRcicC5Q+qmx+LTM +pZ2louMnnlTRoj3tL4aDgfdwV0YGxyIjIzuYLy6QCF8MZ/TLwPK0C3oXXuYmCLBO +-----END CERTIFICATE----- diff --git a/.ci/certs/client_localhost_key.pem b/.ci/certs/client_localhost_key.pem new file mode 100644 index 0000000..241f86f --- /dev/null +++ b/.ci/certs/client_localhost_key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIqKZZASlLYRICAggA +MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBCQpWBZXmYQn0c6PZ4CnLrQBIIE +0HwXxx0lDzPbw53k/ak73G4CwBilSpaIM5x7jNwwD7UhiR4Qo9JiYLRy2zn0RQZJ +wK/Hhta3SKecTHqgMwPHk8s4Bu6EhSIm3/x2OhAtk2lLeubZkjgEKCfQbu4tVpeH +jOw66Pxz52fhdJ7GzaTnWjjTYmEPxNpkRiUAe0v+lOD09OQvQIFVEDyqSATzRUjd +GTvQs8H5N/XJR7xTuPRQekauY5gIcneE4oynGF5a9L870XfLh/H62f+pD19rvESh +qqdCxklxwAfHGHni2p1UKgNPxJHzSMH9dGCAGT1fxLg0RtXfBMdl3gzPnwbZ1PmB +tjVxCqtw7XAirdlBX79+dhZ58HCN+j7pkL9LWwRap1klN7Y+Iwf0XhK6imSY7Ex4 +4odxin7kF1yW65PTYKyS7cRuFip+k2YShXApN5PrF5SqNEFVt0A9RG7h+GF7EXSD +QS0ecqwhnzuHGHSpBjvsEw9z3FWBL1tFC1i2cF7m3yTHDLVQoevkUY43Fmh8S/CZ +gthQ9P58A3dIDSJM0vcGhHqJBLbxOF7rSqwIuihZJhBfqclw1V4fKk9VuRzp4MHf +NrZEuCr8CTrcYnl2n6Z/MaJ33XRg8uwwy+O5RGF1I1GAmH2KdKORUtrYHlOdTd4K +2NXEgy2mgDQYPbl/1tk8bH6hroIY9Qofpzi7MTZ++32AY3ggf4GnqAk4eAP5R3Ey +PUYFtWaGftaOQCR5Ovocdn41YitUJxAPh6hE5HqVicO2rEfx13uzug9usdg6256i +GgKSTg4jqBiEw0oJhb9TVYNY44koh9yMRM/sfidqarNKWU7bWDVKhl3hGaNhj+oX +v6ZC8rH6m/zHRtbn7tAw/q+EtTHmLo2AaUf13V4Ii6VrEXMRSlv/AyipYmOIwgV2 +EZriwyhsT2RaVesAgKExHbnP6dzX2P2IGTMNISZDNlATMT01BfWG/loPe+6DbxzW +aHv6Y0FknGeHGLDwiZMv/hyn8a4KOvIl35YZBJqZ8UxTirs9mLRd4Us5CdXAHQlL +5skAzf5FSrVbQvUbvKIrO+ULGB5mDATHR/tgOWVaP656tiRMrtFW8XGNxaPjyDPt +xhA3fVOc68f1UzTqoGpsZtUUMQxkndW3Tg50V4ssw4F9D4Grce9XXgfBdEFz/Gfc +gSR4SYKelS5udrMvqKxUs+zobx8TH2CqzwDDcC0kxqC9VCMnHqaD3wSMbN/RBoYT +lkD4DRmDFTwlsQd80i2j6K0eDFo7uvROWM72gAOb/wmssZBaSF3g0E5CrNSxApkz ++VhgqfGBYDrFZijMHiCw+XB8kFFBrlXlcBOUHu1trIi7nwcmN1JvnXL0dVOgfSGE +VNAmVz/sHdeAJacf05tehkAFTubdiZ24M/mM+VbKiJ2dajYRSoePPc8P65urlv6E +rszIC9NhfwBy0TDgXNC/GV3y8mC7rp6kzbyzEb2H2M5ltyEKOIyjvRvtks2/opSX +5T42x6xJtS6qTRttwrpRE5KjBHgcq0m8LSQu6chKwWinFUfOAdcZODvVVqLA2e6K +plfcGb027WE4DHqTzW73nbnK+NwkP3lLORbro7KWDFdNKP3v/Rx0Uq7CPGVN994G +tH7wyGheZNTMtKDFGrAdOqse9/sKcTF3thaqYqXgDDZY +-----END ENCRYPTED PRIVATE KEY----- diff --git a/.ci/certs/server_localhost.p12 b/.ci/certs/server_localhost.p12 new file mode 100644 index 0000000..d77b7dc Binary files /dev/null and b/.ci/certs/server_localhost.p12 differ diff --git a/.ci/certs/server_localhost_certificate.pem b/.ci/certs/server_localhost_certificate.pem new file mode 100644 index 0000000..a67718d --- /dev/null +++ b/.ci/certs/server_localhost_certificate.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDvDCCAqSgAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH +ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDEN +MAsGA1UEBwwEJCQkJDAeFw0yMzEwMDgxNTE2MDNaFw0zMzEwMDUxNTE2MDNaMCUx +EjAQBgNVBAMMCWxvY2FsaG9zdDEPMA0GA1UECgwGc2VydmVyMIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2dxp0wR++oE89W/mhEL7/XfJfo8iDbKKciUP +PyIgBvggv625HifmEJG+epl77KinbCuZdc0DX/2FKH6HPM/tC6VcWB2cZRSHpBSM +aieRV4yiaUFTqlOgQalJyRczRtv35QPdaIcDOX4lOw887sn6sJuZY5FtAyDr3opA +gZWLR+6fqi0YWqp5wqaz3hMzTGEEuu/ZKSqMWURRvp+Voz13auiShvhRb9hsdRp0 +zf12Y9wGhWjOg7G6v1r/BP6/Nr1gWrgNUhuomSFC1FCRdCr1VrLpUfG3VNloVEOG +mbWYfo+cDN6fV+PDlVB5UQp9YciFfpGXBzSXgNcsk8fEXpg8IQIDAQABo4HPMIHM +MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMCoG +A1UdEQQjMCGCCWxvY2FsaG9zdIIJUFJPS09GSUVWgglsb2NhbGhvc3QwHQYDVR0O +BBYEFPezEEGf7j3HedbaRCh4/FHT2VXrMB8GA1UdIwQYMBaAFNSsn21DVr1Xhhqm +U+wMnLWFZc55MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVyOjgw +MDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQBLeagmroj4FFOXgUqDQo7i +kGCBZuCmn6GnCYdwEHtMoysGZ3vNFsB1BCug4fTuL7OU1l+Xw8iVnIvnGBpKypmt +b7h9dN6urty0ewCS4WO8BTZUIdc1RJMo9N+nEMTja+5cqXHtO/VQnO2eqeALWJUU +IDPycb6HcTkHGFX0QDwxsPuMFL3p5HGr6U0llLF0J5FedxUA/YLLVCStofrWvBGT +PKngh7S6ntaIUnTvwyzY2kPJ+byqRDNrL5jdavw1U8cGh1vi3k9mf1Uloi0mnAMT +kqOPzbQmHIQjxIOwqp2xkObXgqz1b0KNDfRDTwp90wzVxOCF5JJBCAIjPyLuncDv +-----END CERTIFICATE----- diff --git a/.ci/certs/server_localhost_key.pem b/.ci/certs/server_localhost_key.pem new file mode 100644 index 0000000..a49d186 --- /dev/null +++ b/.ci/certs/server_localhost_key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQILovSnFfKBhECAggA +MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBBmLCdyyKqcbbjoi1/8A+rxBIIE +0Mmi72DP32seewlELsG4gVkOH6Gwvs5iAqHYap1yOps3mfI1TtuMhDEZDH2Sj+MB +J1E35WEzJGGxTVhvK/J+R/1fUfd44Acgl1Ks1IINJyre4+vYfDUyWB5O2lS+9mr7 +L6q7kfAbBB2OuAEuGL5GMlTRetyASXbspWbi0M+vA9R+NemYbRzFpozP/fedFpQY +6r/QnogSwuRcE1VMghUjZwzZWyG2HFMFp5emiAHRVi+SxLpIIv6wwV8SB4jDMO46 +CsyxLjkjhd2GmkMRpmIxXw7eXbWa/bnf/KhJG7gSDBgmGuoBJ4cDnQc2jFN8UqXW +IG3+K6PIeGTT/t4aC6YSq+kb8R3rTfVbPdq51Uo55uMatpJg8AatsysL900nNfuz +MejikInTz4+m6jY5kzEm+fToRNHXhcmnQeD6SYc8PNi/4QfxiMcHcI91GRNQ2nFI +Xd5a1CG4f78WGUmK9PylxBdh+1nx9yQyrZKWcShuLkOQk4UAL0w31B70/l9jVoiN +gcN4w18TUfYLIg8Ab6lL6wXipBrr1AjB/Dn2oCpMTiMolyWcsPAHDtxvrsgbsXRr +vxd/vNo+RpSsvjq2wnXhxe+qC/uHBzJeyfx0m+rs6vBKPZvS7uTBfYGG+RhVJvb5 +W2RRfprvTzgBbbKBCTJ5ry4SMZX7ci008f7oVqKLAlsApA58dDgZ+ORF4TxtdSkJ +u3r2htUBvC+mzYMYU4D+sYQ7S9qqVhKe7hvNzLW5UhkEhH57SQ1dIcstTsTYUDC7 +1o/zOkpVxByudKEGwgEtyYM+DD/YoGLGB/4qPULnHFOBwxWdK6Ov9I0ezuhe/nOA +ERe3ixLklwHRI5sM/gt57A7MiMPhFHDpqt/xO/m/uCX2VRDW/IAKXpIfxuuxDcIz +MLLxJhYCrGRHMStmBAPy3zmmhpn+wHTkwVbEVRMsh+o8M2vPelrysUtUlarRBQI+ +l5tY/UCgX0bGUvHKIp5z8GuRu/CTpjtpsyuNwtpq2TrgnmyiznyfFl4oknvEcfmF +BLUd23ZrTyn1ha8cnKXY9JSHgS2cxdU0QnkPT1BEypptf30nQ1lLqiUg9GLR+xC5 +EeHn/80gL/MrpVnWdEznJdWMzau39kqf3ajNQlUb/SX5YQaeUKYrWoLHI+UNhUG2 +5fr2vcBgk0gt7k5ZDpWejhEu0BDTf3xrE9dU2jj6hOw6E+Q5bI59QvnLYqCvqBmE +asDMBafo+/Px8xnXazFr5b5FyNqeXzBRPgRw5wFmK5YdFXU0fIpuF9IJb1TwLITp +Hk+Hn760AsT3ALzHgRzC2e6bUUO6F/iw/6s6awwRbEPpLYTHwb9Mv7efeVsGTYiM +Fi0OHapnzzbb4ErVL+92mkOT8flDoLhbKHJCRbOvu4C9awRs5aVbkEsygV67tLwu +SIgUMpdxOMYYquyCJ+WUbyv5VSyvhnUIj7u2kdH+zyAendAi4Rgx/5e4PcD62c+X +tNKp4KrlpF3jGIaPODXZVE2aIrhI0njVlUjIQRs6OOMXleO6+xWQI/1fx/xn/oKm +TBUOtW3Y7AzyojbPiScvjmT+aoVwAZ3juHnUuxEuyUcI3WokkWPpllcaGd95sCUG +7iR90VPBJ/meYyQMYY1BGq4ngi5DvLGy6K/pS5CHPi0U +-----END ENCRYPTED PRIVATE KEY----- diff --git a/.ci/publish-documentation-to-github-pages.sh b/.ci/publish-documentation-to-github-pages.sh new file mode 100755 index 0000000..76a54c2 --- /dev/null +++ b/.ci/publish-documentation-to-github-pages.sh @@ -0,0 +1,49 @@ +#!/bin/bash + +. $(pwd)/release-versions.txt + +MESSAGE=$(git log -1 --pretty=%B) +./mvnw buildnumber:create pre-site --no-transfer-progress + +./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress + +if [ -e target/site/apidocs/element-list ] + then cp target/site/apidocs/element-list target/site/apidocs/package-list +fi + +RELEASE_VERSION=$(cat pom.xml | grep -oPm1 "(?<=)[^<]+") + +# GHA does shallow clones, so need the next 2 commands to have the gh-pages branch +git remote set-branches origin 'gh-pages' +git fetch -v + +git checkout gh-pages +mkdir -p $RELEASE_VERSION/htmlsingle +cp target/generated-docs/index.html $RELEASE_VERSION/htmlsingle +mkdir -p $RELEASE_VERSION/api +cp -r target/site/apidocs/* $RELEASE_VERSION/api/ +git add $RELEASE_VERSION/ + +if [[ $LATEST == "true" ]] + then + if [[ $RELEASE_VERSION == *[RCM]* ]] + then + DOC_DIR="milestone" + elif [[ $RELEASE_VERSION == *SNAPSHOT* ]] + then + DOC_DIR="snapshot" + else + DOC_DIR="stable" + fi + + mkdir -p $DOC_DIR/htmlsingle + cp target/generated-docs/index.html $DOC_DIR/htmlsingle + mkdir -p $DOC_DIR/api + cp -r target/site/apidocs/* $DOC_DIR/api/ + git add $DOC_DIR/ + +fi + +git commit -m "$MESSAGE" +git push origin gh-pages +git checkout main diff --git a/.ci/ubuntu/enabled_plugins b/.ci/ubuntu/enabled_plugins new file mode 100644 index 0000000..2e81f16 --- /dev/null +++ b/.ci/ubuntu/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top]. diff --git a/.ci/ubuntu/gha-log-check.sh b/.ci/ubuntu/gha-log-check.sh new file mode 100755 index 0000000..fef23a8 --- /dev/null +++ b/.ci/ubuntu/gha-log-check.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -o errexit +set -o pipefail +set -o xtrace +set -o nounset + +readonly docker_name_prefix='rabbitmq-amqp-go-client' + +declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" + +if docker logs "$rabbitmq_docker_name" | grep -iF inet_error +then + echo '[ERROR] found inet_error in RabbitMQ logs' 1>&2 + exit 1 +fi diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh new file mode 100755 index 0000000..646f930 --- /dev/null +++ b/.ci/ubuntu/gha-setup.sh @@ -0,0 +1,180 @@ +#!/usr/bin/env bash + +set -o errexit +set -o pipefail +set -o xtrace + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +readonly script_dir +echo "[INFO] script_dir: '$script_dir'" + +if [[ $3 == 'arm' ]] +then + readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" +else + readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" +fi + + +readonly docker_name_prefix='rabbitmq-amqp-go-client' +readonly docker_network_name="$docker_name_prefix-network" + +if [[ ! -v GITHUB_ACTIONS ]] +then + GITHUB_ACTIONS='false' +fi + +if [[ -d $GITHUB_WORKSPACE ]] +then + echo "[INFO] GITHUB_WORKSPACE is set: '$GITHUB_WORKSPACE'" +else + GITHUB_WORKSPACE="$(cd "$script_dir/../.." && pwd)" + echo "[INFO] set GITHUB_WORKSPACE to: '$GITHUB_WORKSPACE'" +fi + +if [[ $1 == 'toxiproxy' ]] +then + readonly run_toxiproxy='true' +else + readonly run_toxiproxy='false' +fi + +if [[ $2 == 'pull' ]] +then + readonly docker_pull_args='--pull always' +else + readonly docker_pull_args='' +fi + +if [[ $1 == 'stop' ]] +then + docker stop "$rabbitmq_docker_name" + docker stop "$toxiproxy_docker_name" + exit 0 +fi + +set -o nounset + +declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" +declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy" + +function start_toxiproxy +{ + if [[ $run_toxiproxy == 'true' ]] + then + # sudo ss -4nlp + echo "[INFO] starting Toxiproxy server docker container" + docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running" + # shellcheck disable=SC2086 + docker run --detach $docker_pull_args \ + --name "$toxiproxy_docker_name" \ + --hostname "$toxiproxy_docker_name" \ + --publish 8474:8474 \ + --publish 55670-55680:55670-55680 \ + --network "$docker_network_name" \ + 'ghcr.io/shopify/toxiproxy:latest' + fi +} + +function start_rabbitmq +{ + echo "[INFO] starting RabbitMQ server docker container" + chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log" + docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running" + # shellcheck disable=SC2086 + docker run --detach $docker_pull_args \ + --name "$rabbitmq_docker_name" \ + --hostname "$rabbitmq_docker_name" \ + --publish 5671:5671 \ + --publish 5672:5672 \ + --publish 15672:15672 \ + --network "$docker_network_name" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \ + "$rabbitmq_image" +} + +function wait_rabbitmq +{ + set +o errexit + set +o xtrace + + declare -i count=12 + while (( count > 0 )) && [[ "$(docker inspect --format='{{.State.Running}}' "$rabbitmq_docker_name")" != 'true' ]] + do + echo '[WARNING] RabbitMQ container is not yet running...' + sleep 5 + (( count-- )) + done + + declare -i count=12 + while (( count > 0 )) && ! docker exec "$rabbitmq_docker_name" epmd -names | grep -F 'name rabbit' + do + echo '[WARNING] epmd is not reporting rabbit name just yet...' + sleep 5 + (( count-- )) + done + + set -o xtrace + + docker exec "$rabbitmq_docker_name" rabbitmqctl await_startup + docker exec "$rabbitmq_docker_name" rabbitmq-diagnostics erlang_version + docker exec "$rabbitmq_docker_name" rabbitmqctl version + + set -o errexit +} + +function get_rabbitmq_id +{ + local rabbitmq_docker_id + rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")" + echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'" + if [[ -v GITHUB_OUTPUT ]] + then + if [[ -f $GITHUB_OUTPUT ]] + then + echo "[INFO] GITHUB_OUTPUT file: '$GITHUB_OUTPUT'" + fi + echo "id=$rabbitmq_docker_id" >> "$GITHUB_OUTPUT" + fi +} + +function install_ca_certificate +{ + set +o errexit + hostname + hostname -s + hostname -f + openssl version + openssl version -d + set -o errexit + + if [[ $GITHUB_ACTIONS == 'true' ]] + then + readonly openssl_store_dir='/usr/lib/ssl/certs' + sudo cp -vf "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" "$openssl_store_dir" + sudo ln -vsf "$openssl_store_dir/ca_certificate.pem" "$openssl_store_dir/$(openssl x509 -hash -noout -in $openssl_store_dir/ca_certificate.pem).0" + else + echo "[WARNING] you must install '$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem' manually into your trusted root store" + fi + + openssl s_client -connect localhost:5671 \ + -CAfile "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" \ + -cert "$GITHUB_WORKSPACE/.ci/certs/client_localhost_certificate.pem" \ + -key "$GITHUB_WORKSPACE/.ci/certs/client_localhost_key.pem" \ + -pass pass:grapefruit < /dev/null +} + +docker network create "$docker_network_name" || echo "[INFO] network '$docker_network_name' is already created" + +start_toxiproxy + +start_rabbitmq + +wait_rabbitmq + +get_rabbitmq_id + +install_ca_certificate diff --git a/.ci/ubuntu/log/.gitkeep b/.ci/ubuntu/log/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/.ci/ubuntu/rabbitmq.conf b/.ci/ubuntu/rabbitmq.conf new file mode 100644 index 0000000..829dcec --- /dev/null +++ b/.ci/ubuntu/rabbitmq.conf @@ -0,0 +1,26 @@ +loopback_users = none +loopback_users.guest = true + +log.console = true +log.console.level = debug +log.file = /var/log/rabbitmq/rabbitmq.log +log.file.level = debug +log.exchange = false + +listeners.tcp.default = 5672 +listeners.ssl.default = 5671 +reverse_dns_lookups = false + +deprecated_features.permit.amqp_address_v1 = false + +ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem +ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem +ssl_options.keyfile = /etc/rabbitmq/certs/server_localhost_key.pem +ssl_options.verify = verify_peer +ssl_options.password = grapefruit +ssl_options.depth = 1 +ssl_options.fail_if_no_peer_cert = false + +auth_mechanisms.1 = PLAIN +auth_mechanisms.2 = ANONYMOUS +auth_mechanisms.3 = EXTERNAL diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml new file mode 100644 index 0000000..7284091 --- /dev/null +++ b/.github/workflows/build-test.yaml @@ -0,0 +1,33 @@ +name: Test against supported go-version + +on: + - workflow_call + +jobs: + build-ubuntu: + runs-on: ubuntu-latest + strategy: + fail-fast: true + matrix: + go: [ '1.22'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + id: setup_go + with: + go-version: ${{ matrix.go }} + check-latest: true + - name: Start RabbitMQ + id: start-rabbitmq + run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh + - name: Test + timeout-minutes: 15 + run: make test + - name: Check for errors in RabbitMQ logs + run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh + - name: Maybe upload RabbitMQ logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: rabbitmq-logs-integration-ubuntu + path: ${{ github.workspace }}/.ci/ubuntu/log/ diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..5289b53 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,11 @@ +name: rabbitmq-amqp-go-client + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + call-build-test: + uses: ./.github/workflows/build-test.yaml \ No newline at end of file diff --git a/.github/workflows/publish-nuget.yaml b/.github/workflows/publish-nuget.yaml new file mode 100644 index 0000000..1fea9db --- /dev/null +++ b/.github/workflows/publish-nuget.yaml @@ -0,0 +1,27 @@ +name: publish-nuget + +on: + workflow_call: + secrets: + NUGET_API_KEY: + required: true + +jobs: + publish-nuget: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: true + - uses: actions/cache@v4 + with: + path: | + ~/.nuget/packages + ~/.local/share/NuGet/v3-cache + key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} + restore-keys: | + ${{ runner.os }}-v0-nuget- + - name: Build (Release) + run: dotnet build ${{ github.workspace }}/Build.csproj --configuration=Release --property CI=true + - name: Publish to NuGet + run: dotnet nuget push --skip-duplicate --api-key ${{ secrets.NUGET_API_KEY }} --source 'https://api.nuget.org/v3/index.json' ${{ github.workspace }}/packages/RabbitMQ.AMQP.Client.*.nupkg diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml new file mode 100644 index 0000000..c01e545 --- /dev/null +++ b/.github/workflows/publish.yaml @@ -0,0 +1,15 @@ +name: publish rabbitmq-dotnet-client + +on: + release: + types: + # https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#release + - published + +jobs: + call-build-test: + uses: ./.github/workflows/build-test.yaml + call-publish-nuget: + uses: ./.github/workflows/publish-nuget.yaml + needs: call-build-test + secrets: inherit diff --git a/.gitignore b/.gitignore index 6f72f89..eecfdeb 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,7 @@ go.work.sum # env file .env +.idea/ +coverage.txt +.DS_Store +.ci/ubuntu/log/rabbitmq.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d655c81 --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +all: format vet test + +format: + go fmt ./... + +vet: + go vet ./rabbitmq_amqp + +test: + cd rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \ + --randomize-all --randomize-suites \ + --cover --coverprofile=coverage.txt --covermode=atomic \ + --race + + + +rabbitmq-server-start-arm: + ./.ci/ubuntu/gha-setup.sh start pull arm + +rabbitmq-server-stop: + ./.ci/ubuntu/gha-setup.sh stop diff --git a/README.md b/README.md index f90ce17..9cea4e6 100644 --- a/README.md +++ b/README.md @@ -1 +1,13 @@ -# rabbitmq-amqp-go-client \ No newline at end of file +# RabbitMQ AMQP 1.0 .Golang Client + +This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. + +## How to Run + +- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. +- `make test` to run the tests +- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop` + +## Getting Started + +You can find an example in: `examples/getting_started` diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go new file mode 100644 index 0000000..b7d74df --- /dev/null +++ b/examples/getting_started/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "bufio" + "context" + "fmt" + mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" + "os" +) + +func main() { + fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") + + chStatusChanged := make(chan *mq.StatusChanged, 1) + + go func(ch chan *mq.StatusChanged) { + for statusChanged := range ch { + fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To) + } + }(chStatusChanged) + + amqpConnection := mq.NewAmqpConnection() + amqpConnection.NotifyStatusChange(chStatusChanged) + err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings()) + if err != nil { + return + } + + fmt.Printf("AMQP Connection opened.\n") + management := amqpConnection.Management() + queueSpec := management.Queue("getting_started_queue"). + QueueType(mq.QueueType{Type: mq.Quorum}). + MaxLengthBytes(mq.CapacityGB(1)). + DeadLetterExchange("dead-letter-exchange"). + DeadLetterRoutingKey("dead-letter-routing-key") + queueInfo, err := queueSpec.Declare(context.Background()) + if err != nil { + return + } + fmt.Printf("Queue %s created.\n", queueInfo.GetName()) + err = queueSpec.Delete(context.Background()) + if err != nil { + return + } + fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) + + fmt.Println("Press any key to stop ") + reader := bufio.NewReader(os.Stdin) + _, _ = reader.ReadString('\n') + + err = amqpConnection.Close(context.Background()) + if err != nil { + return + } + fmt.Printf("AMQP Connection closed.\n") + close(chStatusChanged) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..135f6cf --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module github.com/rabbitmq/rabbitmq-amqp-go-client + +go 1.22.0 + +require ( + github.com/Azure/go-amqp v0.0.0-00010101000000-000000000000 + github.com/onsi/ginkgo/v2 v2.20.2 + github.com/onsi/gomega v1.34.2 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect + github.com/google/uuid v1.6.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + golang.org/x/tools v0.24.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/Azure/go-amqp => github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..803082e --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 h1:etxEtd7qkhJD34gpQesPbZuMJrqkc+ZOXqR3diVfGWs= +github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= +github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go new file mode 100644 index 0000000..2cf2907 --- /dev/null +++ b/rabbitmq_amqp/amqp_connection.go @@ -0,0 +1,179 @@ +package rabbitmq_amqp + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/Azure/go-amqp" +) + +type ConnectionSettings struct { + host string + port int + user string + password string + virtualHost string + scheme string + containerId string + useSsl bool + tlsConfig *tls.Config +} + +func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings { + c.tlsConfig = config + return c +} + +func (c *ConnectionSettings) GetTlsConfig() *tls.Config { + return c.tlsConfig +} + +func (c *ConnectionSettings) Port(port int) IConnectionSettings { + c.port = port + return c +} + +func (c *ConnectionSettings) User(userName string) IConnectionSettings { + + c.user = userName + return c +} + +func (c *ConnectionSettings) Password(password string) IConnectionSettings { + c.password = password + return c +} + +func (c *ConnectionSettings) VirtualHost(virtualHost string) IConnectionSettings { + c.virtualHost = virtualHost + return c +} + +func (c *ConnectionSettings) ContainerId(containerId string) IConnectionSettings { + c.containerId = containerId + return c +} + +func (c *ConnectionSettings) GetHost() string { + return c.host +} + +func (c *ConnectionSettings) Host(hostName string) IConnectionSettings { + c.host = hostName + return c + +} + +func (c *ConnectionSettings) GetPort() int { + return c.port +} + +func (c *ConnectionSettings) GetUser() string { + return c.user +} + +func (c *ConnectionSettings) GetPassword() string { + return c.password +} + +func (c *ConnectionSettings) GetVirtualHost() string { + return c.virtualHost +} + +func (c *ConnectionSettings) GetScheme() string { + return c.scheme +} + +func (c *ConnectionSettings) GetContainerId() string { + return c.containerId +} + +func (c *ConnectionSettings) UseSsl(value bool) IConnectionSettings { + c.useSsl = value + if value { + c.scheme = "amqps" + } else { + c.scheme = "amqp" + } + return c +} + +func (c *ConnectionSettings) IsSsl() bool { + return c.useSsl +} + +func (c *ConnectionSettings) BuildAddress() string { + return c.scheme + "://" + c.host + ":" + fmt.Sprint(c.port) +} + +func NewConnectionSettings() IConnectionSettings { + return &ConnectionSettings{ + host: "localhost", + port: 5672, + user: "guest", + password: "guest", + virtualHost: "/", + scheme: "amqp", + containerId: "amqp-go-client", + useSsl: false, + tlsConfig: nil, + } +} + +type AmqpConnection struct { + Connection *amqp.Conn + management IManagement + lifeCycle *LifeCycle +} + +func (a *AmqpConnection) Management() IManagement { + return a.management +} + +func NewAmqpConnection() IConnection { + return &AmqpConnection{ + management: NewAmqpManagement(), + lifeCycle: NewLifeCycle(), + } +} + +func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error { + // TODO: add support for other SASL types + sASLType := amqp.SASLTypeAnonymous() + + conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{ + ContainerID: connectionSettings.GetContainerId(), + SASLType: sASLType, + HostName: connectionSettings.GetVirtualHost(), + TLSConfig: connectionSettings.GetTlsConfig(), + }) + if err != nil { + return err + } + a.Connection = conn + a.lifeCycle.SetStatus(Open) + + err = a.Management().Open(ctx, a) + if err != nil { + return err + } + return nil +} + +func (a *AmqpConnection) Close(ctx context.Context) error { + err := a.Management().Close(ctx) + if err != nil { + return err + } + err = a.Connection.Close() + a.lifeCycle.SetStatus(Closed) + return err +} + +func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) { + a.lifeCycle.chStatusChanged = channel +} + +func (a *AmqpConnection) GetStatus() int { + return a.lifeCycle.Status() +} diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go new file mode 100644 index 0000000..34b2d7e --- /dev/null +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -0,0 +1,96 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "time" +) + +var _ = Describe("AMQP Connection Test", func() { + It("AMQP Connection should success", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + }) + + It("AMQP Connection should fail due of wrong port", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + connectionSettings.Host("localhost").Port(1234) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should fail due of wrong host", func() { + + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + connectionSettings.Host("wronghost").Port(5672) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should fail due of context cancelled", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + cancel() + err := amqpConnection.Open(ctx, NewConnectionSettings()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should receive events ", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ch := make(chan *StatusChanged, 1) + amqpConnection.NotifyStatusChange(ch) + err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + Expect(err).To(BeNil()) + recv := <-ch + Expect(recv).NotTo(BeNil()) + Expect(recv.From).To(Equal(Closed)) + Expect(recv.To).To(Equal(Open)) + + err = amqpConnection.Close(context.Background()) + Expect(err).To(BeNil()) + recv = <-ch + Expect(recv).NotTo(BeNil()) + + Expect(recv.From).To(Equal(Open)) + Expect(recv.To).To(Equal(Closed)) + + }) + + //It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() { + // amqpConnection := NewAmqpConnection() + // Expect(amqpConnection).NotTo(BeNil()) + // Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + // + // connectionSettings := NewConnectionSettings(). + // UseSsl(true).Port(5671).TlsConfig(&tls.Config{ + // //ServerName: "localhost", + // InsecureSkipVerify: true, + // }) + // Expect(connectionSettings).NotTo(BeNil()) + // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + // err := amqpConnection.Open(context.TODO(), connectionSettings) + // Expect(err).To(BeNil()) + //}) + +}) diff --git a/rabbitmq_amqp/amqp_managent.go b/rabbitmq_amqp/amqp_managent.go new file mode 100644 index 0000000..2994191 --- /dev/null +++ b/rabbitmq_amqp/amqp_managent.go @@ -0,0 +1,222 @@ +package rabbitmq_amqp + +import ( + "context" + "errors" + "fmt" + "github.com/Azure/go-amqp" + "github.com/google/uuid" + "strconv" + "time" +) + +var PreconditionFailed = errors.New("precondition Failed") + +type AmqpManagement struct { + session *amqp.Session + sender *amqp.Sender + receiver *amqp.Receiver + lifeCycle *LifeCycle + cancel context.CancelFunc +} + +func NewAmqpManagement() *AmqpManagement { + return &AmqpManagement{ + lifeCycle: NewLifeCycle(), + } + +} + +func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { + if a.receiver == nil { + prop := make(map[string]any) + prop["paired"] = true + opts := &amqp.ReceiverOptions{ + DynamicAddress: false, + Name: linkPairName, + Properties: prop, + RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), + SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), + TargetAddress: managementNodeAddress, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + Credit: 100, + } + receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) + if err != nil { + return err + } + a.receiver = receiver + return nil + } + return nil +} + +//func (a *AmqpManagement) processMessages(ctx context.Context) error { +// +// go func() { +// +// for a.GetStatus() == Open { +// msg, err := a.receiver.Receive(ctx, nil) // blocking call +// if err != nil { +// fmt.Printf("Exiting processMessages %s\n", err) +// return +// } +// +// if msg != nil { +// a.receiver.AcceptMessage(ctx, msg) +// } +// } +// +// fmt.Printf("Exiting processMessages\n") +// }() + +//return nil +//} + +func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { + if a.sender == nil { + prop := make(map[string]any) + prop["paired"] = true + opts := &amqp.SenderOptions{ + DynamicAddress: false, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + ExpiryTimeout: 0, + Name: linkPairName, + Properties: prop, + SettlementMode: amqp.SenderSettleModeSettled.Ptr(), + RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), + SourceAddress: managementNodeAddress, + } + sender, err := a.session.NewSender(ctx, managementNodeAddress, opts) + if err != nil { + return err + } + + a.sender = sender + return nil + } + return nil +} + +func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error { + session, err := connection.(*AmqpConnection).Connection.NewSession(ctx, nil) + if err != nil { + return err + } + a.session = session + err = a.ensureSenderLink(ctx) + + if err != nil { + return err + } + + time.Sleep(500 * time.Millisecond) + err = a.ensureReceiverLink(ctx) + time.Sleep(500 * time.Millisecond) + if err != nil { + return err + } + //if ctx.Err() != nil { + // // start processing messages. Here we pass a context that will be closed + // // when the receiver session is closed. + // // we won't expose To the user since the user will call Close + // // and the processing _must_ be running in the background + // // for the management session life. + // //err = a.processMessages(context.Background()) + // //if err != nil { + // // return err + // //} + //} + a.lifeCycle.SetStatus(Open) + return ctx.Err() +} + +func (a *AmqpManagement) Close(ctx context.Context) error { + _ = a.sender.Close(ctx) + _ = a.receiver.Close(ctx) + err := a.session.Close(ctx) + a.lifeCycle.SetStatus(Closed) + return err +} + +func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) { + + return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes) + +} + +func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error { + + for _, code := range expectedResponseCodes { + if code == responseCode { + return nil + } + } + return PreconditionFailed +} + +func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) { + amqpMessage := amqp.NewMessageWithValue(body) + s := commandReplyTo + amqpMessage.Properties = &amqp.MessageProperties{ + ReplyTo: &s, + To: &path, + Subject: &method, + MessageID: &id, + } + opts := &amqp.SendOptions{Settled: true} + err := a.sender.Send(ctx, amqpMessage, opts) + if err != nil { + return make(map[string]any), err + } + msg, err := a.receiver.Receive(ctx, nil) + if err != nil { + return make(map[string]any), err + } + err = a.receiver.AcceptMessage(ctx, msg) + if err != nil { + return nil, err + } + if msg.Properties == nil { + return make(map[string]any), fmt.Errorf("expected properties in the message") + } + + if msg.Properties.CorrelationID == nil { + return make(map[string]any), fmt.Errorf("expected correlation id in the message") + } + + if msg.Properties.CorrelationID != id { + return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID) + } + switch msg.Value.(type) { + case map[string]interface{}: + return msg.Value.(map[string]any), nil + } + + i, _ := strconv.Atoi(*msg.Properties.Subject) + + err = a.validateResponseCode(i, expectedResponseCodes) + if err != nil { + return nil, err + } + + return make(map[string]any), nil +} + +func (a *AmqpManagement) Queue(queueName string) IQueueSpecification { + return newAmqpQueue(a, queueName) +} + +func (a *AmqpManagement) QueueClientName() IQueueSpecification { + return newAmqpQueue(a, "") +} + +func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) { + a.lifeCycle.chStatusChanged = channel +} + +func (a *AmqpManagement) GetStatus() int { + return a.lifeCycle.Status() +} diff --git a/rabbitmq_amqp/amqp_managent_test.go b/rabbitmq_amqp/amqp_managent_test.go new file mode 100644 index 0000000..d2b7521 --- /dev/null +++ b/rabbitmq_amqp/amqp_managent_test.go @@ -0,0 +1,70 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "time" +) + +var _ = Describe("Management tests", func() { + + It("AMQP Management should fail due of context cancelled", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + Expect(err).To(BeNil()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + cancel() + err = amqpConnection.Management().Open(ctx, amqpConnection) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Management should receive events ", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ch := make(chan *StatusChanged, 1) + amqpConnection.Management().NotifyStatusChange(ch) + err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + Expect(err).To(BeNil()) + recv := <-ch + Expect(recv).NotTo(BeNil()) + Expect(recv.From).To(Equal(Closed)) + Expect(recv.To).To(Equal(Open)) + + err = amqpConnection.Close(context.Background()) + Expect(err).To(BeNil()) + recv = <-ch + Expect(recv).NotTo(BeNil()) + + Expect(recv.From).To(Equal(Open)) + Expect(recv.To).To(Equal(Closed)) + + }) + + It("Request", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + + management := amqpConnection.Management() + kv := make(map[string]any) + kv["durable"] = true + kv["auto_delete"] = false + _queueArguments := make(map[string]any) + _queueArguments["x-queue-type"] = "quorum" + kv["arguments"] = _queueArguments + path := "/queues/test" + result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200}) + Expect(err).To(BeNil()) + Expect(result).NotTo(BeNil()) + Expect(management.Close(context.TODO())).To(BeNil()) + }) +}) diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go new file mode 100644 index 0000000..9199347 --- /dev/null +++ b/rabbitmq_amqp/amqp_queue.go @@ -0,0 +1,178 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type AmqpQueueInfo struct { + name string + isDurable bool + isAutoDelete bool + isExclusive bool + leader string + replicas []string + arguments map[string]any + queueType TQueueType +} + +func (a *AmqpQueueInfo) GetLeader() string { + return a.leader +} + +func (a *AmqpQueueInfo) GetReplicas() []string { + return a.replicas +} + +func newAmqpQueueInfo(response map[string]any) IQueueInfo { + return &AmqpQueueInfo{ + name: response["name"].(string), + isDurable: response["durable"].(bool), + isAutoDelete: response["auto_delete"].(bool), + isExclusive: response["exclusive"].(bool), + queueType: TQueueType(response["type"].(string)), + leader: response["leader"].(string), + replicas: response["replicas"].([]string), + arguments: response["arguments"].(map[string]any), + } +} + +func (a *AmqpQueueInfo) IsDurable() bool { + return a.isDurable +} + +func (a *AmqpQueueInfo) IsAutoDelete() bool { + return a.isAutoDelete +} + +func (a *AmqpQueueInfo) Exclusive() bool { + return a.isExclusive +} + +func (a *AmqpQueueInfo) Type() TQueueType { + return a.queueType +} + +func (a *AmqpQueueInfo) GetName() string { + return a.name +} + +func (a *AmqpQueueInfo) GetArguments() map[string]any { + return a.arguments +} + +type AmqpQueue struct { + management *AmqpManagement + queueArguments map[string]any + isExclusive bool + isAutoDelete bool + name string +} + +func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification { + a.queueArguments["x-dead-letter-exchange"] = dlx + return a +} + +func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification { + a.queueArguments["x-dead-letter-routing-key"] = dlrk + return a +} + +func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification { + a.queueArguments["max-length-bytes"] = length + return a +} + +func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification { + a.queueArguments["x-queue-type"] = queueType.String() + return a +} + +func (a *AmqpQueue) GetQueueType() TQueueType { + if a.queueArguments["x-queue-type"] == nil { + return Classic + } + return TQueueType(a.queueArguments["x-queue-type"].(string)) +} + +func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification { + a.isExclusive = isExclusive + return a +} + +func (a *AmqpQueue) IsExclusive() bool { + return a.isExclusive +} + +func (a *AmqpQueue) AutoDelete(isAutoDelete bool) IQueueSpecification { + a.isAutoDelete = isAutoDelete + return a +} + +func (a *AmqpQueue) IsAutoDelete() bool { + return a.isAutoDelete +} + +func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification { + return &AmqpQueue{management: management, + name: queueName, + queueArguments: make(map[string]any)} +} + +func (a *AmqpQueue) validate() error { + + if a.queueArguments["max-length-bytes"] != nil { + + err := validatePositive("max length", a.queueArguments["max-length-bytes"].(int64)) + if err != nil { + return err + } + } + return nil +} + +func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { + + if Quorum == a.GetQueueType() || + Stream == a.GetQueueType() { + // mandatory arguments for quorum queues and streams + a.Exclusive(false).AutoDelete(false) + } + if err := a.validate(); err != nil { + return nil, err + } + + if a.name == "" { + a.name = GenerateNameWithDefaultPrefix() + } + + path := queuePath(a.name) + kv := make(map[string]any) + kv["durable"] = true + kv["auto_delete"] = a.isAutoDelete + kv["exclusive"] = a.isExclusive + kv["arguments"] = a.queueArguments + response, err := a.management.Request(ctx, kv, path, commandPut, []int{200}) + if err != nil { + return nil, err + } + return newAmqpQueueInfo(response), nil +} + +func (a *AmqpQueue) Delete(ctx context.Context) error { + path := queuePath(a.name) + _, err := a.management.Request(ctx, nil, path, commandDelete, []int{200}) + if err != nil { + return err + } + return nil +} + +func (a *AmqpQueue) Name(queueName string) IQueueSpecification { + a.name = queueName + return a +} + +func (a *AmqpQueue) GetName() string { + return a.name +} diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go new file mode 100644 index 0000000..5e2d79c --- /dev/null +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -0,0 +1,153 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("AMQP Queue test ", func() { + + var connection IConnection + var management IManagement + BeforeEach(func() { + connection = NewAmqpConnection() + Expect(connection).NotTo(BeNil()) + Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := connection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + management = connection.Management() + + }) + + AfterEach(func() { + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Queue Declare With Response and Delete should success ", func() { + const queueName = "AMQP Queue Declare With Response and Delete should success" + queueSpec := management.Queue(queueName) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Classic)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Queue Declare With Parameters and Delete should success ", func() { + const queueName = "AMQP Queue Declare With Parameters and Delete should success" + queueSpec := management.Queue(queueName).Exclusive(true). + AutoDelete(true). + QueueType(QueueType{Classic}). + MaxLengthBytes(CapacityGB(1)). + DeadLetterExchange("dead-letter-exchange"). + DeadLetterRoutingKey("dead-letter-routing-key") + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeTrue()) + Expect(queueInfo.Exclusive()).To(BeTrue()) + Expect(queueInfo.Type()).To(Equal(Classic)) + Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit")) + Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0)) + + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange")) + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key")) + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000))) + + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Quorum Queue and Delete should success ", func() { + const queueName = "AMQP Declare Quorum Queue and Delete should success" + // Quorum queue will ignore Exclusive and AutoDelete settings + // since they are not supported by quorum queues + queueSpec := management.Queue(queueName). + Exclusive(true). + AutoDelete(true).QueueType(QueueType{Quorum}) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Quorum)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Stream Queue and Delete should success ", func() { + const queueName = "AMQP Declare Stream Queue and Delete should success" + // Stream queue will ignore Exclusive and AutoDelete settings + // since they are not supported by quorum queues + queueSpec := management.Queue(queueName). + Exclusive(true). + AutoDelete(true).QueueType(QueueType{Stream}) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Stream)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Queue with invalid type should fail ", func() { + const queueName = "AMQP Declare Queue with invalid type should fail" + queueSpec := management.Queue(queueName). + QueueType(QueueType{Type: "invalid"}) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Declare Queue should fail with Precondition fail ", func() { + + // The first queue is declared as Classic and it should succeed + // The second queue is declared as Quorum and it should fail since it is already declared as Classic + const queueName = "AMQP Declare Queue should fail with Precondition fail" + queueSpec := management.Queue(queueName).QueueType(QueueType{Classic}) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum}) + _, err = queueSpecFail.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + Expect(err).To(Equal(PreconditionFailed)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Queue should fail during validation", func() { + + const queueName = "AMQP Declare Queue should fail during validation" + queueSpec := management.Queue(queueName).MaxLengthBytes(-1) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + Expect(err).To(HaveOccurred()) + }) + + It("AMQP Declare Queue should create client name queue", func() { + queueSpec := management.QueueClientName() + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(ContainSubstring("client.gen-")) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go new file mode 100644 index 0000000..89ce280 --- /dev/null +++ b/rabbitmq_amqp/common.go @@ -0,0 +1,117 @@ +package rabbitmq_amqp + +import ( + "crypto/md5" + "encoding/base64" + "fmt" + "github.com/google/uuid" + "net/url" + "strings" +) + +type PercentCodec struct{} + +// Encode takes a string and returns its percent-encoded representation. +func (pc *PercentCodec) Encode(input string) string { + var encoded strings.Builder + + // Iterate over each character in the input string + for _, char := range input { + // Check if the character is an unreserved character (i.e., it doesn't need encoding) + if isUnreserved(char) { + encoded.WriteRune(char) // Append as is + } else { + // Encode character To %HH format + encoded.WriteString(fmt.Sprintf("%%%02X", char)) + } + } + + return encoded.String() +} + +// Decode takes a percent-encoded string and returns its decoded representation. +func (pc *PercentCodec) Decode(input string) (string, error) { + // Use url.QueryUnescape which properly decodes percent-encoded strings + decoded, err := url.QueryUnescape(input) + if err != nil { + return "", err + } + + return decoded, nil +} + +const ( + responseCode200 = 200 + responseCode201 = 201 + responseCode204 = 204 + responseCode409 = 409 + commandPut = "PUT" + commandGet = "GET" + commandPost = "POST" + commandDelete = "DELETE" + commandReplyTo = "$me" + managementNodeAddress = "/management" + linkPairName = "management-link-pair" +) + +const ( + Exchanges = "exchanges" + Key = "key" + Queues = "queues" + Bindings = "bindings" +) + +// isUnreserved checks if a character is an unreserved character in percent encoding +// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ +func isUnreserved(char rune) bool { + return (char >= 'A' && char <= 'Z') || + (char >= 'a' && char <= 'z') || + (char >= '0' && char <= '9') || + char == '-' || char == '.' || char == '_' || char == '~' +} + +func encodePathSegments(pathSegments string) string { + return (&PercentCodec{}).Encode(pathSegments) +} + +func queuePath(queueName string) string { + return "/" + Queues + "/" + encodePathSegments(queueName) +} + +func validatePositive(label string, value int64) error { + if value < 0 { + return fmt.Errorf("value for %s must be positive, got %d", label, value) + } + return nil +} + +//internal static string GenerateName(string prefix) +//{ +//string uuidStr = Guid.NewGuid().ToString(); +//byte[] uuidBytes = Encoding.ASCII.GetBytes(uuidStr); +//var md5 = MD5.Create(); +//byte[] digest = md5.ComputeHash(uuidBytes); +//return prefix + Convert.ToBase64String(digest) +//.Replace('+', '-') +//.Replace('/', '_') +//.Replace("=", ""); +//} + +func GenerateNameWithDefaultPrefix() string { + return GenerateName("client.gen-") +} + +// GenerateName generates a unique name with the given prefix +func GenerateName(prefix string) string { + + var uid = uuid.New() + var uuidBytes = []byte(uid.String()) + var _md5 = md5.New() + var digest = _md5.Sum(uuidBytes) + result := base64.StdEncoding.EncodeToString(digest) + result = strings.ReplaceAll(result, "+", "-") + result = strings.ReplaceAll(result, "/", "_") + result = strings.ReplaceAll(result, "=", "") + return prefix + result + +} diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go new file mode 100644 index 0000000..39b1923 --- /dev/null +++ b/rabbitmq_amqp/connection.go @@ -0,0 +1,35 @@ +package rabbitmq_amqp + +import ( + "context" + "crypto/tls" +) + +type IConnectionSettings interface { + GetHost() string + Host(hostName string) IConnectionSettings + GetPort() int + Port(port int) IConnectionSettings + GetUser() string + User(userName string) IConnectionSettings + GetPassword() string + Password(password string) IConnectionSettings + GetVirtualHost() string + VirtualHost(virtualHost string) IConnectionSettings + GetScheme() string + GetContainerId() string + ContainerId(containerId string) IConnectionSettings + UseSsl(value bool) IConnectionSettings + IsSsl() bool + BuildAddress() string + TlsConfig(config *tls.Config) IConnectionSettings + GetTlsConfig() *tls.Config +} + +type IConnection interface { + Open(ctx context.Context, connectionSettings IConnectionSettings) error + Close(ctx context.Context) error + Management() IManagement + NotifyStatusChange(channel chan *StatusChanged) + GetStatus() int +} diff --git a/rabbitmq_amqp/converters.go b/rabbitmq_amqp/converters.go new file mode 100644 index 0000000..e7b2c0a --- /dev/null +++ b/rabbitmq_amqp/converters.go @@ -0,0 +1,79 @@ +package rabbitmq_amqp + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" +) + +const ( + UnitMb string = "mb" + UnitKb string = "kb" + UnitGb string = "gb" + UnitTb string = "tb" + kilobytesMultiplier = 1000 + megabytesMultiplier = 1000 * 1000 + gigabytesMultiplier = 1000 * 1000 * 1000 + terabytesMultiplier = 1000 * 1000 * 1000 * 1000 +) + +func CapacityBytes(value int64) int64 { + return int64(value) +} + +func CapacityKB(value int64) int64 { + return int64(value * kilobytesMultiplier) +} + +func CapacityMB(value int64) int64 { + return int64(value * megabytesMultiplier) +} + +func CapacityGB(value int64) int64 { + return int64(value * gigabytesMultiplier) +} + +func CapacityTB(value int64) int64 { + return int64(value * terabytesMultiplier) +} + +func CapacityFrom(value string) (int64, error) { + if value == "" || value == "0" { + return 0, nil + } + + match, err := regexp.Compile("^((kb|mb|gb|tb))") + if err != nil { + return 0, + errors.New(fmt.Sprintf("Capacity, invalid unit size format:%s", value)) + } + + foundUnitSize := strings.ToLower(value[len(value)-2:]) + + if match.MatchString(foundUnitSize) { + + size, err := strconv.Atoi(value[:len(value)-2]) + if err != nil { + return 0, errors.New(fmt.Sprintf("Capacity, Invalid number format: %s", value)) + } + switch foundUnitSize { + case UnitKb: + return CapacityKB(int64(size)), nil + + case UnitMb: + return CapacityMB(int64(size)), nil + + case UnitGb: + return CapacityGB(int64(size)), nil + + case UnitTb: + return CapacityTB(int64(size)), nil + } + + } + + return 0, + errors.New(fmt.Sprintf("Capacity, Invalid unit size format: %s", value)) +} diff --git a/rabbitmq_amqp/converters_test.go b/rabbitmq_amqp/converters_test.go new file mode 100644 index 0000000..504c835 --- /dev/null +++ b/rabbitmq_amqp/converters_test.go @@ -0,0 +1,55 @@ +package rabbitmq_amqp + +import ( + "fmt" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Converters", func() { + + It("Converter from number", func() { + Expect(CapacityBytes(100)).To(Equal(int64(100))) + Expect(CapacityKB(1)).To(Equal(int64(1000))) + Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000))) + Expect(CapacityGB(1)).To(Equal(int64(1000 * 1000 * 1000))) + Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000))) + }) + + It("Converter from string", func() { + v, err := CapacityFrom("1KB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000))) + + v, err = CapacityFrom("1MB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000))) + + v, err = CapacityFrom("1GB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000 * 1000))) + + v, err = CapacityFrom("1tb") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000))) + }) + + It("Converter from string logError", func() { + v, err := CapacityFrom("10LL") + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("Invalid unit size format")) + + v, err = CapacityFrom("aGB") + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("Invalid number format")) + + v, err = CapacityFrom("") + Expect(v).To(Equal(int64(0))) + Expect(err).To(BeNil()) + + v, err = CapacityFrom("0") + Expect(v).To(Equal(int64(0))) + Expect(err).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go new file mode 100644 index 0000000..e373ee7 --- /dev/null +++ b/rabbitmq_amqp/entities.go @@ -0,0 +1,52 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type TQueueType string + +const ( + Quorum TQueueType = "quorum" + Classic TQueueType = "classic" + Stream TQueueType = "stream" +) + +type QueueType struct { + Type TQueueType +} + +func (e QueueType) String() string { + return string(e.Type) +} + +type IEntityInfoSpecification[T any] interface { + Declare(ctx context.Context) (T, error) + Delete(ctx context.Context) error +} + +type IQueueSpecification interface { + GetName() string + Exclusive(isExclusive bool) IQueueSpecification + IsExclusive() bool + AutoDelete(isAutoDelete bool) IQueueSpecification + IsAutoDelete() bool + IEntityInfoSpecification[IQueueInfo] + QueueType(queueType QueueType) IQueueSpecification + GetQueueType() TQueueType + + MaxLengthBytes(length int64) IQueueSpecification + DeadLetterExchange(dlx string) IQueueSpecification + DeadLetterRoutingKey(dlrk string) IQueueSpecification +} + +type IQueueInfo interface { + GetName() string + IsDurable() bool + IsAutoDelete() bool + Exclusive() bool + Type() TQueueType + GetLeader() string + GetReplicas() []string + GetArguments() map[string]any +} diff --git a/rabbitmq_amqp/life_cycle.go b/rabbitmq_amqp/life_cycle.go new file mode 100644 index 0000000..10b4dd5 --- /dev/null +++ b/rabbitmq_amqp/life_cycle.go @@ -0,0 +1,53 @@ +package rabbitmq_amqp + +import "sync" + +const ( + Open = iota + Reconnecting = iota + Closing = iota + Closed = iota +) + +type StatusChanged struct { + From int + To int +} + +type LifeCycle struct { + status int + chStatusChanged chan *StatusChanged + mutex *sync.Mutex +} + +func NewLifeCycle() *LifeCycle { + return &LifeCycle{ + status: Closed, + mutex: &sync.Mutex{}, + } +} + +func (l *LifeCycle) Status() int { + l.mutex.Lock() + defer l.mutex.Unlock() + return l.status +} + +func (l *LifeCycle) SetStatus(value int) { + l.mutex.Lock() + defer l.mutex.Unlock() + if l.status == value { + return + } + + oldState := l.status + l.status = value + + if l.chStatusChanged == nil { + return + } + l.chStatusChanged <- &StatusChanged{ + From: oldState, + To: value, + } +} diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go new file mode 100644 index 0000000..a6130ec --- /dev/null +++ b/rabbitmq_amqp/management.go @@ -0,0 +1,16 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type IManagement interface { + Open(ctx context.Context, connection IConnection) error + Close(ctx context.Context) error + Queue(queueName string) IQueueSpecification + QueueClientName() IQueueSpecification + GetStatus() int + NotifyStatusChange(channel chan *StatusChanged) + Request(ctx context.Context, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) +} diff --git a/rabbitmq_amqp/pkg_suite_test.go b/rabbitmq_amqp/pkg_suite_test.go new file mode 100644 index 0000000..08ee73d --- /dev/null +++ b/rabbitmq_amqp/pkg_suite_test.go @@ -0,0 +1,13 @@ +package rabbitmq_amqp_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPkg(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Pkg Suite") +}