22
33
44import java .sql .Connection ;
5+ import java .sql .DriverManager ;
6+ import java .sql .PreparedStatement ;
7+ import java .sql .ResultSet ;
58import java .sql .SQLException ;
9+ import java .sql .Timestamp ;
610import java .util .List ;
11+ import java .util .Map .Entry ;
712import java .util .logging .Level ;
813import java .util .logging .Logger ;
914
@@ -34,17 +39,47 @@ public class JDBCOutput implements MessageOutput {
3439 private String username ;
3540 private String password ;
3641 private String driver ;
42+ private boolean shutdown ;
3743
3844 private Connection connection ;
45+ private PreparedStatement logInsert ;
46+ private PreparedStatement logInsertAttribute ;
3947
4048 @ Inject
41- public JDBCOutput (@ Assisted Stream stream , @ Assisted Configuration conf ) {
49+ public JDBCOutput (@ Assisted Stream stream , @ Assisted Configuration conf ) throws SQLException {
4250 url = conf .getString ("url" );
4351 username = conf .getString ("username" );
4452 password = conf .getString ("password" );
4553 driver = conf .getString ("driver" );
4654 log .info ("Creating JDBC output " + url );
55+
56+ if (driver != null && !driver .trim ().isEmpty ()) {
57+ try {
58+ Class .forName (driver );
59+ } catch (Exception e ) {
60+ log .log (Level .SEVERE , "Failed to find/register driver (" + driver + "): " + e .getMessage (), e );
61+ }
62+ }
63+
64+ reconnect ();
4765 }
66+
67+ private void reconnect () throws SQLException {
68+ if (connection != null ) {
69+ try {
70+ connection .close ();
71+ } catch (SQLException e ) {
72+ log .log (Level .WARNING , e .getMessage (), e );
73+ }
74+ }
75+
76+ connection = username != null && !username .trim ().isEmpty () ?
77+ DriverManager .getConnection (url , username .trim (), password != null ? password .trim () : null ) :
78+ DriverManager .getConnection (url );
79+
80+ logInsert = connection .prepareStatement ("insert into log (message_date, message_id, source, message) values (?, ?, ?, ?)" );
81+ logInsertAttribute = connection .prepareStatement ("insert into log_attribute (message_id, name, value) values (?, ?, ?)" );
82+ }
4883
4984 @ Override
5085 public boolean isRunning () {
@@ -53,6 +88,26 @@ public boolean isRunning() {
5388
5489 @ Override
5590 public void stop () {
91+ shutdown = true ;
92+
93+ if (logInsertAttribute != null ) {
94+ try {
95+ logInsertAttribute .close ();
96+ } catch (SQLException e ) {
97+ log .log (Level .WARNING , e .getMessage (), e );
98+ }
99+ logInsert = null ;
100+ }
101+
102+ if (logInsert != null ) {
103+ try {
104+ logInsert .close ();
105+ } catch (SQLException e ) {
106+ log .log (Level .WARNING , e .getMessage (), e );
107+ }
108+ logInsert = null ;
109+ }
110+
56111 if (connection != null ) {
57112 try {
58113 connection .close ();
@@ -72,6 +127,49 @@ public void write(List<Message> msgs) throws Exception {
72127
73128 @ Override
74129 public void write (Message msg ) throws Exception {
130+ if (shutdown ) {
131+ return ;
132+ }
133+
134+ try {
135+ if (connection == null ) {
136+ reconnect ();
137+ }
138+
139+ connection .setAutoCommit (false );
140+ try {
141+ int index = 1 ;
142+ logInsert .setTimestamp (index ++, new Timestamp (msg .getTimestamp ().getMillis ()));
143+ logInsert .setString (index ++, msg .getId ());
144+ logInsert .setString (index ++, msg .getSource ());
145+ logInsert .setString (index ++, msg .getMessage ());
146+ logInsert .execute ();
147+ Object id = null ;
148+ ResultSet ids = logInsert .getGeneratedKeys ();
149+ while (ids != null && ids .next ()) {
150+ id = ids .getObject (1 );
151+ }
152+ if (id != null ) {
153+ for (Entry <String , Object > e : msg .getFieldsEntries ()) {
154+ String name = e .getKey ();
155+ Object value = e .getValue ();
156+ logInsertAttribute .setObject (1 , id );
157+ logInsertAttribute .setString (2 , name );
158+ logInsertAttribute .setObject (3 , value );
159+ logInsertAttribute .execute ();
160+ }
161+ } else {
162+ throw new SQLException ("Failed to generate ID for primary log record!" );
163+ }
164+ } finally {
165+ connection .rollback ();
166+ connection .commit ();
167+ connection .setAutoCommit (true );
168+ }
169+ } catch (SQLException e ) {
170+ log .log (Level .WARNING , "JDBC output error: " + e .getMessage (), e );
171+ connection = null ;
172+ }
75173 }
76174
77175 public interface Factory extends MessageOutput .Factory <JDBCOutput > {
0 commit comments